package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.class */
public class TestContainerResizing {
    // Logger for this test class; also used by the nested MyScheduler.
    private static final Log LOG = LogFactory.getLog(TestContainerResizing.class);
    // Instance-level alias for TestCapacitySchedulerAutoCreatedQueueBase.GB.
    private final int GB = TestCapacitySchedulerAutoCreatedQueueBase.GB;
    // Configuration rebuilt by setUp() before every test.
    private YarnConfiguration conf;
    // Node-labels manager created by setUp(); the MockRM subclasses built in the tests
    // return it from createNodeLabelManager().
    RMNodeLabelsManager mgr;

    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing$MyScheduler.class */
    /**
     * CapacityScheduler subclass that pauses for one second at the start of every
     * {@code allocateContainersToNode} call and then delegates to the parent implementation.
     */
    class MyScheduler extends CapacityScheduler {
        MyScheduler() {
        }

        /**
         * Sleeps for 1000 ms and then forwards both arguments unchanged to
         * {@code super.allocateContainersToNode}.
         *
         * <p>If the sleep is interrupted, the interruption is logged at debug level and the
         * thread's interrupt status is restored before delegating, so code further up the
         * stack can still observe that an interrupt was requested.
         *
         * @param candidateNodeSet candidate nodes, passed straight through to the parent
         * @param z flag passed straight through to the parent
         * @return whatever the parent implementation returns
         */
        public CSAssignment allocateContainersToNode(CandidateNodeSet<FiCaSchedulerNode> candidateNodeSet, boolean z) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                TestContainerResizing.LOG.debug("Thread interrupted.");
                // Catching InterruptedException clears the flag; set it again rather than
                // silently discarding the interrupt request.
                Thread.currentThread().interrupt();
            }
            return super.allocateContainersToNode(candidateNodeSet, z);
        }
    }

    /**
     * Creates a fresh configuration that selects CapacityScheduler as the scheduler class,
     * then creates a NullRMNodeLabelsManager and initialises it with that configuration.
     *
     * <p>NOTE(review): the tests visible in this part of the file build MockRM with its
     * no-argument constructor; confirm that this configuration actually reaches the RM.
     */
    @Before
    public void setUp() throws Exception {
        YarnConfiguration configuration = new YarnConfiguration();
        this.conf = configuration;
        configuration.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);

        RMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
        this.mgr = labelsManager;
        labelsManager.init(configuration);
    }

    /**
     * Requests an increase of container 1 to 3072 MB on a single 20480 MB node.
     *
     * <p>Checks that 2048 MB is pending on queue "default" and on the app attempt until a
     * node update is handled, that nothing is pending afterwards, that the next allocate
     * response passes {@code verifyContainerIncreased} for 3072 MB, and that 17408 MB remain
     * available on the scheduler node.
     */
    @Test
    public void testSimpleIncreaseContainer() throws Exception {
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 20480);
            RMApp app = rm.submitApp(TestCapacitySchedulerAutoCreatedQueueBase.GB, "app", "user",
                    (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);

            ContainerId containerId1 = ContainerId.newContainerId(am.getApplicationAttemptId(), 1L);
            sentRMContainerLaunched(rm, containerId1);

            UpdateContainerRequest increaseRequest = UpdateContainerRequest.newInstance(0, containerId1,
                    ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(3072), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(increaseRequest));

            // Before any node update the 2048 MB delta is only pending.
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());
            checkPendingResource(rm, "default", 2048, null);
            Assert.assertEquals(2048L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            rm.getResourceScheduler().handle(new NodeUpdateSchedulerEvent(rmNode1));

            checkPendingResource(rm, "default", 0, null);
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            verifyContainerIncreased(am.allocate(null, null), containerId1, 3072);
            verifyAvailableResourceOfSchedulerNode(rm, nm1.getNodeId(), 17408);
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Submits a 3072 MB app on a 20480 MB node and decreases container 1 to
     * {@code TestCapacitySchedulerAutoCreatedQueueBase.GB}.
     *
     * <p>Checks the response with {@code verifyContainerDecreased}, waits for the drain
     * dispatcher, then checks the used resource of queue "default" and of the app attempt,
     * and finally that the node's to-be-updated containers include container 1 with the
     * decreased resource.
     */
    @Test
    public void testSimpleDecreaseContainer() throws Exception {
        final DrainDispatcher dispatcher = new DrainDispatcher();
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }

            @Override
            protected Dispatcher createDispatcher() {
                return dispatcher;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 20480);
            RMApp app = rm.submitApp(3072, "app", "user", (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());

            checkUsedResource(rm, "default", 3072, null);
            Assert.assertEquals(3072L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());

            ContainerId containerId1 = ContainerId.newContainerId(am.getApplicationAttemptId(), 1L);
            sentRMContainerLaunched(rm, containerId1);

            UpdateContainerRequest decreaseRequest = UpdateContainerRequest.newInstance(0, containerId1,
                    ContainerUpdateType.DECREASE_RESOURCE,
                    Resources.createResource(TestCapacitySchedulerAutoCreatedQueueBase.GB), (ExecutionType) null);
            verifyContainerDecreased(am.sendContainerResizingRequest(Arrays.asList(decreaseRequest)),
                    containerId1, TestCapacitySchedulerAutoCreatedQueueBase.GB);

            // Let queued events finish before inspecting scheduler state.
            dispatcher.waitForEventThreadToWait();

            checkUsedResource(rm, "default", TestCapacitySchedulerAutoCreatedQueueBase.GB, null);
            Assert.assertEquals(1024L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());

            // The node must have container 1 queued for update with the decreased size.
            RMNodeImpl rmNode1 = (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            boolean decreaseQueuedOnNode = false;
            for (Container pendingUpdate : rmNode1.getToBeUpdatedContainers()) {
                boolean sameContainer = pendingUpdate.getId().equals(containerId1);
                if (sameContainer && pendingUpdate.getResource().equals(
                        Resources.createResource(TestCapacitySchedulerAutoCreatedQueueBase.GB))) {
                    decreaseQueuedOnNode = true;
                }
            }
            Assert.assertTrue(decreaseQueuedOnNode);
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Exercises reservation of an increase request that does not fit yet.
     *
     * <p>With containers 1 and 2 on an 8192 MB node, container 1 asks to grow to 7168 MB.
     * After a node update the test expects a reservation (6144 MB reserved, 6144 MB still
     * pending). Container 2 is then passed in the second {@code allocate} argument
     * (presumably the release list), another node update is handled, and the test expects
     * the increase to be granted, the reservation to be gone, and
     * {@code TestCapacitySchedulerAutoCreatedQueueBase.GB} to remain available on the node.
     */
    @Test
    public void testSimpleIncreaseRequestReservation() throws Exception {
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 8192);
            rm.registerNode("h2:1234", 8192);
            RMApp app = rm.submitApp(TestCapacitySchedulerAutoCreatedQueueBase.GB, "app", "user",
                    (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());

            // Ask for one 2048 MB container and wait for it to be allocated as container 2.
            ResourceRequest containerRequest = ResourceRequest.newInstance(Priority.newInstance(1), "*",
                    Resources.createResource(2048), 1);
            am.allocate(Arrays.asList(containerRequest), null);
            ContainerId containerId2 = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L);
            Assert.assertTrue(rm.waitForState(nm1, containerId2, RMContainerState.ALLOCATED));
            am.allocate(null, null);
            sentRMContainerLaunched(rm, containerId2);

            ContainerId containerId1 = ContainerId.newContainerId(am.getApplicationAttemptId(), 1L);
            sentRMContainerLaunched(rm, containerId1);

            UpdateContainerRequest increaseRequest = UpdateContainerRequest.newInstance(0, containerId1,
                    ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(7168), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(increaseRequest));
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            // Explicit cast: the local is typed as CapacityScheduler for getNode/getQueue below.
            CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

            // The increase cannot be satisfied yet, so it is expected to be reserved.
            Assert.assertFalse(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNotNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", 9216, null);
            Assert.assertEquals(9216L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(3072L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());

            // Hand container 2 back and let the scheduler process the node again.
            am.allocate(null, Arrays.asList(containerId2));
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
            verifyContainerIncreased(am.allocate(null, null), containerId1, 7168);

            Assert.assertTrue(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 0, null);
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", 7168, null);
            Assert.assertEquals(7168L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());
            Assert.assertEquals(7168L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
            verifyAvailableResourceOfSchedulerNode(rm, nm1.getNodeId(), TestCapacitySchedulerAutoCreatedQueueBase.GB);
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Requests an increase of container 2 to 8192 MB on a single 8192 MB node that also
     * hosts container 1.
     *
     * <p>After a node update the test expects that nothing was reserved, that 6144 MB is
     * still pending, and that used resources stay at 3072 MB for the queue, the user and
     * the app attempt.
     */
    @Test
    public void testIncreaseRequestWithNoHeadroomLeft() throws Exception {
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 8192);
            RMApp app = rm.submitApp(TestCapacitySchedulerAutoCreatedQueueBase.GB, "app", "user",
                    (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());

            // Ask for one 2048 MB container and wait for it to be allocated as container 2.
            ResourceRequest containerRequest = ResourceRequest.newInstance(Priority.newInstance(1), "*",
                    Resources.createResource(2048), 1);
            am.allocate(Arrays.asList(containerRequest), null);
            ContainerId containerId2 = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L);
            Assert.assertTrue(rm.waitForState(nm1, containerId2, RMContainerState.ALLOCATED));
            am.allocate(null, null);
            sentRMContainerLaunched(rm, containerId2);

            UpdateContainerRequest increaseRequest = UpdateContainerRequest.newInstance(0, containerId2,
                    ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8192), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(increaseRequest));
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            // Explicit cast: the local is typed as CapacityScheduler for getNode/getQueue below.
            CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

            // Nothing may be reserved and the request must remain pending.
            Assert.assertTrue(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", 3072, null);
            Assert.assertEquals(3072L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(3072L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Checks that a reserved increase is dropped when the increase is withdrawn.
     *
     * <p>Container 1 first asks to grow to 7168 MB and the test expects a reservation after
     * a node update. Container 2 is then passed in the second {@code allocate} argument
     * (presumably the release list) and a new INCREASE_RESOURCE request targets
     * {@code TestCapacitySchedulerAutoCreatedQueueBase.GB} for container 1. After another
     * node update the test expects no reservation, nothing pending, and 1024 MB used.
     */
    @Test
    public void testExcessiveReservationWhenCancelIncreaseRequest() throws Exception {
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 8192);
            rm.registerNode("h2:1234", 8192);
            RMApp app = rm.submitApp(TestCapacitySchedulerAutoCreatedQueueBase.GB, "app", "user",
                    (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());

            // Ask for one 2048 MB container and wait for it to be allocated as container 2.
            ResourceRequest containerRequest = ResourceRequest.newInstance(Priority.newInstance(1), "*",
                    Resources.createResource(2048), 1);
            am.allocate(Arrays.asList(containerRequest), null);
            ContainerId containerId2 = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L);
            Assert.assertTrue(rm.waitForState(nm1, containerId2, RMContainerState.ALLOCATED));
            am.allocate(null, null);
            sentRMContainerLaunched(rm, containerId2);

            ContainerId containerId1 = ContainerId.newContainerId(am.getApplicationAttemptId(), 1L);
            sentRMContainerLaunched(rm, containerId1);

            UpdateContainerRequest increaseRequest = UpdateContainerRequest.newInstance(0, containerId1,
                    ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(7168), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(increaseRequest));
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            // Explicit cast: the local is typed as CapacityScheduler for getNode/getQueue below.
            CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

            // The increase cannot be satisfied yet, so it is expected to be reserved.
            Assert.assertFalse(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNotNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", 9216, null);
            Assert.assertEquals(9216L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(3072L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());

            // Hand container 2 back, then replace the pending increase with a request for GB.
            am.allocate(null, Arrays.asList(containerId2));
            UpdateContainerRequest cancelRequest = UpdateContainerRequest.newInstance(0, containerId1,
                    ContainerUpdateType.INCREASE_RESOURCE,
                    Resources.createResource(TestCapacitySchedulerAutoCreatedQueueBase.GB), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(cancelRequest));
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

            Assert.assertTrue(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 0, null);
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", TestCapacitySchedulerAutoCreatedQueueBase.GB, null);
            Assert.assertEquals(1024L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());
            Assert.assertEquals(1024L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Checks that a reserved increase is dropped when the same container is decreased.
     *
     * <p>Container 1 (from a 2048 MB app submission) asks to grow to 8192 MB and the test
     * expects a reservation after a node update. Container 2 is then passed in the second
     * {@code allocate} argument (presumably the release list) and a DECREASE_RESOURCE
     * request targets {@code TestCapacitySchedulerAutoCreatedQueueBase.GB} for container 1.
     * After a node update and a dispatcher drain the test expects no reservation, nothing
     * pending, and 1024 MB used.
     */
    @Test
    public void testExcessiveReservationWhenDecreaseSameContainer() throws Exception {
        final DrainDispatcher dispatcher = new DrainDispatcher();
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }

            @Override
            protected Dispatcher createDispatcher() {
                return dispatcher;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 8192);
            rm.registerNode("h2:1234", 8192);
            RMApp app = rm.submitApp(2048, "app", "user", (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());

            // Ask for one 2048 MB container and wait for it to be allocated as container 2.
            ResourceRequest containerRequest = ResourceRequest.newInstance(Priority.newInstance(1), "*",
                    Resources.createResource(2048), 1);
            am.allocate(Arrays.asList(containerRequest), null);
            ContainerId containerId2 = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L);
            Assert.assertTrue(rm.waitForState(nm1, containerId2, RMContainerState.ALLOCATED));
            am.allocate(null, null);
            sentRMContainerLaunched(rm, containerId2);

            ContainerId containerId1 = ContainerId.newContainerId(am.getApplicationAttemptId(), 1L);
            sentRMContainerLaunched(rm, containerId1);

            UpdateContainerRequest increaseRequest = UpdateContainerRequest.newInstance(0, containerId1,
                    ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8192), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(increaseRequest));
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            // Explicit cast: the local is typed as CapacityScheduler for getNode/getQueue below.
            CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

            // The increase cannot be satisfied yet, so it is expected to be reserved.
            Assert.assertFalse(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNotNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", 10240, null);
            Assert.assertEquals(10240L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(4096L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());

            // Hand container 2 back, then decrease the container that holds the reservation.
            am.allocate(null, Arrays.asList(containerId2));
            UpdateContainerRequest decreaseRequest = UpdateContainerRequest.newInstance(0, containerId1,
                    ContainerUpdateType.DECREASE_RESOURCE,
                    Resources.createResource(TestCapacitySchedulerAutoCreatedQueueBase.GB), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(decreaseRequest));
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
            dispatcher.waitForEventThreadToWait();

            Assert.assertTrue(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 0, null);
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", TestCapacitySchedulerAutoCreatedQueueBase.GB, null);
            Assert.assertEquals(1024L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());
            Assert.assertEquals(1024L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Checks that a reserved increase disappears once its container is handed back.
     *
     * <p>Container 2 asks to grow to 8192 MB and the test expects a reservation after a node
     * update. The same container is then passed in the second {@code allocate} argument
     * (presumably the release list). After another node update, one more allocate call and
     * a dispatcher drain, the test expects no reservation, nothing pending, and 1024 MB used.
     */
    @Test
    public void testIncreaseContainerUnreservedWhenContainerCompleted() throws Exception {
        final DrainDispatcher dispatcher = new DrainDispatcher();
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }

            @Override
            protected Dispatcher createDispatcher() {
                return dispatcher;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 8192);
            MockNM nm2 = rm.registerNode("h2:1234", 8192);
            RMApp app = rm.submitApp(TestCapacitySchedulerAutoCreatedQueueBase.GB, "app", "user",
                    (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());

            // Ask for one 2048 MB container and wait for it to be allocated as container 2.
            ResourceRequest containerRequest = ResourceRequest.newInstance(Priority.newInstance(1), "*",
                    Resources.createResource(2048), 1);
            am.allocate(Arrays.asList(containerRequest), null);
            ContainerId containerId2 = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L);
            Assert.assertTrue(rm.waitForState(nm1, containerId2, RMContainerState.ALLOCATED));
            am.allocate(null, null);
            sentRMContainerLaunched(rm, containerId2);
            rm.waitForState(Arrays.asList(nm1, nm2), containerId2, RMContainerState.RUNNING);

            UpdateContainerRequest increaseRequest = UpdateContainerRequest.newInstance(0, containerId2,
                    ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8192), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(increaseRequest));
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            // Explicit cast: the local is typed as CapacityScheduler for getNode/getQueue below.
            CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

            // The increase cannot be satisfied yet, so it is expected to be reserved.
            Assert.assertFalse(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNotNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", 9216, null);
            Assert.assertEquals(9216L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(3072L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());

            // Hand back the very container that holds the reservation.
            am.allocate(null, Arrays.asList(containerId2));
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
            am.allocate(null, null);
            dispatcher.waitForEventThreadToWait();

            Assert.assertTrue(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 0, null);
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", TestCapacitySchedulerAutoCreatedQueueBase.GB, null);
            Assert.assertEquals(1024L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());
            Assert.assertEquals(1024L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Checks that a reserved increase disappears when the application attempt is removed.
     *
     * <p>Container 2 asks to grow to 8192 MB and the test expects a reservation after a node
     * update. The scheduler then handles an AppAttemptRemovedSchedulerEvent with state
     * KILLED. The test expects no reservation, nothing pending or used on queue "default",
     * no "user" entry on that queue, and zero reserved and used memory on the app attempt.
     */
    @Test
    public void testIncreaseContainerUnreservedWhenApplicationCompleted() throws Exception {
        MockRM rm = new MockRM() {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("h1:1234", 8192);
            rm.registerNode("h2:1234", 8192);
            RMApp app = rm.submitApp(TestCapacitySchedulerAutoCreatedQueueBase.GB, "app", "user",
                    (Map<ApplicationAccessType, String>) null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = TestUtils.getFiCaSchedulerApp(rm, app.getApplicationId());

            // Ask for one 2048 MB container and wait for it to be allocated as container 2.
            ResourceRequest containerRequest = ResourceRequest.newInstance(Priority.newInstance(1), "*",
                    Resources.createResource(2048), 1);
            am.allocate(Arrays.asList(containerRequest), null);
            ContainerId containerId2 = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L);
            Assert.assertTrue(rm.waitForState(nm1, containerId2, RMContainerState.ALLOCATED));
            am.allocate(null, null);
            sentRMContainerLaunched(rm, containerId2);

            UpdateContainerRequest increaseRequest = UpdateContainerRequest.newInstance(0, containerId2,
                    ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(8192), (ExecutionType) null);
            am.sendContainerResizingRequest(Arrays.asList(increaseRequest));
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            // Explicit cast: the local is typed as CapacityScheduler for getNode/getQueue below.
            CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
            RMNode rmNode1 = (RMNode) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

            // The increase cannot be satisfied yet, so it is expected to be reserved.
            Assert.assertFalse(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNotNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());
            checkPendingResource(rm, "default", 6144, null);
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(rm, "default", 9216, null);
            Assert.assertEquals(9216L, scheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(3072L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
            Assert.assertEquals(6144L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());

            // Remove the whole attempt; the reservation must go away immediately.
            scheduler.handle(new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(),
                    RMAppAttemptState.KILLED, false));
            Assert.assertTrue(schedulerApp.getReservedContainers().isEmpty());
            Assert.assertNull(scheduler.getNode(nm1.getNodeId()).getReservedContainer());

            scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
            checkPendingResource(rm, "default", 0, null);
            checkUsedResource(rm, "default", 0, null);
            Assert.assertNull(scheduler.getQueue("default").getUser("user"));
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());
            Assert.assertEquals(0L, schedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
        } finally {
            // Shut the RM down even when an assertion above fails, so a failing test does
            // not leave a started RM behind.
            rm.close();
        }
    }

    /**
     * Requests a batch of containers through the AM, waits until the last one of the batch is
     * ALLOCATED, acquires them, and then drives every container of the batch to RUNNING.
     *
     * @param mockAM application master used to send the allocate calls
     * @param mockNM node manager handed to {@code MockRM.waitForState}
     * @param mockRM resource manager under test
     * @param i number of containers to request
     * @param i2 memory of each container, as passed to {@code Resources.createResource}
     * @param i3 priority of the resource request
     * @param i4 numeric id of the first container expected from this batch
     * @throws Exception if any of the mock RM/AM calls fails
     */
    private void allocateAndLaunchContainers(MockAM mockAM, MockNM mockNM, MockRM mockRM, int i, int i2, int i3, int i4) throws Exception {
        final int endIdExclusive = i4 + i;

        // Ask for the whole batch in a single request against any host ("*").
        ResourceRequest batchRequest = ResourceRequest.newInstance(Priority.newInstance(i3), "*", Resources.createResource(i2), i);
        mockAM.allocate(Arrays.asList(batchRequest), null);

        // Only the last container id of the batch is waited on before acquiring.
        ContainerId lastOfBatch = ContainerId.newContainerId(mockAM.getApplicationAttemptId(), endIdExclusive - 1);
        Assert.assertTrue(mockRM.waitForState(mockNM, lastOfBatch, RMContainerState.ALLOCATED));

        // Empty allocate call so the AM picks up the allocated containers.
        mockAM.allocate(null, null);

        int sequence = i4;
        while (sequence < endIdExclusive) {
            ContainerId current = ContainerId.newContainerId(mockAM.getApplicationAttemptId(), sequence);
            sentRMContainerLaunched(mockRM, current);
            mockRM.waitForState(mockNM, current, RMContainerState.RUNNING);
            sequence++;
        }
    }

    /**
     * Checks which increase requests are granted first when the node cannot satisfy all of them.
     *
     * <p>Six running containers (ids 2-7) are created in three batches with priorities 3, 2 and 4,
     * then every one of them asks to grow to 2048. The test expects exactly three of the increases
     * to be granted after a single node update: containers 4 and 5 (priority 2) and container 2
     * (priority 3).
     *
     * @throws Exception if the mock cluster cannot be driven through the scenario
     */
    @Test
    public void testOrderOfIncreaseContainerRequestAllocation() throws Exception {
        MockRM mockRM = new MockRM() {
            @Override // org.apache.hadoop.yarn.server.resourcemanager.MockRM
            public RMNodeLabelsManager createNodeLabelManager() {
                return TestContainerResizing.this.mgr;
            }
        };
        mockRM.start();
        // Close the RM even when an assertion fails, so a failing run does not leak a started
        // resource manager into the tests that follow.
        try {
            MockNM registerNode = mockRM.registerNode("h1:1234", 10240);
            RMApp submitApp = mockRM.submitApp(TestCapacitySchedulerAutoCreatedQueueBase.GB, "app", "user", (Map<ApplicationAccessType, String>) null, "default");
            MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(submitApp, mockRM, registerNode);
            FiCaSchedulerApp fiCaSchedulerApp = TestUtils.getFiCaSchedulerApp(mockRM, submitApp.getApplicationId());
            ApplicationAttemptId applicationAttemptId = launchAndRegisterAM.getApplicationAttemptId();

            // Containers 2-3 at priority 3, 4-5 at priority 2, 6-7 at priority 4.
            allocateAndLaunchContainers(launchAndRegisterAM, registerNode, mockRM, 2, TestCapacitySchedulerAutoCreatedQueueBase.GB, 3, 2);
            allocateAndLaunchContainers(launchAndRegisterAM, registerNode, mockRM, 2, TestCapacitySchedulerAutoCreatedQueueBase.GB, 2, 4);
            allocateAndLaunchContainers(launchAndRegisterAM, registerNode, mockRM, 2, TestCapacitySchedulerAutoCreatedQueueBase.GB, 4, 6);

            // Ask to grow every one of the six containers to 2048.
            ArrayList<UpdateContainerRequest> increaseRequests = new ArrayList<>();
            for (int i = 2; i <= 7; i++) {
                increaseRequests.add(UpdateContainerRequest.newInstance(0, ContainerId.newContainerId(launchAndRegisterAM.getApplicationAttemptId(), i), ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(2048), (ExecutionType) null));
            }
            launchAndRegisterAM.sendContainerResizingRequest(increaseRequests);

            // All six increases are pending before the scheduler runs.
            checkPendingResource(mockRM, "default", 6144, null);
            Assert.assertEquals(6144L, fiCaSchedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());

            // A single node update is expected to grant three of the six increases.
            CapacityScheduler resourceScheduler = mockRM.getResourceScheduler();
            resourceScheduler.handle(new NodeUpdateSchedulerEvent((RMNode) mockRM.getRMContext().getRMNodes().get(registerNode.getNodeId())));
            AllocateResponse allocate = launchAndRegisterAM.allocate(null, null);
            Assert.assertEquals(3L, allocate.getUpdatedContainers().size());
            verifyContainerIncreased(allocate, ContainerId.newContainerId(applicationAttemptId, 4L), 2048);
            verifyContainerIncreased(allocate, ContainerId.newContainerId(applicationAttemptId, 5L), 2048);
            verifyContainerIncreased(allocate, ContainerId.newContainerId(applicationAttemptId, 2L), 2048);

            // Three increases remain pending; the node's 10240 is now fully used, nothing reserved.
            checkPendingResource(mockRM, "default", 3072, null);
            Assert.assertEquals(3072L, fiCaSchedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
            checkUsedResource(mockRM, "default", 10240, null);
            Assert.assertEquals(10240L, resourceScheduler.getQueue("default").getUser("user").getUsed().getMemorySize());
            Assert.assertEquals(0L, fiCaSchedulerApp.getAppAttemptResourceUsage().getReserved().getMemorySize());
            Assert.assertEquals(10240L, fiCaSchedulerApp.getAppAttemptResourceUsage().getUsed().getMemorySize());
        } finally {
            mockRM.close();
        }
    }

    /**
     * Sends a new-container request and a container decrease back to back and verifies that the
     * decrease is reported in the response. The 60 second timeout turns a hang into a failure.
     *
     * <p>The RM is built with {@code MyScheduler}, which is declared elsewhere in this class;
     * NOTE(review): presumably it is what provokes the allocation/decrease interleaving this
     * test guards against - confirm against its definition.
     *
     * @throws Exception if the mock cluster cannot be driven through the scenario
     */
    @Test(timeout = 60000)
    public void testDecreaseContainerWillNotDeadlockContainerAllocation() throws Exception {
        MockRM mockRM = new MockRM() {
            public ResourceScheduler createScheduler() {
                MyScheduler myScheduler = new MyScheduler();
                myScheduler.setConf(TestContainerResizing.this.conf);
                return myScheduler;
            }
        };
        mockRM.start();
        // Close the RM even when an assertion fails, so a failing run does not leak a started
        // resource manager into the tests that follow.
        try {
            MockNM registerNode = mockRM.registerNode("h1:1234", 20480);
            RMApp submitApp = mockRM.submitApp(3072, "app", "user", (Map<ApplicationAccessType, String>) null, "default");
            MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(submitApp, mockRM, registerNode);

            // Only the 3072 requested at submission is in use so far.
            checkUsedResource(mockRM, "default", 3072, null);
            Assert.assertEquals(3072L, TestUtils.getFiCaSchedulerApp(mockRM, submitApp.getApplicationId()).getAppAttemptResourceUsage().getUsed().getMemorySize());

            ContainerId newContainerId = ContainerId.newContainerId(launchAndRegisterAM.getApplicationAttemptId(), 1L);
            sentRMContainerLaunched(mockRM, newContainerId);

            // Request one new 2048 container and heartbeat the node.
            launchAndRegisterAM.allocate(Collections.singletonList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(2048), 1)), null);
            registerNode.nodeHeartbeat(true);

            // Shrink container 1 and check the response reports the decrease.
            UpdateContainerRequest decreaseRequest = UpdateContainerRequest.newInstance(0, newContainerId, ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(TestCapacitySchedulerAutoCreatedQueueBase.GB), (ExecutionType) null);
            verifyContainerDecreased(launchAndRegisterAM.sendContainerResizingRequest(Collections.singletonList(decreaseRequest)), newContainerId, TestCapacitySchedulerAutoCreatedQueueBase.GB);
        } finally {
            mockRM.close();
        }
    }

    /**
     * Asserts the pending memory recorded in a queue's resource usage.
     *
     * @param mockRM resource manager whose scheduler is inspected
     * @param str name of the queue to look up
     * @param i expected pending memory
     * @param str2 label passed to {@code getPending}; {@code null} is treated as the empty string
     */
    private void checkPendingResource(MockRM mockRM, String str, int i, String str2) {
        final String label;
        if (str2 != null) {
            label = str2;
        } else {
            label = "";
        }
        final long actualPending = mockRM.getResourceScheduler().getQueue(str).getQueueResourceUsage().getPending(label).getMemorySize();
        Assert.assertEquals(i, actualPending);
    }

    /**
     * Asserts the used memory recorded in a queue's resource usage.
     *
     * @param mockRM resource manager whose scheduler is inspected
     * @param str name of the queue to look up
     * @param i expected used memory
     * @param str2 label passed to {@code getUsed}; {@code null} is treated as the empty string
     */
    private void checkUsedResource(MockRM mockRM, String str, int i, String str2) {
        final String label;
        if (str2 != null) {
            label = str2;
        } else {
            label = "";
        }
        final long actualUsed = mockRM.getResourceScheduler().getQueue(str).getQueueResourceUsage().getUsed(label).getMemorySize();
        Assert.assertEquals(i, actualUsed);
    }

    /**
     * Asserts that the response carries an INCREASE_RESOURCE update for the given container with
     * the expected memory size, and fails if no update for that container is present.
     *
     * @param allocateResponse response whose updated containers are scanned
     * @param containerId id of the container expected to have been increased
     * @param i expected memory size of the updated container
     */
    private void verifyContainerIncreased(AllocateResponse allocateResponse, ContainerId containerId, int i) {
        boolean matched = false;
        for (UpdatedContainer candidate : allocateResponse.getUpdatedContainers()) {
            if (!candidate.getContainer().getId().equals(containerId)) {
                continue;
            }
            // Every entry for this container is checked, not just the first one.
            matched = true;
            Assert.assertEquals(ContainerUpdateType.INCREASE_RESOURCE, candidate.getUpdateType());
            Assert.assertEquals(i, candidate.getContainer().getResource().getMemorySize());
        }
        if (!matched) {
            Assert.fail("Container not increased: containerId=" + containerId);
        }
    }

    /**
     * Asserts that the response carries a DECREASE_RESOURCE update for the given container with
     * the expected memory size, and fails if no update for that container is present.
     *
     * @param allocateResponse response whose updated containers are scanned
     * @param containerId id of the container expected to have been decreased
     * @param i expected memory size of the updated container
     */
    private void verifyContainerDecreased(AllocateResponse allocateResponse, ContainerId containerId, int i) {
        boolean matched = false;
        for (UpdatedContainer candidate : allocateResponse.getUpdatedContainers()) {
            if (!candidate.getContainer().getId().equals(containerId)) {
                continue;
            }
            // Every entry for this container is checked, not just the first one.
            matched = true;
            Assert.assertEquals(ContainerUpdateType.DECREASE_RESOURCE, candidate.getUpdateType());
            Assert.assertEquals(i, candidate.getContainer().getResource().getMemorySize());
        }
        if (!matched) {
            Assert.fail("Container not decreased: containerId=" + containerId);
        }
    }

    /**
     * Delivers a LAUNCHED event to the RMContainer the scheduler holds for the given id, failing
     * the test when the scheduler does not know the container.
     *
     * @param mockRM resource manager whose scheduler is queried for the container
     * @param containerId id of the container to mark as launched
     */
    private void sentRMContainerLaunched(MockRM mockRM, ContainerId containerId) {
        RMContainer target = mockRM.getResourceScheduler().getRMContainer(containerId);
        if (target == null) {
            // Assert.fail throws; the return only makes the control flow explicit.
            Assert.fail("Cannot find RMContainer");
            return;
        }
        target.handle(new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
    }

    /**
     * Asserts the unallocated memory the scheduler reports for a node.
     *
     * @param mockRM resource manager whose scheduler is inspected
     * @param nodeId id of the scheduler node to look up
     * @param i expected unallocated memory
     */
    private void verifyAvailableResourceOfSchedulerNode(MockRM mockRM, NodeId nodeId, int i) {
        final long actualUnallocated = mockRM.getResourceScheduler().getNode(nodeId).getUnallocatedResource().getMemorySize();
        Assert.assertEquals(i, actualUnallocated);
    }
}
