package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.class */
public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    /** Queue allocation file: written by writeAllocFile(), handed to the scheduler in setup(), deleted in teardown(). */
    private static final File ALLOC_FILE;
    // Presumably megabytes per gigabyte; not referenced by name in the visible code
    // (literals such as 1024/2048/4096 appear instead) -- TODO confirm.
    private static final int GB = 1024;
    /** Test-method name prefix for which setup() turns off AM preemption through the configuration. */
    private static final String TC_DISABLE_AM_PREEMPTION_GLOBALLY = "testDisableAMPreemptionGlobally";
    // NOTE(review): not referenced by name in the visible code, but the literal 4 is used for the
    // per-node update count and the containers-per-node factor -- looks like an inlined constant.
    private static final int NODE_CAPACITY_MULTIPLE = 4;
    /** True when the parameter index is greater than 1: the fair-share allocation file is generated. */
    private final boolean fairsharePreemption;
    /** True when the parameter index is odd: DRF is set as the default queue scheduling policy. */
    private final boolean drf;
    /** App created by takeAllResources(); expected to hold 8 live containers afterwards. */
    private FSAppAttempt greedyApp;
    /** App created afterwards whose request is expected to trigger preemption from greedyApp. */
    private FSAppAttempt starvingApp;
    // Compiler-synthesized "assertions disabled" flag, set in the static initializer and read
    // by verifyRelaxLocalityPreemption(); looks like a decompilation artifact of `assert`.
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Manually advanced clock that setupCluster() installs on the scheduler. */
    private final ControlledClock clock = new ControlledClock();

    /** Exposes the running test's method name so setup() can special-case individual tests. */
    @Rule
    public TestName testName = new TestName();

    /**
     * Supplies the parameter sets for the parameterized runner.
     *
     * <p>Each row is {display name, mode index}. The constructor derives the
     * fair-share flag from {@code index > 1} and the DRF flag from an odd index.
     *
     * @return the four {name, index} rows
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> getParameters() {
        Object[][] modes = {
            {"MinSharePreemption", 0},
            {"MinSharePreemptionWithDRF", 1},
            {"FairSharePreemption", 2},
            {"FairSharePreemptionWithDRF", 3},
        };
        return Arrays.asList(modes);
    }

    /**
     * Decodes the mode index into the two test flags and writes the matching
     * allocation file.
     *
     * @param str display name of the parameter set (used only by the runner)
     * @param i mode index: values above 1 select fair-share preemption, odd values select DRF
     * @throws IOException declared for callers; nothing in the visible body throws it
     */
    public TestFairSchedulerPreemption(String str, int i) throws IOException {
        fairsharePreemption = i >= 2;
        drf = (i % 2) == 1;
        writeAllocFile();
    }

    /**
     * Builds the scheduler configuration for a test run and starts the cluster.
     *
     * <p>Preemption is enabled with a zero utilization threshold and a zero
     * wait-before-kill, and the update interval is set to 60000 ms. AM preemption
     * is switched off only for tests whose name starts with
     * {@link #TC_DISABLE_AM_PREEMPTION_GLOBALLY}.
     *
     * @throws IOException if cluster setup fails
     */
    @Before
    public void setup() throws IOException {
        createConfiguration();

        conf.set("yarn.scheduler.fair.allocation.file", ALLOC_FILE.getAbsolutePath());
        conf.setBoolean("yarn.scheduler.fair.preemption", true);
        conf.setFloat("yarn.scheduler.fair.preemption.cluster-utilization-threshold", 0.0f);
        conf.setInt("yarn.scheduler.fair.waitTimeBeforeKill", 0);
        conf.setLong("yarn.scheduler.fair.update-interval-ms", 60000L);

        // The parameterized runner may decorate the method name, hence the prefix match.
        boolean disableAmPreemption =
            testName.getMethodName().startsWith(TC_DISABLE_AM_PREEMPTION_GLOBALLY);
        if (disableAmPreemption) {
            conf.setBoolean("yarn.scheduler.fair.am.preemption", false);
        }

        setupCluster();
    }

    /**
     * Removes the allocation file, drops the configuration and stops the
     * resource manager if one was started.
     */
    @After
    public void teardown() {
        ALLOC_FILE.delete();
        conf = null;

        if (resourceManager == null) {
            return;
        }
        resourceManager.stop();
        resourceManager = null;
    }

    /**
     * Writes the allocation file for the current parameter set.
     *
     * <p>Queue hierarchy (identical in both modes):
     * <pre>
     * root
     * |--- preemptable
     * |    |--- child-1
     * |    |--- child-2
     * |--- preemptable-sibling
     * |--- nonpreemptable   (allowPreemptionFrom = false)
     *      |--- child-1
     *      |--- child-2
     * </pre>
     *
     * <p>In fair-share mode the three first-level queues get a fair-share
     * preemption threshold of 1.0 and a timeout of 0. Otherwise they get a
     * min-share preemption timeout of 0, and the children of "preemptable" and
     * "nonpreemptable" get min resources of "4096mb,4vcores".
     *
     * <p>The top-level queue is named with the literal "root" because the tests
     * address queues as "root.preemptable...", rather than borrowing an
     * unrelated constant from the web-app test utilities.
     */
    private void writeAllocFile() {
        AllocationFileWriter allocationFileWriter;
        if (fairsharePreemption) {
            allocationFileWriter = AllocationFileWriter.create()
                .addQueue(new AllocationFileQueue.Builder("root")
                    .subQueue(new AllocationFileQueue.Builder("preemptable")
                        .fairSharePreemptionThreshold(1.0d)
                        .fairSharePreemptionTimeout(0)
                        .subQueue(new AllocationFileQueue.Builder("child-1").build())
                        .subQueue(new AllocationFileQueue.Builder("child-2").build())
                        .build())
                    .subQueue(new AllocationFileQueue.Builder("preemptable-sibling")
                        .fairSharePreemptionThreshold(1.0d)
                        .fairSharePreemptionTimeout(0)
                        .build())
                    .subQueue(new AllocationFileQueue.Builder("nonpreemptable")
                        .allowPreemptionFrom(false)
                        .fairSharePreemptionThreshold(1.0d)
                        .fairSharePreemptionTimeout(0)
                        .subQueue(new AllocationFileQueue.Builder("child-1").build())
                        .subQueue(new AllocationFileQueue.Builder("child-2").build())
                        .build())
                    .build());
        } else {
            allocationFileWriter = AllocationFileWriter.create()
                .addQueue(new AllocationFileQueue.Builder("root")
                    .subQueue(new AllocationFileQueue.Builder("preemptable")
                        .minSharePreemptionTimeout(0)
                        .subQueue(new AllocationFileQueue.Builder("child-1")
                            .minResources("4096mb,4vcores")
                            .build())
                        .subQueue(new AllocationFileQueue.Builder("child-2")
                            .minResources("4096mb,4vcores")
                            .build())
                        .build())
                    .subQueue(new AllocationFileQueue.Builder("preemptable-sibling")
                        .minSharePreemptionTimeout(0)
                        .build())
                    .subQueue(new AllocationFileQueue.Builder("nonpreemptable")
                        .allowPreemptionFrom(false)
                        .minSharePreemptionTimeout(0)
                        .subQueue(new AllocationFileQueue.Builder("child-1")
                            .minResources("4096mb,4vcores")
                            .build())
                        .subQueue(new AllocationFileQueue.Builder("child-2")
                            .minResources("4096mb,4vcores")
                            .build())
                        .build())
                    .build());
        }

        if (drf) {
            allocationFileWriter.drfDefaultQueueSchedulingPolicy();
        }
        allocationFileWriter.writeToFile(ALLOC_FILE.getAbsolutePath());

        Assert.assertTrue("Allocation file does not exist, not running the test", ALLOC_FILE.exists());
    }

    /**
     * Starts a mock resource manager with the prepared configuration, installs
     * the controlled clock on its scheduler, registers two nodes and
     * reinitializes the scheduler.
     *
     * <p>Finishes by checking that both children of "nonpreemptable" report
     * themselves as not preemptable.
     *
     * @throws IOException if scheduler reinitialization fails
     */
    private void setupCluster() throws IOException {
        resourceManager = new MockRM(conf);
        // The resource manager exposes its scheduler through the general scheduler
        // type; narrow it explicitly to the fair scheduler this test drives.
        scheduler = (FairScheduler) resourceManager.getResourceScheduler();

        // Seed the controlled clock with wall-clock time before handing it over.
        clock.setTime(SystemClock.getInstance().getTime());
        scheduler.setClock(clock);
        resourceManager.start();

        // Two identical nodes; arguments are presumably (memory MB, vcores) -- see addNode in the base class.
        addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
        addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);

        // Reinitialize the scheduler once the nodes have been added.
        scheduler.reinitialize(conf, resourceManager.getRMContext());

        Assert.assertFalse(scheduler.getQueueManager().getQueue("nonpreemptable.child-1").isPreemptable());
        Assert.assertFalse(scheduler.getQueueManager().getQueue("nonpreemptable.child-2").isPreemptable());
    }

    /**
     * Delivers four node-update events per registered node to the scheduler,
     * re-sending the same event object for a given node.
     */
    private void sendEnoughNodeUpdatesToAssignFully() {
        for (RMNode rmNode : rmNodes) {
            NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(rmNode);
            int sent = 0;
            while (sent < NODE_CAPACITY_MULTIPLE) {
                scheduler.handle(nodeUpdate);
                sent++;
            }
        }
    }

    /**
     * Submits the greedy app to the given queue, requesting four 1024 MB /
     * 1-vcore containers per node, and lets the scheduler assign them.
     *
     * <p>Asserts the app ends up with 8 live containers and that its
     * preemptable flag matches that of its queue.
     *
     * @param str full name of the queue the greedy app is submitted to
     */
    private void takeAllResources(String str) {
        int requestedContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size();
        greedyApp = scheduler.getSchedulerApp(
            createSchedulingRequest(GB, 1, str, "default", requestedContainers));

        scheduler.update();
        sendEnoughNodeUpdatesToAssignFully();
        Assert.assertEquals(8L, greedyApp.getLiveContainers().size());

        // Queue and app attempt must agree on whether they can be preempted.
        boolean queuePreemptable = scheduler.getQueueManager().getQueue(str).isPreemptable();
        boolean appPreemptable = greedyApp.isPreemptable();
        Assert.assertTrue(queuePreemptable == appPreemptable);
    }

    /**
     * Submits the starving app to the given queue, requesting 2048 MB /
     * 2-vcore containers (half as many as the greedy app's container count),
     * then advances the clock by one second and runs a scheduler update.
     *
     * @param str full name of the queue the starving app is submitted to
     * @throws InterruptedException declared for callers; not thrown by the visible body
     */
    private void preemptHalfResources(String str) throws InterruptedException {
        int requestedContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2;
        starvingApp = scheduler.getSchedulerApp(
            createSchedulingRequest(2 * GB, 2, str, "default", requestedContainers));

        clock.tickSec(1);
        scheduler.update();
    }

    /**
     * Convenience sequence: the greedy app first takes all resources, then the
     * starving app is submitted.
     *
     * @param str queue for the greedy app
     * @param str2 queue for the starving app
     * @throws InterruptedException propagated from the starving-app submission
     */
    private void submitApps(String str, String str2) throws InterruptedException {
        // Greedy app first, so the cluster is full when the starving app arrives.
        takeAllResources(str);
        preemptHalfResources(str2);
    }

    /**
     * Waits for preemption to take effect and verifies its outcome.
     *
     * <p>Polls up to 1000 times at 10 ms intervals until the greedy app holds
     * {@code i2} containers, then checks the container count, the queue's
     * aggregate preempted-container metric (8 minus {@code i2}) and that any
     * node with containers marked for preemption lists the starving app. After
     * further node updates it checks the starving app holds {@code i}
     * containers and is no longer listed on such nodes.
     *
     * @param i expected number of live containers of the starving app
     * @param i2 expected number of live containers left on the greedy app
     * @throws InterruptedException if interrupted while polling
     */
    private void verifyPreemption(int i, int i2) throws InterruptedException {
        for (int attempt = 0; attempt < 1000; attempt++) {
            if (greedyApp.getLiveContainers().size() == i2) {
                break;
            }
            Thread.sleep(10L);
        }

        Assert.assertEquals("Incorrect # of containers on the greedy app",
            i2, greedyApp.getLiveContainers().size());
        Assert.assertEquals("Incorrect # of preempted containers in QueueMetrics",
            8 - i2, greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());

        for (RMNode rmNode : rmNodes) {
            FSSchedulerNode schedulerNode = scheduler.getNodeTracker().getNode(rmNode.getNodeID());
            if (schedulerNode.getContainersForPreemption().size() > 0) {
                Assert.assertTrue("node should be reserved for the starvingApp",
                    schedulerNode.getPreemptionList().keySet().contains(starvingApp));
            }
        }

        sendEnoughNodeUpdatesToAssignFully();

        Assert.assertEquals("Starved app is not assigned the right # of containers",
            i, starvingApp.getLiveContainers().size());

        for (RMNode rmNode : rmNodes) {
            FSSchedulerNode schedulerNode = scheduler.getNodeTracker().getNode(rmNode.getNodeID());
            if (schedulerNode.getContainersForPreemption().size() > 0) {
                Assert.assertFalse(schedulerNode.getPreemptionList().keySet().contains(starvingApp));
            }
        }
    }

    /**
     * Gives preemption a chance to happen (up to 100 polls, 10 ms apart, ending
     * early if the container count changes) and then asserts the greedy app
     * still holds all 8 containers.
     *
     * @throws InterruptedException if interrupted while polling
     */
    private void verifyNoPreemption() throws InterruptedException {
        int polls = 0;
        while (polls < 100) {
            if (greedyApp.getLiveContainers().size() != 8) {
                break;
            }
            Thread.sleep(10L);
            polls++;
        }
        Assert.assertEquals(8L, greedyApp.getLiveContainers().size());
    }

    /**
     * Both apps share one leaf queue: preemption (starving app 2, greedy app 4
     * containers) is expected only in fair-share mode, none otherwise.
     */
    @Test
    public void testPreemptionWithinSameLeafQueue() throws Exception {
        final String leafQueue = "root.preemptable.child-1";
        submitApps(leafQueue, leafQueue);

        if (!fairsharePreemption) {
            verifyNoPreemption();
            return;
        }
        verifyPreemption(2, 4);
    }

    /**
     * Greedy app in child-1, starving app in its sibling child-2: expects the
     * starving app to get 2 containers and the greedy app to drop to 4.
     */
    @Test
    public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
        final String greedyQueue = "root.preemptable.child-1";
        final String starvingQueue = "root.preemptable.child-2";
        submitApps(greedyQueue, starvingQueue);
        verifyPreemption(2, 4);
    }

    /**
     * Greedy app under "preemptable", starving app under "nonpreemptable":
     * expects the starving app to get 2 containers and the greedy app to drop to 4.
     */
    @Test
    public void testPreemptionBetweenNonSiblingQueues() throws Exception {
        final String greedyQueue = "root.preemptable.child-1";
        final String starvingQueue = "root.nonpreemptable.child-1";
        submitApps(greedyQueue, starvingQueue);
        verifyPreemption(2, 4);
    }

    /**
     * Greedy app sits in a queue that disallows preemption from it: the greedy
     * app must keep all of its containers.
     */
    @Test
    public void testNoPreemptionFromDisallowedQueue() throws Exception {
        final String greedyQueue = "root.nonpreemptable.child-1";
        final String starvingQueue = "root.preemptable.child-1";
        submitApps(greedyQueue, starvingQueue);
        verifyNoPreemption();
    }

    /**
     * Marks the first {@code i} running containers on every node matching the
     * "*" resource name as AM containers.
     *
     * <p>Uses typed iteration instead of raw {@code Iterator}/{@code List}, so
     * the only remaining cast is the deliberate one to {@link RMContainerImpl},
     * which provides {@code setAMContainer}.
     *
     * @param i number of containers per node to flag; each node must be running
     *     at least this many, otherwise the index lookup fails
     */
    private void setNumAMContainersPerNode(int i) {
        for (FSSchedulerNode node : scheduler.getNodeTracker().getNodesByResourceName("*")) {
            List<RMContainer> runningContainers = node.getCopiedListOfRunningContainers();
            for (int idx = 0; idx < i; idx++) {
                ((RMContainerImpl) runningContainers.get(idx)).setAMContainer(true);
            }
        }
    }

    /**
     * With two containers per node flagged as AM containers, expects the usual
     * preemption outcome (starving app 2, greedy app 4) and that the starving
     * app's two containers are on different hosts.
     *
     * <p>The live containers are copied into a typed list rather than
     * downcasting the returned collection to a raw {@code ArrayList}, so the
     * test does not depend on the collection's concrete class.
     */
    @Test
    public void testPreemptionSelectNonAMContainer() throws Exception {
        takeAllResources("root.preemptable.child-1");
        setNumAMContainersPerNode(2);
        preemptHalfResources("root.preemptable.child-2");
        verifyPreemption(2, 4);

        List<RMContainer> starvingContainers = new ArrayList<>(starvingApp.getLiveContainers());
        String firstHost = starvingContainers.get(0).getNodeId().getHost();
        String secondHost = starvingContainers.get(1).getNodeId().getHost();
        Assert.assertNotEquals("Preempted containers should come from two different nodes.",
            firstHost, secondHost);
    }

    /**
     * Fills the cluster from child-1, then submits a larger-container request
     * from child-2 and checks the outcome via tryPreemptMoreThanFairShare.
     */
    @Test
    public void testAppNotPreemptedBelowFairShare() throws Exception {
        final String greedyQueue = "root.preemptable.child-1";
        final String starvingQueue = "root.preemptable.child-2";
        takeAllResources(greedyQueue);
        tryPreemptMoreThanFairShare(starvingQueue);
    }

    /**
     * Submits the starving app with 3072 MB / 3-vcore containers and expects
     * only one of them to be satisfied, leaving the greedy app with 5 containers.
     *
     * @param str queue the starving app is submitted to
     * @throws InterruptedException if interrupted while waiting for preemption
     */
    private void tryPreemptMoreThanFairShare(String str) throws InterruptedException {
        int requestedContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2;
        starvingApp = scheduler.getSchedulerApp(
            createSchedulingRequest(3 * GB, 3, str, "default", requestedContainers));

        verifyPreemption(1, 5);
    }

    /** AM preemption disabled on the app attempt itself (not via configuration). */
    @Test
    public void testDisableAMPreemption() {
        final boolean disabledGlobally = false;
        testDisableAMPreemption(disabledGlobally);
    }

    /** AM preemption disabled through the configuration flag that setup() sets for this test name. */
    @Test
    public void testDisableAMPreemptionGlobally() {
        final boolean disabledGlobally = true;
        testDisableAMPreemption(disabledGlobally);
    }

    /**
     * Checks that an AM container of the greedy app cannot be preempted once AM
     * preemption is disabled.
     *
     * <p>When no AM container is found the test fails with a descriptive
     * {@link AssertionError} instead of the bare exception an unchecked
     * {@code Optional.get()} would raise.
     *
     * @param z {@code true} when AM preemption is expected to be disabled already
     *     (through configuration); {@code false} to disable it on the app here
     */
    private void testDisableAMPreemption(boolean z) {
        takeAllResources("root.preemptable.child-1");
        setNumAMContainersPerNode(2);

        RMContainer amContainer = greedyApp.getLiveContainers().stream()
            .filter(RMContainer::isAMContainer)
            .findFirst()
            .orElseThrow(() -> new AssertionError("Greedy app has no AM container to test against"));

        if (!z) {
            greedyApp.setEnableAMPreemption(false);
        }
        Assert.assertFalse(greedyApp.canContainerBePreempted(amContainer, (Resource) null));
    }

    /**
     * Fair-share mode only; in the other modes the body is skipped and the test
     * passes trivially.
     *
     * <p>First a starving app in "preemptable-sibling" is expected to bring the
     * greedy app down to 4 containers (starving app 2); then a second starving
     * app in child-2 is expected to bring it down to 2 (starving app 1).
     */
    @Test
    public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare() throws InterruptedException {
        if (!fairsharePreemption) {
            return;
        }

        takeAllResources("root.preemptable.child-1");

        preemptHalfResources("root.preemptable-sibling");
        verifyPreemption(2, 4);

        preemptHalfResources("root.preemptable.child-2");
        verifyPreemption(1, 2);
    }

    /**
     * All containers on the first node are flagged as AM containers and the
     * starving app asks for four 1024 MB containers on that node. Expects the
     * greedy app to drop to 4 containers while the first node keeps only AM
     * containers of the original attempt.
     */
    @Test
    public void testRelaxLocalityPreemptionWithLessAMInRemainingNodes() throws Exception {
        takeAllResources("root.preemptable.child-1");

        RMNode allAmNode = rmNodes.get(0);
        NodeId allAmNodeId = allAmNode.getNodeID();
        setAllAMContainersOnNode(allAmNodeId);
        ApplicationAttemptId greedyAttemptId = getGreedyAppAttemptIdOnNode(allAmNode.getNodeID());

        updateRelaxLocalityRequestSchedule(allAmNode, GB, 4);
        verifyRelaxLocalityPreemption(allAmNode.getNodeID(), greedyAttemptId, 4);
    }

    /**
     * Three containers on the first node and all containers on the second node
     * are flagged as AM containers; the starving app asks for one 2048 MB
     * container on the first node. Expects the greedy app to drop to 6
     * containers while the second node keeps only AM containers of the
     * original attempt.
     */
    @Test
    public void testRelaxLocalityPreemptionWithNoLessAMInRemainingNodes() throws Exception {
        takeAllResources("root.preemptable.child-1");

        RMNode requestedNode = rmNodes.get(0);
        setNumAMContainersOnNode(3, requestedNode.getNodeID());

        RMNode allAmNode = rmNodes.get(1);
        setAllAMContainersOnNode(allAmNode.getNodeID());
        ApplicationAttemptId greedyAttemptId = getGreedyAppAttemptIdOnNode(allAmNode.getNodeID());

        updateRelaxLocalityRequestSchedule(requestedNode, 2 * GB, 1);
        verifyRelaxLocalityPreemption(allAmNode.getNodeID(), greedyAttemptId, 6);
    }

    /**
     * Flags every running container on the node as an AM container by passing
     * an unreachable upper bound to {@link #setNumAMContainersOnNode}.
     *
     * @param nodeId node whose containers are flagged
     */
    private void setAllAMContainersOnNode(NodeId nodeId) {
        final int noLimit = Integer.MAX_VALUE;
        setNumAMContainersOnNode(noLimit, nodeId);
    }

    /**
     * Flags up to {@code i} running containers on the given node as AM containers.
     *
     * <p>The running-container list is iterated as {@link RMContainer}, as
     * everywhere else in this class, and each element is downcast to
     * {@link RMContainerImpl} only to reach {@code setAMContainer}. Iterating
     * directly as {@code RMContainerImpl} does not match the list's element type.
     *
     * @param i maximum number of containers to flag; a non-positive value flags none
     * @param nodeId node whose containers are flagged
     */
    private void setNumAMContainersOnNode(int i, NodeId nodeId) {
        int flagged = 0;
        for (RMContainer container : scheduler.getNodeTracker().getNode(nodeId).getCopiedListOfRunningContainers()) {
            if (flagged >= i) {
                break;
            }
            ((RMContainerImpl) container).setAMContainer(true);
            flagged++;
        }
    }

    /**
     * Returns the application attempt id of the first running container on the node.
     *
     * @param nodeId node to inspect; must be running at least one container
     * @return attempt id owning the node's first running container
     */
    private ApplicationAttemptId getGreedyAppAttemptIdOnNode(NodeId nodeId) {
        FSSchedulerNode schedulerNode = scheduler.getNodeTracker().getNode(nodeId);
        RMContainer firstRunning = (RMContainer) schedulerNode.getCopiedListOfRunningContainers().get(0);
        return firstRunning.getApplicationAttemptId();
    }

    /**
     * Submits the starving app to "root.preemptable.child-2" with three resource
     * requests (host-level with the given size and count, plus rack-level and
     * "*" requests of 10240 each), then advances the clock by one second and
     * runs a scheduler update.
     *
     * @param rMNode node whose host and rack names are used in the requests
     * @param i size passed to the host-level request
     * @param i2 container count passed to the host-level request
     */
    private void updateRelaxLocalityRequestSchedule(RMNode rMNode, int i, int i2) {
        starvingApp = scheduler.getSchedulerApp(
            createSchedulingRequest(
                "root.preemptable.child-2",
                "default",
                Arrays.asList(
                    createResourceRequest(i, rMNode.getHostName(), 1, i2, true),
                    createResourceRequest(10 * GB, rMNode.getRackName(), 1, 1, true),
                    createResourceRequest(10 * GB, "*", 1, 1, true))));

        clock.tickSec(1);
        scheduler.update();
    }

    /**
     * Verifies the relax-locality scenario: the greedy app is reduced to
     * {@code i} containers, the starving app receives none, and every container
     * still running on the given node is an AM container of the given attempt.
     *
     * <p>The per-container checks use JUnit assertions so they run whether or
     * not JVM assertions ({@code -ea}) are enabled.
     *
     * @param nodeId node that must not have lost any container to preemption
     * @param applicationAttemptId attempt expected to own every container on that node
     * @param i expected number of live containers left on the greedy app
     * @throws Exception if interrupted while waiting for preemption
     */
    private void verifyRelaxLocalityPreemption(NodeId nodeId, ApplicationAttemptId applicationAttemptId, int i) throws Exception {
        verifyPreemption(0, i);

        for (RMContainer container : scheduler.getNodeTracker().getNode(nodeId).getCopiedListOfRunningContainers()) {
            Assert.assertTrue("Only AM containers should remain on node " + nodeId,
                container.isAMContainer());
            Assert.assertEquals("Unexpected owner of a container on node " + nodeId,
                applicationAttemptId, container.getApplicationAttemptId());
        }
    }

    // Static initialization: assertion-status flag and the allocation file location.
    static {
        // Mirrors what the compiler generates for classes using `assert`:
        // true when assertions are NOT enabled for this class.
        $assertionsDisabled = !TestFairSchedulerPreemption.class.desiredAssertionStatus();
        // TEST_DIR is not defined in this file; presumably inherited from FairSchedulerTestBase.
        ALLOC_FILE = new File(TEST_DIR, "test-queues");
    }
}
