package org.apache.flink.yarn;

import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.yarn.util.TestUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/flink/yarn/YarnConfigurationITCase.class */
class YarnConfigurationITCase extends YarnTestBase {
    private static final Time TIMEOUT = Time.seconds(10);

    YarnConfigurationITCase() {
    }

    @Timeout(60)
    @Test
    void testFlinkContainerMemory() throws Exception {
        runTest(() -> {
            YarnClient yarnClient = getYarnClient();
            Configuration configuration = new Configuration(this.flinkConfiguration);
            configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 3);
            configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768L));
            TaskExecutorProcessSpec processSpecFromConfig = TaskExecutorProcessUtils.processSpecFromConfig(configuration);
            int mebiBytes = processSpecFromConfig.getTotalProcessMemorySize().getMebiBytes();
            YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(configuration, getYarnConfiguration(), yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
            yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
            yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
            try {
                ClusterClient clusterClient = yarnClusterDescriptor.deployJobCluster(new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(768).setTaskManagerMemoryMB(mebiBytes).setSlotsPerTaskManager(3).createClusterSpecification(), PackagedProgramUtils.createJobGraph(PackagedProgram.newBuilder().setJarFile(TestUtils.getTestJarPath("WindowJoin.jar")).build(), configuration, 1, false), true).getClusterClient();
                ApplicationId applicationId = (ApplicationId) clusterClient.getClusterId();
                RestClient restClient = new RestClient(configuration, Executors.directExecutor());
                try {
                    ApplicationAttemptId currentApplicationAttemptId = yarnClient.getApplicationReport(applicationId).getCurrentApplicationAttemptId();
                    List<ContainerReport> containers = yarnClient.getContainers(currentApplicationAttemptId);
                    while (containers.size() < 2) {
                        Thread.sleep(50L);
                        containers = yarnClient.getContainers(currentApplicationAttemptId);
                    }
                    for (ContainerReport containerReport : containers) {
                        if (containerReport.getContainerId().getId() == 1) {
                            Assertions.assertThat(containerReport.getAllocatedResource().getMemorySize()).isEqualTo(768L);
                        } else {
                            Assertions.assertThat(containerReport.getAllocatedResource().getMemorySize()).isEqualTo(mebiBytes);
                        }
                    }
                    URI uri = new URI(clusterClient.getWebInterfaceURL());
                    while (true) {
                        Collection<TaskManagerInfo> taskManagerInfos = ((TaskManagersInfo) restClient.sendRequest(uri.getHost(), uri.getPort(), TaskManagersHeaders.getInstance(), EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()).get()).getTaskManagerInfos();
                        if (hasTaskManagerConnectedAndReportedSlots(taskManagerInfos)) {
                            TaskManagerInfo next = taskManagerInfos.iterator().next();
                            Assertions.assertThat(next.getNumberSlots()).isEqualTo(3);
                            Assertions.assertThat(next.getHardwareDescription().getSizeOfJvmHeap() / processSpecFromConfig.getJvmHeapMemorySize().getBytes()).isCloseTo(1.0d, Offset.offset(Double.valueOf(0.15d)));
                            Assertions.assertThat((int) (next.getHardwareDescription().getSizeOfManagedMemory() >> 20)).isEqualTo(processSpecFromConfig.getManagedMemorySize().getMebiBytes());
                            restClient.shutdown(TIMEOUT);
                            clusterClient.close();
                            yarnClusterDescriptor.killCluster(applicationId);
                            yarnClusterDescriptor.close();
                            return;
                        }
                        Thread.sleep(100L);
                    }
                } catch (Throwable th) {
                    restClient.shutdown(TIMEOUT);
                    clusterClient.close();
                    throw th;
                }
            } catch (Throwable th2) {
                yarnClusterDescriptor.close();
                throw th2;
            }
        });
    }

    private boolean hasTaskManagerConnectedAndReportedSlots(Collection<TaskManagerInfo> collection) {
        return !collection.isEmpty() && collection.iterator().next().getNumberSlots() > 0;
    }
}
