package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.testjob.YarnTestJob;
import org.apache.flink.yarn.util.TestUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YARNHighAvailabilityITCase.class */
/**
 * Integration tests for ZooKeeper-based high availability of Flink session clusters on a YARN
 * mini cluster.
 *
 * <p>A single in-process ZooKeeper {@link TestingServer} and a temporary HA storage directory are
 * shared by all tests. Each test deploys a session cluster configured for ZooKeeper HA via
 * {@link #setupYarnClusterDescriptor()} and then exercises one failure or retrieval scenario.
 */
class YARNHighAvailabilityITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNHighAvailabilityITCase.class);
    private static final String LOG_DIR = "flink-yarn-tests-ha";

    /** In-process ZooKeeper server used as the HA backend; started once in {@link #setup}. */
    private static TestingServer zkServer;

    /** Absolute path of the HA storage directory, set once in {@link #setup}. */
    private static String storageDir;

    /** Signal used by {@link #waitForJobTermination} to make the stoppable test job finish. */
    private YarnTestJob.StopJobSignal stopJobSignal;

    /** Job graph submitted by the tests; rebuilt in {@link #setUp} before every test. */
    private JobGraph job;

    YARNHighAvailabilityITCase() {}

    /**
     * Starts the ZooKeeper test server and a YARN mini cluster that uses the capacity scheduler.
     *
     * @param tempDir JUnit-managed temporary directory used as the HA storage path
     */
    @BeforeAll
    static void setup(@TempDir File tempDir) throws Exception {
        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
        storageDir = tempDir.getAbsolutePath();

        YARN_CONFIGURATION.setClass(
                "yarn.resourcemanager.scheduler.class",
                CapacityScheduler.class,
                ResourceScheduler.class);
        YARN_CONFIGURATION.set("flink-yarn-minicluster-name", LOG_DIR);
        YARN_CONFIGURATION.setInt("yarn.nodemanager.resource.memory-mb", 4096);
        startYARNWithConfig(YARN_CONFIGURATION);
    }

    /** Tears down the YARN mini cluster and always stops the ZooKeeper server afterwards. */
    @AfterAll
    static void teardown() throws Exception {
        try {
            YarnTestBase.teardown();
        } finally {
            if (zkServer != null) {
                zkServer.close();
                zkServer = null;
            }
        }
    }

    /**
     * Builds a fresh stoppable test job and attaches the flink-yarn-tests jar to it.
     *
     * @param tempDir JUnit-managed temporary directory holding the job's stop-marker file
     */
    @BeforeEach
    void setUp(@TempDir File tempDir) {
        stopJobSignal = YarnTestJob.StopJobSignal.usingMarkerFile(tempDir.toPath());
        job = YarnTestJob.stoppableJob(stopJobSignal);

        final File testJar =
                TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests"));
        Assertions.assertThat(testJar).isNotNull();
        job.addJar(new Path(testJar.toURI()));
    }

    /**
     * Kills the session cluster entrypoint process and checks that YARN starts a second
     * application attempt and that the job can still be brought to termination.
     */
    @Timeout(value = 30, unit = TimeUnit.MINUTES)
    @Test
    void testKillYarnSessionClusterEntrypoint() throws Exception {
        runTest(
                () -> {
                    Assumptions.assumeThat(
                                    OperatingSystem.isLinux()
                                            || OperatingSystem.isMac()
                                            || OperatingSystem.isFreeBSD()
                                            || OperatingSystem.isSolaris())
                            .as(
                                    "This test kills processes via the pkill command. Thus, it only runs on Linux, Mac OS, Free BSD and Solaris.")
                            .isTrue();

                    final YarnClusterDescriptor yarnClusterDescriptor =
                            setupYarnClusterDescriptor();
                    try (RestClusterClient<ApplicationId> restClusterClient =
                            deploySessionCluster(yarnClusterDescriptor)) {
                        final JobID jobId = submitJob(restClusterClient);
                        final ApplicationId applicationId = restClusterClient.getClusterId();
                        waitUntilJobIsRunning(restClusterClient, jobId);

                        killApplicationMaster(
                                yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
                        waitForApplicationAttempt(applicationId, 2);

                        waitForJobTermination(restClusterClient, jobId);
                        killApplicationAndWait(applicationId);
                    }
                });
    }

    /**
     * Stops a TaskManager container and checks that the job performs at least one full restart
     * before it is brought to termination.
     */
    @Timeout(value = 30, unit = TimeUnit.MINUTES)
    @Test
    void testJobRecoversAfterKillingTaskManager() throws Exception {
        runTest(
                () -> {
                    try (RestClusterClient<ApplicationId> restClusterClient =
                            deploySessionCluster(setupYarnClusterDescriptor())) {
                        final JobID jobId = submitJob(restClusterClient);
                        waitUntilJobIsRunning(restClusterClient, jobId);

                        stopTaskManagerContainer();
                        waitUntilJobIsRestarted(restClusterClient, jobId, 1);

                        waitForJobTermination(restClusterClient, jobId);
                        killApplicationAndWait(restClusterClient.getClusterId());
                    }
                });
    }

    /**
     * Retrieves a second client for a running session cluster through a new descriptor. It checks
     * that the cluster has no jobs and then shuts the cluster down through the retrieved client.
     */
    @Timeout(value = 30, unit = TimeUnit.MINUTES)
    @Test
    void testClusterClientRetrieval() throws Exception {
        runTest(
                () -> {
                    final RestClusterClient<ApplicationId> restClusterClient =
                            deploySessionCluster(setupYarnClusterDescriptor());
                    ClusterClient<ApplicationId> retrievedClient = null;
                    try {
                        retrievedClient =
                                setupYarnClusterDescriptor()
                                        .retrieve(restClusterClient.getClusterId())
                                        .getClusterClient();
                        Assertions.assertThat(retrievedClient.listJobs().join()).isEmpty();
                        retrievedClient.shutDownCluster();
                    } finally {
                        // Nested finally: the retrieved client is closed even if closing the
                        // first client throws.
                        try {
                            restClusterClient.close();
                        } finally {
                            if (retrievedClient != null) {
                                retrievedClient.close();
                            }
                        }
                    }
                });
    }

    /**
     * Blocks until the application's current attempt id is at least {@code attemptId}.
     *
     * @param applicationId YARN application to poll
     * @param attemptId minimum attempt id to wait for
     */
    private void waitForApplicationAttempt(ApplicationId applicationId, int attemptId)
            throws Exception {
        final YarnClient yarnClient = getYarnClient();
        Preconditions.checkState(yarnClient != null, "yarnClient must be initialized");
        CommonTestUtils.waitUntilCondition(
                () ->
                        yarnClient
                                        .getApplicationReport(applicationId)
                                        .getCurrentApplicationAttemptId()
                                        .getAttemptId()
                                >= attemptId);
        LOG.info("Attempt {} id detected.", attemptId);
    }

    /**
     * Finds a container running {@link YarnTaskExecutorRunner} on the mini cluster's
     * NodeManagers and stops it through that NodeManager's container manager.
     */
    private void stopTaskManagerContainer() throws Exception {
        ContainerId taskManagerContainer = null;
        NodeManager owningNodeManager = null;

        // Scan NodeManagers 0 and 1 (presumably the mini cluster's NodeManager count; confirm
        // against YarnTestBase). If several containers match, the last one found is stopped.
        for (int nmIndex = 0; nmIndex < 2; nmIndex++) {
            final NodeManager nodeManager = yarnCluster.getNodeManager(nmIndex);
            for (Map.Entry<ContainerId, Container> entry :
                    nodeManager.getNMContext().getContainers().entrySet()) {
                final String command =
                        StringUtils.join(entry.getValue().getLaunchContext().getCommands(), " ");
                if (command.contains(YarnTaskExecutorRunner.class.getSimpleName())) {
                    taskManagerContainer = entry.getKey();
                    owningNodeManager = nodeManager;
                }
            }
        }

        Assertions.assertThat(taskManagerContainer).isNotNull();
        Assertions.assertThat(owningNodeManager).isNotNull();

        // Register an NM token identifier for the container's application attempt on the current
        // user (presumably needed for stopContainers to be authorized). It is added exactly once
        // so that the removal in the finally block fully undoes it, even if stopping fails.
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        final NMTokenIdentifier nmTokenIdentifier =
                new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
        currentUser.addTokenIdentifier(nmTokenIdentifier);
        try {
            owningNodeManager
                    .getNMContext()
                    .getContainerManager()
                    .stopContainers(
                            StopContainersRequest.newInstance(
                                    Collections.singletonList(taskManagerContainer)));
        } finally {
            currentUser.getTokenIdentifiers().remove(nmTokenIdentifier);
        }
    }

    /**
     * Asks YARN to kill the application and then polls until some application report is in the
     * KILLED or FINISHED state.
     */
    private void killApplicationAndWait(ApplicationId applicationId) throws Exception {
        final YarnClient yarnClient = getYarnClient();
        Preconditions.checkState(yarnClient != null, "yarnClient must be initialized");
        yarnClient.killApplication(applicationId);
        CommonTestUtils.waitUntilCondition(
                () ->
                        !getApplicationReportWithRetryOnNPE(
                                        yarnClient,
                                        EnumSet.of(
                                                YarnApplicationState.KILLED,
                                                YarnApplicationState.FINISHED))
                                .isEmpty());
    }

    /**
     * Signals the stoppable job to finish and waits up to 200 seconds for its job result.
     *
     * @throws java.util.concurrent.TimeoutException if no result arrives within 200 seconds
     */
    private void waitForJobTermination(
            RestClusterClient<ApplicationId> restClusterClient, JobID jobID) throws Exception {
        LOG.info("Sending stop job signal");
        stopJobSignal.signal();
        restClusterClient.requestJobResult(jobID).get(200L, TimeUnit.SECONDS);
    }

    /**
     * Creates a cluster descriptor configured for ZooKeeper HA against {@link #zkServer}, with
     * up to 10 YARN application attempts and an unbounded fixed-delay restart strategy.
     */
    @Nonnull
    private YarnClusterDescriptor setupYarnClusterDescriptor() {
        final Configuration flinkConfiguration = new Configuration();
        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768L));
        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
        flinkConfiguration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");
        flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
        flinkConfiguration.setString(
                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
        flinkConfiguration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 20000);
        flinkConfiguration.setString(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        flinkConfiguration.setInteger(
                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
        return createYarnClusterDescriptor(flinkConfiguration);
    }

    /**
     * Deploys a session cluster (1 slot per TaskManager, 1024 MB TaskManagers, JobManager memory
     * taken from the descriptor's configuration) and returns its REST client.
     */
    private RestClusterClient<ApplicationId> deploySessionCluster(
            YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException {
        final int masterMemoryMB =
                yarnClusterDescriptor
                        .getFlinkConfiguration()
                        .get(JobManagerOptions.TOTAL_PROCESS_MEMORY)
                        .getMebiBytes();
        final ClusterClient<ApplicationId> clusterClient =
                yarnClusterDescriptor
                        .deploySessionCluster(
                                new ClusterSpecification.ClusterSpecificationBuilder()
                                        .setMasterMemoryMB(masterMemoryMB)
                                        .setTaskManagerMemoryMB(1024)
                                        .setSlotsPerTaskManager(1)
                                        .createClusterSpecification())
                        .getClusterClient();

        // Assert first so that a wrong client type fails with a readable message instead of a
        // ClassCastException.
        Assertions.assertThat(clusterClient).isInstanceOf(RestClusterClient.class);
        return (RestClusterClient<ApplicationId>) clusterClient;
    }

    /** Submits {@link #job} and blocks until the cluster returns its job id. */
    private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient)
            throws InterruptedException, ExecutionException {
        return restClusterClient.submitJob(job).get();
    }

    /**
     * Kills every process whose command line matches {@code processName} via {@code pkill -f}.
     * It then waits until none of the originally matching PIDs is reported by {@code pgrep}.
     */
    private void killApplicationMaster(String processName) throws Exception {
        final Set<Integer> originalPids = getApplicationMasterPids(processName);
        Assertions.assertThat(originalPids).isNotEmpty();
        Assertions.assertThat(Runtime.getRuntime().exec("pkill -f " + processName).waitFor())
                .isEqualTo(0);
        CommonTestUtils.waitUntilCondition(
                () -> {
                    final Set<Integer> currentPids = getApplicationMasterPids(processName);
                    return originalPids.stream().noneMatch(currentPids::contains);
                });
    }

    /**
     * Returns the PIDs that {@code pgrep -f processName} reports. The set is empty when pgrep
     * exits with a non-zero status.
     */
    private Set<Integer> getApplicationMasterPids(String processName)
            throws IOException, InterruptedException {
        final Process pgrep = Runtime.getRuntime().exec("pgrep -f " + processName);
        // Drain stdout before waitFor() so a full pipe buffer cannot block the child process.
        final String output = IOUtils.toString(pgrep.getInputStream(), StandardCharsets.UTF_8);
        if (pgrep.waitFor() != 0) {
            return Collections.emptySet();
        }
        return Arrays.stream(output.trim().split("\\s+"))
                .filter(token -> !token.isEmpty())
                .map(Integer::valueOf)
                .collect(Collectors.toSet());
    }

    /** Blocks until the job is RUNNING and every one of its vertices is RUNNING. */
    private static void waitUntilJobIsRunning(
            RestClusterClient<ApplicationId> restClusterClient, JobID jobID) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> {
                    final JobDetailsInfo jobDetails = restClusterClient.getJobDetails(jobID).get();
                    return jobDetails.getJobStatus() == JobStatus.RUNNING
                            && jobDetails.getJobVertexInfos().stream()
                                    .map(toExecutionState())
                                    .allMatch(isRunning());
                });
    }

    /** Maps a job vertex to its aggregated execution state. */
    private static Function<JobDetailsInfo.JobVertexDetailsInfo, ExecutionState>
            toExecutionState() {
        return JobDetailsInfo.JobVertexDetailsInfo::getExecutionState;
    }

    /** Matches {@link ExecutionState#RUNNING}. */
    private static Predicate<ExecutionState> isRunning() {
        return executionState -> executionState == ExecutionState.RUNNING;
    }

    /** Blocks until the job's {@code fullRestarts} metric reaches at least {@code minRestarts}. */
    private static void waitUntilJobIsRestarted(
            RestClusterClient<ApplicationId> restClusterClient, JobID jobID, int minRestarts)
            throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> getJobFullRestarts(restClusterClient, jobID) >= minRestarts);
    }

    /** Returns the job's {@code fullRestarts} metric value, or 0 if the metric is absent. */
    private static int getJobFullRestarts(
            RestClusterClient<ApplicationId> restClusterClient, JobID jobID) throws Exception {
        return getJobMetric(restClusterClient, jobID, "fullRestarts")
                .map(Metric::getValue)
                .map(Integer::parseInt)
                .orElse(0);
    }

    /**
     * Queries the REST job-metrics endpoint filtered to {@code metricName}.
     *
     * @return the single matching metric, or empty if none was returned
     * @throws IllegalArgumentException if more than one metric is returned
     * @throws IllegalStateException if the returned metric has a different id
     */
    private static Optional<Metric> getJobMetric(
            RestClusterClient<ApplicationId> restClusterClient, JobID jobID, String metricName)
            throws Exception {
        final JobMetricsMessageParameters parameters = new JobMetricsMessageParameters();
        parameters.jobPathParameter.resolve(jobID);
        parameters.metricsFilterParameter.resolveFromString(metricName);

        final MetricCollectionResponseBody response =
                restClusterClient
                        .sendRequest(
                                JobMetricsHeaders.getInstance(),
                                parameters,
                                EmptyRequestBody.getInstance())
                        .get();
        final Metric metric = Iterables.getOnlyElement(response.getMetrics(), null);
        Preconditions.checkState(metric == null || metric.getId().equals(metricName));
        return Optional.ofNullable(metric);
    }
}
