package org.apache.hadoop.yarn.service;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the YARN service application master, driven through mock AM/scheduler classes
 * against an in-process ZooKeeper {@link TestingCluster}.
 */
public class TestServiceAM extends ServiceTestUtils {
    /** Logger shared by every test in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(TestServiceAM.class);
    /** Scratch directory ("target/apps"); assigned in setup() and deleted in tearDown(). */
    private File basedir;
    /** In-process ZooKeeper test cluster; started in setup() and stopped in tearDown(). */
    TestingCluster zkCluster;
    /** Configuration passed to the services under test; setup() stores the ZK connect string in it. */
    YarnConfiguration conf = new YarnConfiguration();

    /** JUnit rule from ServiceTestUtils; handed to MockRunningServiceContext in createServiceContext(). */
    @Rule
    public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher();

    /* loaded from: input_file:org/apache/hadoop/yarn/service/TestServiceAM$MockServiceScheduler.class */
    class MockServiceScheduler extends ServiceScheduler {
        private AsyncDispatcher dispatcher;
        private ServiceScheduler.AMRMClientCallback callbackHandler;

        MockServiceScheduler(ServiceContext serviceContext) {
            super(serviceContext);
            this.callbackHandler = new ServiceScheduler.AMRMClientCallback(this);
        }

        protected AsyncDispatcher createAsyncDispatcher() {
            this.dispatcher = (AsyncDispatcher) Mockito.mock(AsyncDispatcher.class);
            ((AsyncDispatcher) Mockito.doReturn((EventHandler) Mockito.mock(EventHandler.class)).when(this.dispatcher)).getEventHandler();
            return this.dispatcher;
        }

        protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
            return AMRMClientAsync.createAMRMClientAsync(1000, this.callbackHandler);
        }
    }

    /**
     * Prepares an empty scratch directory and a single-node ZooKeeper test cluster, then
     * stores the cluster's connect string in {@code conf} under {@code hadoop.registry.zk.quorum}.
     *
     * @throws Exception if the directory cannot be prepared or the cluster fails to start
     */
    @Before
    public void setup() throws Exception {
        this.basedir = new File("target", "apps");
        // Start every test from the same state: wipe leftovers from an earlier run,
        // then (re)create the directory. forceMkdir fails loudly if creation is impossible.
        if (this.basedir.exists()) {
            FileUtils.deleteDirectory(this.basedir);
        }
        FileUtils.forceMkdir(this.basedir);

        this.zkCluster = new TestingCluster(1);
        this.zkCluster.start();
        this.conf.set("hadoop.registry.zk.quorum", this.zkCluster.getConnectString());
        LOG.info("ZK cluster: {}", this.zkCluster.getConnectString());
    }

    /**
     * Removes the scratch directory and stops the ZooKeeper test cluster.
     * The cluster is stopped in a {@code finally} block so that a failure while deleting
     * the directory cannot leave the cluster running.
     *
     * @throws IOException if deleting the directory or stopping the cluster fails
     */
    @After
    public void tearDown() throws IOException {
        try {
            if (this.basedir != null) {
                FileUtils.deleteDirectory(this.basedir);
            }
        } finally {
            if (this.zkCluster != null) {
                this.zkCluster.stop();
            }
        }
    }

    /**
     * Feeds container 1 to component "compa" and waits for the instance to reach STARTED,
     * then reports that container as failed, feeds container 2, waits for the instance
     * to reach INIT and expects exactly one pending instance on the component.
     */
    @Test
    public void testContainerCompleted() throws TimeoutException, InterruptedException {
        final String compName = "compa";

        Service exampleApp = new Service();
        exampleApp.setId(ApplicationId.newInstance(123456L, 1).toString());
        exampleApp.setVersion("v1");
        exampleApp.setName("testContainerCompleted");
        exampleApp.addComponent(createComponent(compName, 1L, "pwd"));

        MockServiceAM am = new MockServiceAM(exampleApp);
        am.init(this.conf);
        am.start();

        ComponentInstance firstInstance = am.getCompInstance(compName, "compa-0");

        // Allocate the first container and wait until the instance is started.
        am.feedContainerToComp(exampleApp, 1, compName);
        am.waitForCompInstanceState(firstInstance, ComponentInstanceState.STARTED);

        LOG.info("Fail the container 1");
        am.feedFailedContainerToComp(exampleApp, 1, compName);
        am.feedContainerToComp(exampleApp, 2, compName);
        am.waitForCompInstanceState(firstInstance, ComponentInstanceState.INIT);

        int pendingCount = am.getComponent(compName).getPendingInstances().size();
        Assert.assertEquals(1L, pendingCount);
        am.stop();
    }

    /**
     * Registers container 1 for "comp1-0" in the registry before the AM starts, then feeds
     * the same container as recovered. Expects the instance to reach STARTED, no pending
     * instances on the component, and a container status in state RUNNING.
     */
    @Test(timeout = 200000)
    public void testContainersFromPreviousAttemptsWithRMRestart() throws Exception {
        final String compName = "comp1";
        final String instanceName = "comp1-0";

        Service exampleApp = new Service();
        exampleApp.setId(ApplicationId.newInstance(System.currentTimeMillis(), 1).toString());
        exampleApp.setVersion("v1");
        exampleApp.setName("testContainersRecovers");
        exampleApp.addComponent(createComponent(compName, 1L, "sleep"));

        MockServiceAM am = new MockServiceAM(exampleApp);
        ContainerId recoveredId = am.createContainerId(1);
        am.feedRegistryComponent(recoveredId, compName, instanceName);
        am.init(this.conf);
        am.start();

        ComponentInstance instance = am.getCompInstance(compName, instanceName);
        am.feedRecoveredContainer(recoveredId, compName);
        am.waitForCompInstanceState(instance, ComponentInstanceState.STARTED);

        int pendingCount = am.getComponent(compName).getPendingInstances().size();
        Assert.assertEquals(0L, pendingCount);

        // Poll until the instance has a container status to inspect.
        GenericTestUtils.waitFor(
            () -> am.getCompInstance(compName, instanceName).getContainerStatus() != null,
            2000L, 200000L);
        ContainerStatus status = am.getCompInstance(compName, instanceName).getContainerStatus();
        Assert.assertEquals("container state", ContainerState.RUNNING, status.getState());
        am.stop();
    }

    /**
     * Registers container 1 for "comp1-0" in the registry and starts the AM with
     * {@code yarn.service.container-recovery.timeout.ms} set to 10. Expects the component to
     * reach FLEXING with one pending instance, and, after container 2 is fed, a container
     * status in state RUNNING.
     *
     * <p>The AM is stopped in a {@code finally} block so it is shut down even when an
     * assertion or wait fails.
     */
    @Test(timeout = 200000)
    public void testContainersReleasedWhenExpired() throws Exception {
        final String compName = "comp1";
        final String instanceName = "comp1-0";

        ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
        Service service = new Service();
        service.setId(applicationId.toString());
        service.setName("testContainersRecovers");
        service.setVersion("v1");
        service.addComponent(createComponent(compName, 1L, "sleep"));

        MockServiceAM mockServiceAM = new MockServiceAM(service);
        mockServiceAM.feedRegistryComponent(mockServiceAM.createContainerId(1), compName, instanceName);
        this.conf.setLong("yarn.service.container-recovery.timeout.ms", 10L);
        mockServiceAM.init(this.conf);
        mockServiceAM.start();
        try {
            // Give the 10 ms recovery timeout a chance to elapse before polling.
            Thread.sleep(100L);
            GenericTestUtils.waitFor(
                () -> mockServiceAM.getComponent(compName).getState().equals(ComponentState.FLEXING),
                100L, 2000L);
            Assert.assertEquals(1L, mockServiceAM.getComponent(compName).getPendingInstances().size());

            mockServiceAM.feedContainerToComp(service, 2, compName);
            GenericTestUtils.waitFor(
                () -> mockServiceAM.getCompInstance(compName, instanceName).getContainerStatus() != null,
                2000L, 200000L);
            Assert.assertEquals("container state", ContainerState.RUNNING,
                mockServiceAM.getCompInstance(compName, instanceName).getContainerStatus().getState());
        } finally {
            mockServiceAM.stop();
        }
    }

    /**
     * Registers container 1 for "comp1-0" in the registry, then changes the service id to a
     * different application id before starting the AM. Expects one pending instance on the
     * component, and, after container 1 is fed, a container status in state RUNNING.
     */
    @Test(timeout = 200000)
    public void testContainersFromDifferentApp() throws Exception {
        final String compName = "comp1";
        final String instanceName = "comp1-0";

        Service exampleApp = new Service();
        exampleApp.setId(ApplicationId.newInstance(System.currentTimeMillis(), 1).toString());
        exampleApp.setName("testContainersFromDifferentApp");
        exampleApp.setVersion("v1");
        exampleApp.addComponent(createComponent(compName, 1L, "sleep"));

        MockServiceAM am = new MockServiceAM(exampleApp);
        ContainerId registeredId = am.createContainerId(1);
        am.feedRegistryComponent(registeredId, compName, instanceName);

        // Switch the service to a second application id before the AM is initialised.
        ApplicationId otherAppId = ApplicationId.newInstance(System.currentTimeMillis(), 2);
        exampleApp.setId(otherAppId.toString());
        am.init(this.conf);
        am.start();

        int pendingCount = am.getComponent(compName).getPendingInstances().size();
        Assert.assertEquals(1L, pendingCount);

        am.feedContainerToComp(exampleApp, 1, compName);
        GenericTestUtils.waitFor(
            () -> am.getCompInstance(compName, instanceName).getContainerStatus() != null,
            2000L, 200000L);
        ContainerStatus status = am.getCompInstance(compName, instanceName).getContainerStatus();
        Assert.assertEquals("container state", ContainerState.RUNNING, status.getState());
        am.stop();
    }

    /**
     * Adds a countable resource type "resource-1" to the known resource types, requests
     * 3333 "Gi" of it for component "compa", and checks that the single outstanding container
     * request carries that value and unit.
     */
    @Test
    public void testScheduleWithMultipleResourceTypes() throws TimeoutException, InterruptedException, IOException {
        ApplicationId applicationId = ApplicationId.newInstance(123456L, 1);
        Service service = new Service();
        service.setId(applicationId.toString());
        service.setName("testScheduleWithMultipleResourceTypes");
        service.setVersion("v1");

        // Extend the currently registered resource types with the custom one.
        ArrayList<ResourceTypeInfo> resourceTypes = new ArrayList<>(ResourceUtils.getResourcesTypeInfo());
        resourceTypes.add(ResourceTypeInfo.newInstance("resource-1", "", ResourceTypes.COUNTABLE));
        ResourceUtils.reinitializeResources(resourceTypes);

        Component component = createComponent("compa", 1L, "pwd");
        component.getResource().setResourceInformations(
            ImmutableMap.of("resource-1", new ResourceInformation().value(3333L).unit("Gi")));
        service.addComponent(component);

        MockServiceAM mockServiceAM = new MockServiceAM(service);
        mockServiceAM.init(this.conf);
        mockServiceAM.start();

        Collection<AMRMClient.ContainerRequest> matchingRequests =
            mockServiceAM.context.scheduler.getAmRMClient().getMatchingRequests(0L);
        Assert.assertEquals(1L, matchingRequests.size());

        Resource capability = matchingRequests.iterator().next().getCapability();
        Assert.assertEquals(3333L, capability.getResourceValue("resource-1"));
        Assert.assertEquals("Gi", capability.getResourceInformation("resource-1").getUnits());
        mockServiceAM.stop();
    }

    /**
     * Delivers two COMPLETE container statuses to the scheduler callback. Only the second
     * container id has a live component instance registered, and the test verifies that the
     * mocked dispatcher's event handler is requested exactly once.
     *
     * <p>The default metrics system is shut down in a {@code finally} block so it is also
     * shut down when the verification fails.
     */
    @Test
    public void testContainerCompletedEventProcessed() throws Exception {
        MockServiceScheduler scheduler = new MockServiceScheduler(createServiceContext("abc"));
        try {
            scheduler.init(this.conf);
            ApplicationAttemptId attemptId =
                ApplicationAttemptId.newInstance(ApplicationId.newInstance(0L, 0), 1);

            // Container 0: no live instance is registered for it.
            ContainerStatus unregisteredStatus = ContainerStatus.newInstance(
                ContainerId.newContainerId(attemptId, 0L), ContainerState.COMPLETE, "successful", 0);

            // Container 1: backed by a mocked live component instance.
            ContainerId registeredId = ContainerId.newContainerId(attemptId, 1L);
            ContainerStatus registeredStatus =
                ContainerStatus.newInstance(registeredId, ContainerState.COMPLETE, "successful", 0);
            ComponentInstance componentInstance = Mockito.mock(ComponentInstance.class);
            Mockito.doReturn("componentInstance").when(componentInstance).getCompName();
            scheduler.addLiveCompInstance(registeredId, componentInstance);

            ArrayList<ContainerStatus> statuses = new ArrayList<>();
            statuses.add(unregisteredStatus);
            // NOTE(review): same (id, instance) pair is registered a second time; looks
            // redundant with the call above - confirm before removing.
            scheduler.addLiveCompInstance(registeredId, componentInstance);
            statuses.add(registeredStatus);

            scheduler.callbackHandler.onContainersCompleted(statuses);
            Mockito.verify(scheduler.dispatcher, Mockito.times(1)).getEventHandler();
        } finally {
            DefaultMetricsSystem.shutdown();
        }
    }

    /**
     * Builds a {@link MockRunningServiceContext} around the example application.
     * The application gets a fresh application id, the given name and state STARTED, and
     * every component is assigned the same TARBALL artifact with id "1". The scheduler's
     * dispatcher is set to drain events on stop and is started before the context is returned.
     *
     * @param str name to give the example service
     * @return the context, with its dispatcher already started
     * @throws Exception if building the example application or the context fails
     */
    private ServiceContext createServiceContext(String str) throws Exception {
        Artifact tarball = new Artifact();
        tarball.setId("1");
        tarball.setType(Artifact.TypeEnum.TARBALL);

        Service exampleApp = ServiceTestUtils.createExampleApplication();
        String appId = ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
        exampleApp.setId(appId);
        exampleApp.setName(str);
        exampleApp.setState(ServiceState.STARTED);
        for (Component component : exampleApp.getComponents()) {
            component.setArtifact(tarball);
        }

        ServiceContext context = new MockRunningServiceContext(this.rule, exampleApp);
        context.scheduler.getDispatcher().setDrainEventsOnStop();
        context.scheduler.getDispatcher().start();
        return context;
    }

    /**
     * Writes a Docker client config with two registry entries to
     * {@code target/docker-tmp/config.json}, builds a {@code MockServiceAM} from the credentials
     * read from that file, and checks that {@code recordTokensForContainers()} yields two
     * tokens, each of kind {@link DockerCredentialTokenIdentifier#KIND}.
     */
    @Test
    public void testRecordTokensForContainers() throws Exception {
        ApplicationId applicationId = ApplicationId.newInstance(123456L, 1);
        Service service = new Service();
        service.setId(applicationId.toString());
        service.setName("testContainerCompleted");
        service.addComponent(createComponent("compa", 1L, "pwd"));

        File dockerTmpDir = new File("target", "docker-tmp");
        FileUtils.deleteQuietly(dockerTmpDir);
        dockerTmpDir.mkdirs();
        String dockerConfig = dockerTmpDir + "/config.json";
        // try-with-resources closes (and flushes) the writer even if write() throws.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(dockerConfig))) {
            writer.write("{\"auths\": {\"https://index.docker.io/v1/\": {\"auth\": \"foobarbaz\"},\"registry.example.com\": {\"auth\": \"bazbarfoo\"}}}");
        }

        MockServiceAM mockServiceAM = new MockServiceAM(service,
            DockerClientConfigHandler.readCredentialsFromConfigFile(
                new Path(dockerConfig), this.conf, applicationId.toString()));
        Credentials containerCredentials = DockerClientConfigHandler.getCredentialsFromTokensByteBuffer(
            mockServiceAM.recordTokensForContainers());

        Assert.assertEquals(2L, containerCredentials.numberOfTokens());
        for (Token<?> token : containerCredentials.getAllTokens()) {
            Assert.assertEquals(DockerCredentialTokenIdentifier.KIND, token.getKind());
        }
        mockServiceAM.stop();
    }

    /**
     * Starts a DOCKER-artifact component, feeds it container 1 and expects the reported host
     * to be "localhost". The container status is then updated twice, and after each update the
     * test waits for the instance to report the new host ("new.host", then "newer.host").
     */
    @Test
    public void testIPChange() throws TimeoutException, InterruptedException {
        final String compName = "comp1";

        Service exampleApp = new Service();
        exampleApp.setId(ApplicationId.newInstance(123456L, 1).toString());
        exampleApp.setVersion("v1");
        exampleApp.setName("testIPChange");

        Component dockerComp = createComponent(compName, 1L, "sleep 60");
        dockerComp.setArtifact(new Artifact().type(Artifact.TypeEnum.DOCKER));
        exampleApp.addComponent(dockerComp);

        MockServiceAM am = new MockServiceAM(exampleApp);
        am.init(this.conf);
        am.start();

        ComponentInstance instance = am.getCompInstance(compName, "comp1-0");
        am.feedContainerToComp(exampleApp, 1, compName);
        GenericTestUtils.waitFor(() -> instance.getContainerStatus() != null, 2000L, 200000L);
        String initialHost = instance.getContainerStatus().getHost();
        Assert.assertEquals("localhost", initialHost);

        LOG.info("Change the IP and host");
        am.updateContainerStatus(exampleApp, 1, compName, "new.host");
        GenericTestUtils.waitFor(
            () -> instance.getContainerStatus().getHost().equals("new.host"),
            2000L, 200000L);

        LOG.info("Change the IP and host again");
        am.updateContainerStatus(exampleApp, 1, compName, "newer.host");
        GenericTestUtils.waitFor(
            () -> instance.getContainerStatus().getHost().equals("newer.host"),
            2000L, 200000L);
        am.stop();
    }

    /**
     * Feeds container 1 to a TARBALL-artifact component whose artifact has no id, waits for
     * that container to be released, then waits until instance "compa-0" is back in the
     * component's pending list and expects exactly one pending instance.
     */
    @Test(timeout = 30000)
    public void testContainersReleasedWhenPreLaunchFails() throws Exception {
        Service exampleApp = new Service();
        exampleApp.setId(ApplicationId.newInstance(System.currentTimeMillis(), 1).toString());
        exampleApp.setVersion("v1");
        exampleApp.setName("testContainersReleasedWhenPreLaunchFails");

        Component compa = createComponent("compa", 1L, "pwd");
        Artifact tarball = new Artifact();
        tarball.setType(Artifact.TypeEnum.TARBALL);
        compa.artifact(tarball);
        exampleApp.addComponent(compa);

        MockServiceAM am = new MockServiceAM(exampleApp);
        am.init(this.conf);
        am.start();

        ContainerId containerId = am.createContainerId(1);
        am.feedContainerToComp(exampleApp, containerId, "compa");
        am.waitForContainerToRelease(containerId);

        ComponentInstance instance = am.getCompInstance(compa.getName(), "compa-0");
        GenericTestUtils.waitFor(
            () -> am.getComponent(compa.getName()).getPendingInstances().contains(instance),
            2000L, 30000L);

        int pendingCount = am.getComponent("compa").getPendingInstances().size();
        Assert.assertEquals(1L, pendingCount);
        am.stop();
    }

    /**
     * Starts an AM for a service whose component sets
     * {@code YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE=true} and calls
     * {@code syncSysFs} on its scheduler. Any exception fails the test; the failure carries
     * the original exception as its cause so the stack trace appears in the test report.
     */
    @Test(timeout = 30000)
    public void testSyncSysFS() {
        ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
        Service service = new Service();
        service.setId(applicationId.toString());
        service.setVersion("v1");
        service.setName("tensorflow");

        Component component = createComponent("compa", 1L, "pwd");
        component.getConfiguration().getEnv().put("YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE", "true");
        Artifact artifact = new Artifact();
        artifact.setType(Artifact.TypeEnum.TARBALL);
        component.artifact(artifact);
        service.addComponent(component);

        try {
            MockServiceAM mockServiceAM = new MockServiceAM(service);
            mockServiceAM.init(this.conf);
            mockServiceAM.start();
            ServiceScheduler serviceScheduler = mockServiceAM.context.scheduler;
            serviceScheduler.syncSysFs(service);
            serviceScheduler.close();
            mockServiceAM.stop();
            mockServiceAM.close();
        } catch (Exception e) {
            LOG.error("Fail to sync sysfs.", e);
            // Same failure type and message as Assert.fail, but with the cause attached.
            throw new AssertionError("Fail to sync sysfs.", e);
        }
    }

    /**
     * Adds a countable resource type "test-resource" to the known resource types, requests
     * 1234 "Gi" of it with two attributes for component "compa", and checks that the single
     * outstanding container request carries that value, unit and attribute count.
     */
    @Test
    public void testScheduleWithResourceAttributes() throws Exception {
        ApplicationId applicationId = ApplicationId.newInstance(123456L, 1);
        Service service = new Service();
        service.setId(applicationId.toString());
        service.setName("testScheduleWithResourceAttributes");
        service.setVersion("v1");

        // Extend the currently registered resource types with the custom one.
        ArrayList<ResourceTypeInfo> resourceTypes = new ArrayList<>(ResourceUtils.getResourcesTypeInfo());
        resourceTypes.add(ResourceTypeInfo.newInstance("test-resource", "", ResourceTypes.COUNTABLE));
        ResourceUtils.reinitializeResources(resourceTypes);

        Component component = createComponent("compa", 1L, "pwd");
        component.getResource().setResourceInformations(
            ImmutableMap.of("test-resource",
                new ResourceInformation().value(1234L).unit("Gi")
                    .attributes(ImmutableMap.of("k1", "v1", "k2", "v2"))));
        service.addComponent(component);

        MockServiceAM mockServiceAM = new MockServiceAM(service);
        mockServiceAM.init(this.conf);
        mockServiceAM.start();

        Collection<AMRMClient.ContainerRequest> matchingRequests =
            mockServiceAM.context.scheduler.getAmRMClient().getMatchingRequests(0L);
        Assert.assertEquals(1L, matchingRequests.size());

        Resource capability = matchingRequests.iterator().next().getCapability();
        Assert.assertEquals(1234L, capability.getResourceValue("test-resource"));
        Assert.assertEquals("Gi", capability.getResourceInformation("test-resource").getUnits());
        Assert.assertEquals(2L, capability.getResourceInformation("test-resource").getAttributes().size());
        mockServiceAM.stop();
    }
}
