package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.shaded.guava30.com.google.common.net.HostAndPort;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.YarnTestBase;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.util.TestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.class */
class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
    private static RestClient restClient;
    private static ExecutorService restClientExecutor;
    private boolean checkForProhibitedLogContents = true;

    @RegisterExtension
    private final LoggerAuditingExtension cliLoggerAuditingExtension = new LoggerAuditingExtension(CliFrontend.class, Level.INFO);

    @RegisterExtension
    private final LoggerAuditingExtension yarLoggerAuditingExtension = new LoggerAuditingExtension(YarnClusterDescriptor.class, Level.WARN);

    YARNSessionCapacitySchedulerITCase() {
    }

    @BeforeAll
    static void setup() throws Exception {
        YARN_CONFIGURATION.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        YARN_CONFIGURATION.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
        YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
        YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
        YARN_CONFIGURATION.set("flink-yarn-minicluster-name", "flink-yarn-tests-capacityscheduler");
        startYARNWithConfig(YARN_CONFIGURATION);
        restClientExecutor = Executors.newSingleThreadExecutor();
        restClient = new RestClient(new Configuration(), restClientExecutor);
    }

    @AfterAll
    static void teardown() throws Exception {
        try {
            YarnTestBase.teardown();
            if (restClient != null) {
                restClient.shutdown(Time.seconds(5L));
            }
            if (restClientExecutor != null) {
                restClientExecutor.shutdownNow();
            }
        } catch (Throwable th) {
            if (restClient != null) {
                restClient.shutdown(Time.seconds(5L));
            }
            if (restClientExecutor != null) {
                restClientExecutor.shutdownNow();
            }
            throw th;
        }
    }

    @Test
    void testStartYarnSessionClusterInQaTeamQueue() throws Exception {
        runTest(() -> {
            runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-qu", "qa-team"}, "JobManager Web Interface:", null, YarnTestBase.RunTypes.YARN_SESSION, 0);
        });
    }

    @Test
    void perJobYarnCluster() throws Exception {
        runTest(() -> {
            LOG.info("Starting perJobYarnCluster()");
            YarnTestBase.RunTypes runTypes = YarnTestBase.RunTypes.CLI_FRONTEND;
            LoggerAuditingExtension loggerAuditingExtension = this.cliLoggerAuditingExtension;
            loggerAuditingExtension.getClass();
            runWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-ys", "2", "-yjm", "768m", "-ytm", "1024m", TestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath()}, "Program execution finished", new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, runTypes, 0, loggerAuditingExtension::getMessages);
            LOG.info("Finished perJobYarnCluster()");
        });
    }

    @Test
    void perJobYarnClusterOffHeap() throws Exception {
        runTest(() -> {
            LOG.info("Starting perJobYarnCluster()");
            YarnTestBase.RunTypes runTypes = YarnTestBase.RunTypes.CLI_FRONTEND;
            LoggerAuditingExtension loggerAuditingExtension = this.cliLoggerAuditingExtension;
            loggerAuditingExtension.getClass();
            runWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-ys", "2", "-yjm", "768m", "-ytm", "1024m", TestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath()}, "Program execution finished", new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, runTypes, 0, loggerAuditingExtension::getMessages);
            LOG.info("Finished perJobYarnCluster()");
        });
    }

    @Test
    void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception {
        runTest(() -> {
            this.checkForProhibitedLogContents = false;
            YarnTestBase.Runner startWithArgs = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-s", "3", "-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-D" + YarnConfigOptions.VCORES.key() + "=2"}, "JobManager Web Interface:", YarnTestBase.RunTypes.YARN_SESSION);
            try {
                HostAndPort parseJobManagerHostname = parseJobManagerHostname(outContent.toString());
                String host = parseJobManagerHostname.getHost();
                int port = parseJobManagerHostname.getPort();
                LOG.info("Extracted hostname:port: {}:{}", host, Integer.valueOf(port));
                submitJob("WindowJoin.jar");
                Assertions.assertThat(getOnlyApplicationReport().getName()).isEqualTo("customName");
                waitForTaskManagerRegistration(host, port);
                assertNumberOfSlotsPerTask(host, port, 3);
                Assertions.assertThat(getFlinkConfig(host, port)).containsEntry("fancy-configuration-value", "veryFancy").containsEntry(YarnConfigOptions.VCORES.key(), "2").containsEntry(JobManagerOptions.ADDRESS.key(), host);
                startWithArgs.sendStop();
                startWithArgs.join();
            } catch (Throwable th) {
                startWithArgs.sendStop();
                startWithArgs.join();
                throw th;
            }
        });
    }

    private static HostAndPort parseJobManagerHostname(String str) {
        String str2;
        Matcher matcher = Pattern.compile("JobManager Web Interface: http://([a-zA-Z0-9.-]+):([0-9]+)").matcher(str);
        String str3 = null;
        String str4 = null;
        while (true) {
            str2 = str4;
            if (!matcher.find()) {
                break;
            }
            str3 = matcher.group(1).toLowerCase();
            str4 = matcher.group(2);
        }
        Preconditions.checkState(str3 != null, "hostname not found in log");
        Preconditions.checkState(str2 != null, "port not found in log");
        return HostAndPort.fromParts(str3, Integer.parseInt(str2));
    }

    private void submitJob(String str) throws IOException, InterruptedException {
        startWithArgs(new String[]{"run", "--detached", TestUtils.getTestJarPath(str).getAbsolutePath()}, "Job has been submitted with JobID", YarnTestBase.RunTypes.CLI_FRONTEND).join();
    }

    private static void waitForTaskManagerRegistration(String str, int i) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(getNumberOfTaskManagers(str, i) > 0);
        });
    }

    private static void assertNumberOfSlotsPerTask(String str, int i, int i2) throws Exception {
        try {
            CommonTestUtils.waitUntilCondition(() -> {
                return Boolean.valueOf(getNumberOfSlotsPerTaskManager(str, i) == i2);
            });
        } catch (TimeoutException e) {
            Fail.fail(String.format("Expected slots per TM to be %d, was: %d", Integer.valueOf(i2), Integer.valueOf(getNumberOfSlotsPerTaskManager(str, i))));
        }
    }

    private static int getNumberOfTaskManagers(String str, int i) throws Exception {
        return ((ClusterOverviewWithVersion) restClient.sendRequest(str, i, ClusterOverviewHeaders.getInstance()).get(30000L, TimeUnit.MILLISECONDS)).getNumTaskManagersConnected();
    }

    private static int getNumberOfSlotsPerTaskManager(String str, int i) throws Exception {
        return ((Integer) ((TaskManagersInfo) restClient.sendRequest(str, i, TaskManagersHeaders.getInstance()).get()).getTaskManagerInfos().stream().map((v0) -> {
            return v0.getNumberSlots();
        }).findFirst().orElse(0)).intValue();
    }

    private static Map<String, String> getFlinkConfig(String str, int i) throws Exception {
        return (Map) ((ConfigurationInfo) restClient.sendRequest(str, i, ClusterConfigurationInfoHeaders.getInstance()).get()).stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    @Test
    void testNonexistingQueueWARNmessage() throws Exception {
        runTest(() -> {
            LOG.info("Starting testNonexistingQueueWARNmessage()");
            Assertions.assertThatThrownBy(() -> {
                runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-qu", "doesntExist"}, "to unknown queue: doesntExist", null, YarnTestBase.RunTypes.YARN_SESSION, 1);
            }).isInstanceOf(Exception.class).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches("to unknown queue: doesntExist")});
            Assertions.assertThat(this.yarLoggerAuditingExtension.getMessages()).anySatisfy(str -> {
                Assertions.assertThat(str).contains(new CharSequence[]{"The specified queue 'doesntExist' does not exist. Available queues"});
            });
            LOG.info("Finished testNonexistingQueueWARNmessage()");
        });
    }

    @Test
    void perJobYarnClusterWithParallelism() throws Exception {
        runTest(() -> {
            LOG.info("Starting perJobYarnClusterWithParallelism()");
            YarnTestBase.RunTypes runTypes = YarnTestBase.RunTypes.CLI_FRONTEND;
            LoggerAuditingExtension loggerAuditingExtension = this.cliLoggerAuditingExtension;
            loggerAuditingExtension.getClass();
            runWithArgs(new String[]{"run", "-p", "2", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-ys", "2", "-yjm", "768m", "-ytm", "1024m", TestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath()}, "Program execution finished", new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, runTypes, 0, loggerAuditingExtension::getMessages);
            LOG.info("Finished perJobYarnClusterWithParallelism()");
        });
    }

    @Test
    void testDetachedPerJobYarnCluster(@TempDir File file) throws Exception {
        runTest(() -> {
            LOG.info("Starting testDetachedPerJobYarnCluster()");
            testDetachedPerJobYarnClusterInternal(file, TestUtils.getTestJarPath("BatchWordCount.jar").getAbsolutePath());
            LOG.info("Finished testDetachedPerJobYarnCluster()");
        });
    }

    @Test
    void testDetachedPerJobYarnClusterWithStreamingJob(@TempDir File file) throws Exception {
        runTest(() -> {
            LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
            testDetachedPerJobYarnClusterInternal(file, TestUtils.getTestJarPath("StreamingWordCount.jar").getAbsolutePath());
            LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
        });
    }

    private void testDetachedPerJobYarnClusterInternal(File file, String str) throws Exception {
        ApplicationId applicationId;
        ApplicationReport applicationReport;
        YarnClient createYarnClient = YarnClient.createYarnClient();
        createYarnClient.init(YARN_CONFIGURATION);
        createYarnClient.start();
        File file2 = file.toPath().resolve(UUID.randomUUID().toString()).toFile();
        file2.createNewFile();
        try {
            FileUtils.writeStringToFile(file2, "Goethe - Faust: Der Tragoedie erster Teil\nProlog im Himmel.\nDer Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\nErzengel treten vor.\nRAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\nUnd ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\ngibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\nhohen Werke Sind herrlich wie am ersten Tag.\nGABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\nPracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\nschaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\nFels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\nMICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\naufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\nDa flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\ndeine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\nZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden\nmag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.\nMEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie\nalles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So\nsiehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte\nmachen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte\ndich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von\nSonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die\nMenschen plagen. Der kleine Gott der Welt bleibt stets von gleichem\nSchlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser\nwuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;\nEr nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier\nzu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der\nlangbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im\nGras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In\njeden Quark begraebt er seine Nase.\nDER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer\nanzuklagen? Ist auf der Erde ewig dir nichts recht?\nMEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich\nschlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar\ndie armen selbst nicht plagen.\nDER HERR: Kennst du den Faust?\nMEPHISTOPHELES: Den Doktor?\nDER HERR: Meinen Knecht!\nMEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch\nist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er\nist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten\nSterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne\nBefriedigt nicht die tiefbewegte Brust.\nDER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in\ndie Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das\nBluet und Frucht die kuenft'gen Jahre zieren.\nMEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr\nmir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.\nDER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,\nEs irrt der Mensch so lang er strebt.\nMEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals\ngern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer\neinem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.\nDER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem\nUrquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit\nherab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in\nseinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.\nMEPHISTOPHELES: Schon gut! nur dauert es nicht lange. Mir ist fuer meine\nWette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir\nTriumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine\nMuhme, die beruehmte Schlange.\nDER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen\nnie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am\nwenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,\ner liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen\nzu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten\nGoettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das\newig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was\nin schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!\n(Der Himmel schliesst, die Erzengel verteilen sich.)\nMEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und\nhuete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,\nSo menschlich mit dem Teufel selbst zu sprechen.", Charset.defaultCharset());
            YarnTestBase.Runner startWithArgs = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), "-yjm", "768m", "-yD", YarnConfigOptions.APPLICATION_TAGS.key() + "=test-tag", "-ytm", "1024m", "-ys", "2", "-p", "2", "--detached", str, "--input", file2.getAbsoluteFile().toString(), "--output", file.getAbsoluteFile().toString()}, "Job has been submitted with JobID", YarnTestBase.RunTypes.CLI_FRONTEND);
            Assertions.assertThat(getRunningContainers()).isLessThanOrEqualTo(2);
            for (int i = 0; startWithArgs.isAlive() && i < 5; i++) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                }
            }
            Assertions.assertThat(startWithArgs.isAlive()).isFalse();
            LOG.info("CLI Frontend has returned, so the job is running");
            try {
                List<ApplicationReport> applicationReportWithRetryOnNPE = getApplicationReportWithRetryOnNPE(createYarnClient, EnumSet.of(YarnApplicationState.RUNNING));
                if (applicationReportWithRetryOnNPE.size() == 1) {
                    applicationId = applicationReportWithRetryOnNPE.get(0).getApplicationId();
                    LOG.info("waiting for the job with appId {} to finish", applicationId);
                    while (getApplicationReportWithRetryOnNPE(createYarnClient, EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
                        sleep(500);
                    }
                } else {
                    List<ApplicationReport> applicationReportWithRetryOnNPE2 = getApplicationReportWithRetryOnNPE(createYarnClient);
                    Collections.sort(applicationReportWithRetryOnNPE2, (applicationReport2, applicationReport3) -> {
                        return applicationReport2.getApplicationId().compareTo(applicationReport3.getApplicationId()) * (-1);
                    });
                    applicationId = applicationReportWithRetryOnNPE2.get(0).getApplicationId();
                    LOG.info("Selected {} as the last appId from {}", applicationId, Arrays.toString(applicationReportWithRetryOnNPE2.toArray()));
                }
                ApplicationId applicationId2 = applicationId;
                File[] listFiles = file.listFiles();
                Assertions.assertThat(listFiles).isNotNull();
                LOG.info("The job has finished. TaskManager output files found in {}", file);
                StringBuilder sb = new StringBuilder();
                for (File file3 : listFiles) {
                    if (file3.isFile()) {
                        sb.append(FileUtils.readFileToString(file3, Charset.defaultCharset())).append("\n");
                    }
                }
                File findFile = TestUtils.findFile("..", (file4, str2) -> {
                    return str2.contains("jobmanager.log") && file4.getAbsolutePath().contains(applicationId2.toString());
                });
                Assertions.assertThat(findFile).isNotNull();
                Assertions.assertThat(new StringBuilder(FileUtils.readFileToString(findFile, Charset.defaultCharset())).toString()).contains(new CharSequence[]{"Starting TaskManagers"}).contains(new CharSequence[]{" (2/2) (attempt #0) with attempt id "});
                LOG.info("Checking again that app has finished");
                do {
                    sleep(500);
                    applicationReport = createYarnClient.getApplicationReport(applicationId2);
                    LOG.info("Got report {}", applicationReport);
                } while (applicationReport.getYarnApplicationState() == YarnApplicationState.RUNNING);
                verifyApplicationTags(applicationReport);
                File file5 = new File(System.getenv("FLINK_CONF_DIR"));
                LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file5.getAbsolutePath());
                LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                try {
                    File yarnPropertiesLocation = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration(file5.getAbsolutePath()).getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                    if (yarnPropertiesLocation.exists()) {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation.getAbsolutePath());
                        yarnPropertiesLocation.delete();
                    }
                } catch (Exception e2) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e2);
                }
                try {
                    LOG.info("testDetachedPerJobYarnClusterInternal: Closing the yarn client");
                    createYarnClient.stop();
                } catch (Exception e3) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while close the yarn client", e3);
                }
            } catch (Throwable th) {
                File file6 = new File(System.getenv("FLINK_CONF_DIR"));
                LOG.info("testDetachedPerJobYarnClusterInternal: Using configuration directory " + file6.getAbsolutePath());
                LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
                try {
                    File yarnPropertiesLocation2 = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration(file6.getAbsolutePath()).getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                    if (yarnPropertiesLocation2.exists()) {
                        LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesLocation2.getAbsolutePath());
                        yarnPropertiesLocation2.delete();
                    }
                } catch (Exception e4) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e4);
                }
                try {
                    LOG.info("testDetachedPerJobYarnClusterInternal: Closing the yarn client");
                    createYarnClient.stop();
                } catch (Exception e5) {
                    LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while close the yarn client", e5);
                }
                throw th;
            }
        } catch (IOException e6) {
            throw new RuntimeException(e6);
        }
    }

    private void verifyApplicationTags(ApplicationReport applicationReport) throws InvocationTargetException, IllegalAccessException {
        try {
            Assertions.assertThat((Set) ApplicationReport.class.getMethod("getApplicationTags", new Class[0]).invoke(applicationReport, new Object[0])).containsOnly(new String[]{"test-tag"});
        } catch (NoSuchMethodException e) {
        }
    }

    @AfterEach
    void checkForProhibitedLogContents() {
        if (this.checkForProhibitedLogContents) {
            ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
        }
    }
}
