package org.apache.flink.yarn;

import java.time.Duration;
import java.util.Arrays;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.util.TestUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Integration test that deploys a per-job Flink cluster on the YARN mini cluster started by
 * {@link YarnTestBase} and verifies the replication factor of the Flink uber jar that was
 * uploaded to the Hadoop file system.
 *
 * <p>Two scenarios are covered: an explicitly configured {@link
 * YarnConfigOptions#FILE_REPLICATION} and the fallback to the {@code dfs.replication} value of
 * the YARN configuration.
 */
class YARNFileReplicationITCase extends YarnTestBase {
    /** Timeout handed to {@code waitApplicationFinishedElseKillIt} after the job has finished. */
    private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);

    /** Sleep interval in milliseconds handed to {@code waitApplicationFinishedElseKillIt}. */
    private static final int sleepIntervalInMS = 100;

    /** Names the mini cluster and starts YARN with the shared test configuration. */
    @BeforeAll
    public static void setup() {
        YARN_CONFIGURATION.set("flink-yarn-minicluster-name", "flink-yarn-tests-per-job");
        startYARNWithConfig(YARN_CONFIGURATION, true);
    }

    /** Deploys a job with {@link YarnConfigOptions#FILE_REPLICATION} explicitly set to 4. */
    @Test
    void testPerJobModeWithCustomizedFileReplication() throws Exception {
        final Configuration configuration = getDefaultConfiguration();
        configuration.setInteger(YarnConfigOptions.FILE_REPLICATION, 4);
        runTest(() -> deployPerJob(configuration, getTestingJobGraph()));
    }

    /** Deploys a job without touching {@link YarnConfigOptions#FILE_REPLICATION}. */
    @Test
    void testPerJobModeWithDefaultFileReplication() throws Exception {
        runTest(() -> deployPerJob(getDefaultConfiguration(), getTestingJobGraph()));
    }

    /**
     * Deploys {@code jobGraph} as a per-job cluster, checks the replication of the uploaded uber
     * jar, waits for the job result and finally waits for the YARN application to finish.
     *
     * <p>Both the cluster descriptor and the cluster client are managed by try-with-resources so
     * that each is closed exactly once (client first, then descriptor) and a failure while
     * closing is attached as a suppressed exception instead of replacing the primary failure.
     *
     * @param configuration Flink configuration used to create the cluster descriptor and to
     *     derive the expected replication factor
     * @param jobGraph job to submit
     * @throws Exception if deployment, verification or job execution fails
     */
    private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws Exception {
        try (YarnClusterDescriptor yarnClusterDescriptor =
                createYarnClusterDescriptor(configuration)) {

            yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
            yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));

            final int masterMemoryMB =
                    yarnClusterDescriptor
                            .getFlinkConfiguration()
                            .get(JobManagerOptions.TOTAL_PROCESS_MEMORY)
                            .getMebiBytes();
            final ClusterSpecification clusterSpecification =
                    new ClusterSpecification.ClusterSpecificationBuilder()
                            .setMasterMemoryMB(masterMemoryMB)
                            .setTaskManagerMemoryMB(1024)
                            .setSlotsPerTaskManager(1)
                            .createClusterSpecification();

            // The test jar is located by searching from the parent directory and is attached
            // to the job graph as a user jar.
            jobGraph.addJar(
                    new org.apache.flink.core.fs.Path(
                            TestUtils.findFile(
                                            "..", new TestUtils.TestJarFinder("flink-yarn-tests"))
                                    .toURI()));

            try (ClusterClient<ApplicationId> clusterClient =
                    yarnClusterDescriptor
                            .deployJobCluster(clusterSpecification, jobGraph, false)
                            .getClusterClient()) {

                final ApplicationId applicationId = clusterClient.getClusterId();

                // Must run before the job result is awaited: the assertion message in
                // extraVerification points out that the uploaded jar is gone once the cluster
                // has terminated.
                extraVerification(configuration, applicationId);

                final JobResult jobResult =
                        clusterClient.requestJobResult(jobGraph.getJobID()).get();
                Assertions.assertThat(jobResult).isNotNull();
                jobResult
                        .getSerializedThrowable()
                        .ifPresent(
                                serializedThrowable -> {
                                    throw new AssertionError(
                                            "Job failed",
                                            serializedThrowable.deserializeError(
                                                    YARNFileReplicationITCase.class
                                                            .getClassLoader()));
                                });

                waitApplicationFinishedElseKillIt(
                        applicationId,
                        yarnAppTerminateTimeout,
                        yarnClusterDescriptor,
                        sleepIntervalInMS);
            }
        }
    }

    /**
     * Builds the job submitted by the tests: a {@code NoDataSource} shuffled into a {@link
     * DiscardingSink}, with parallelism 2.
     *
     * @return the job graph of the testing pipeline
     */
    private JobGraph getTestingJobGraph() {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.addSource(new NoDataSource()).shuffle().addSink(new DiscardingSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Creates the base Flink configuration shared by both tests: 768 MiB JobManager process
     * memory, 1 GiB TaskManager process memory, a 30 second ask timeout and the user jar
     * excluded from the classpath.
     *
     * @return a fresh configuration instance
     */
    private Configuration getDefaultConfiguration() {
        final Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768L));
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30L));
        configuration.set(
                YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR,
                YarnConfigOptions.UserJarInclusion.DISABLED);
        return configuration;
    }

    /**
     * Asserts that the Flink uber jar exists under {@code <home>/.flink/<applicationId>/} and
     * that its replication factor matches the expectation.
     *
     * <p>The expected value is the configured {@link YarnConfigOptions#FILE_REPLICATION} when it
     * is positive; otherwise it is {@code dfs.replication} from the YARN configuration, falling
     * back to 3 when that key is unset.
     *
     * @param configuration Flink configuration the cluster was deployed with
     * @param applicationId id of the deployed YARN application
     * @throws Exception if the file system cannot be accessed
     */
    private void extraVerification(Configuration configuration, ApplicationId applicationId)
            throws Exception {
        final FileSystem fs = FileSystem.get(getYarnConfiguration());

        final String relativeJarPath =
                ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();
        final Path uberJarPath = new Path(fs.getHomeDirectory(), relativeJarPath);

        // Fully qualified because the simple name Assertions is taken by the AssertJ import.
        org.junit.jupiter.api.Assertions.assertTrue(
                fs.exists(uberJarPath),
                "The Flink uber jar needs to exist. If it does not exist, then this indicates that the Flink cluster has already terminated and Yarn has already deleted the working directory.");

        final FileStatus uberJarStatus = fs.getFileStatus(uberJarPath);
        final int configuredReplication =
                configuration.getInteger(YarnConfigOptions.FILE_REPLICATION);
        final int expectedReplication =
                configuredReplication > 0
                        ? configuredReplication
                        : YARN_CONFIGURATION.getInt("dfs.replication", 3);
        Assertions.assertThat(uberJarStatus.getReplication()).isEqualTo(expectedReplication);
    }
}
