package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:test-classes/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.class
 */
/* loaded from: input_file:hadoop-yarn-server-resourcemanager-2.10.1-ODI-tests.jar:org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.class */
public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
    public static final Log LOG = LogFactory.getLog(TestZKConfigurationStore.class);
    private static final int ZK_TIMEOUT_MS = 10000;
    private TestingServer curatorTestingServer;
    private CuratorFramework curatorFramework;
    private ResourceManager rm;

    public static TestingServer setupCuratorServer() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        return testingServer;
    }

    public static CuratorFramework setupCuratorFramework(TestingServer testingServer) throws Exception {
        CuratorFramework build = CuratorFrameworkFactory.builder().connectString(testingServer.getConnectString()).retryPolicy(new RetryNTimes(100, 100)).build();
        build.start();
        return build;
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.ConfigurationStoreBaseTest
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.curatorTestingServer = setupCuratorServer();
        this.curatorFramework = setupCuratorFramework(this.curatorTestingServer);
        this.conf.set("hadoop.zk.address", this.curatorTestingServer.getConnectString());
        this.rm = new MockRM(this.conf);
        this.rm.start();
        this.rmContext = this.rm.getRMContext();
    }

    @After
    public void cleanup() throws IOException {
        this.rm.stop();
        this.curatorFramework.close();
        this.curatorTestingServer.stop();
    }

    @Test
    public void testVersioning() throws Exception {
        this.confStore.initialize(this.conf, this.schedConf, this.rmContext);
        Assert.assertNull(this.confStore.getConfStoreVersion());
        this.confStore.checkVersion();
        Assert.assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO, this.confStore.getConfStoreVersion());
    }

    @Test
    public void testPersistConfiguration() throws Exception {
        this.schedConf.set("key", "val");
        this.confStore.initialize(this.conf, this.schedConf, this.rmContext);
        Assert.assertEquals("val", this.confStore.retrieve().get("key"));
        Assert.assertNull(this.confStore.retrieve().get("yarn.resourcemanager.hostname"));
        this.confStore = createConfStore();
        this.schedConf.set("key", "badVal");
        this.confStore.initialize(this.conf, this.schedConf, this.rmContext);
        Assert.assertEquals("val", this.confStore.retrieve().get("key"));
    }

    @Test
    public void testPersistUpdatedConfiguration() throws Exception {
        this.confStore.initialize(this.conf, this.schedConf, this.rmContext);
        Assert.assertNull(this.confStore.retrieve().get("key"));
        HashMap hashMap = new HashMap();
        hashMap.put("key", "val");
        this.confStore.logMutation(new YarnConfigurationStore.LogMutation(hashMap, "testUser"));
        this.confStore.confirmMutation(true);
        Assert.assertEquals("val", this.confStore.retrieve().get("key"));
        this.confStore = createConfStore();
        this.schedConf.set("key", "badVal");
        this.confStore.initialize(this.conf, this.schedConf, this.rmContext);
        Assert.assertEquals("val", this.confStore.retrieve().get("key"));
    }

    @Test
    public void testMaxLogs() throws Exception {
        this.conf.setLong("yarn.scheduler.configuration.store.max-logs", 2L);
        this.confStore.initialize(this.conf, this.schedConf, this.rmContext);
        Assert.assertEquals(0L, ((ZKConfigurationStore) this.confStore).getLogs().size());
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "val1");
        this.confStore.logMutation(new YarnConfigurationStore.LogMutation(hashMap, "testUser"));
        LinkedList<YarnConfigurationStore.LogMutation> logs = ((ZKConfigurationStore) this.confStore).getLogs();
        Assert.assertEquals(1L, logs.size());
        Assert.assertEquals("val1", logs.get(0).getUpdates().get("key1"));
        this.confStore.confirmMutation(true);
        Assert.assertEquals(1L, logs.size());
        Assert.assertEquals("val1", logs.get(0).getUpdates().get("key1"));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("key2", "val2");
        this.confStore.logMutation(new YarnConfigurationStore.LogMutation(hashMap2, "testUser"));
        LinkedList<YarnConfigurationStore.LogMutation> logs2 = ((ZKConfigurationStore) this.confStore).getLogs();
        Assert.assertEquals(2L, logs2.size());
        Assert.assertEquals("val1", logs2.get(0).getUpdates().get("key1"));
        Assert.assertEquals("val2", logs2.get(1).getUpdates().get("key2"));
        this.confStore.confirmMutation(true);
        Assert.assertEquals(2L, logs2.size());
        Assert.assertEquals("val1", logs2.get(0).getUpdates().get("key1"));
        Assert.assertEquals("val2", logs2.get(1).getUpdates().get("key2"));
        HashMap hashMap3 = new HashMap();
        hashMap3.put("key3", "val3");
        this.confStore.logMutation(new YarnConfigurationStore.LogMutation(hashMap3, "testUser"));
        LinkedList<YarnConfigurationStore.LogMutation> logs3 = ((ZKConfigurationStore) this.confStore).getLogs();
        Assert.assertEquals(2L, logs3.size());
        Assert.assertEquals("val2", logs3.get(0).getUpdates().get("key2"));
        Assert.assertEquals("val3", logs3.get(1).getUpdates().get("key3"));
        this.confStore.confirmMutation(true);
        Assert.assertEquals(2L, logs3.size());
        Assert.assertEquals("val2", logs3.get(0).getUpdates().get("key2"));
        Assert.assertEquals("val3", logs3.get(1).getUpdates().get("key3"));
    }

    public Configuration createRMHAConf(String str, String str2, int i) {
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        this.conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, CapacityScheduler.class);
        yarnConfiguration.setBoolean("yarn.resourcemanager.ha.enabled", true);
        yarnConfiguration.set("yarn.resourcemanager.ha.rm-ids", str);
        yarnConfiguration.setBoolean("yarn.resourcemanager.recovery.enabled", true);
        yarnConfiguration.set("yarn.scheduler.configuration.store.class", "zk");
        yarnConfiguration.set("yarn.resourcemanager.store.class", ZKRMStateStore.class.getName());
        yarnConfiguration.set("yarn.resourcemanager.zk-address", this.curatorTestingServer.getConnectString());
        yarnConfiguration.set("yarn.resourcemanager.ha.id", str2);
        yarnConfiguration.set("yarn.resourcemanager.webapp.address", "localhost:0");
        yarnConfiguration.setBoolean("yarn.resourcemanager.ha.automatic-failover.enabled", false);
        for (String str3 : YarnConfiguration.getServiceAddressConfKeys(yarnConfiguration)) {
            Iterator it = HAUtil.getRMHAIds(yarnConfiguration).iterator();
            while (it.hasNext()) {
                yarnConfiguration.set(HAUtil.addSuffix(str3, (String) it.next()), "localhost:0");
            }
        }
        yarnConfiguration.set(HAUtil.addSuffix("yarn.resourcemanager.admin.address", str2), "localhost:" + i);
        return yarnConfiguration;
    }

    @Test
    public void testFailoverReadsFromUpdatedStore() throws Exception {
        HAServiceProtocol.StateChangeRequestInfo stateChangeRequestInfo = new HAServiceProtocol.StateChangeRequestInfo(HAServiceProtocol.RequestSource.REQUEST_BY_USER);
        Configuration createRMHAConf = createRMHAConf("rm1,rm2", "rm1", 1234);
        MockRM mockRM = new MockRM(createRMHAConf);
        mockRM.start();
        mockRM.getRMContext().getRMAdminService().transitionToActive(stateChangeRequestInfo);
        Assert.assertEquals("RM with ZKStore didn't start", Service.STATE.STARTED, mockRM.getServiceState());
        Assert.assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, mockRM.getRMContext().getRMAdminService().getServiceStatus().getState());
        Assert.assertNull(((MutableConfScheduler) mockRM.getResourceScheduler()).getConfiguration().get("key"));
        MockRM mockRM2 = new MockRM(createRMHAConf("rm1,rm2", "rm2", 5678));
        mockRM2.start();
        Assert.assertEquals("RM should be Standby", HAServiceProtocol.HAServiceState.STANDBY, mockRM2.getRMContext().getRMAdminService().getServiceStatus().getState());
        SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
        schedConfUpdateInfo.getGlobalParams().put("key", "val");
        MutableConfigurationProvider mutableConfProvider = ((MutableConfScheduler) mockRM.getResourceScheduler()).getMutableConfProvider();
        UserGroupInformation createUserForTesting = UserGroupInformation.createUserForTesting("testUser", new String[0]);
        mutableConfProvider.logAndApplyMutation(createUserForTesting, schedConfUpdateInfo);
        mockRM.getResourceScheduler().reinitialize(createRMHAConf, mockRM.getRMContext());
        Assert.assertEquals("val", ((MutableConfScheduler) mockRM.getResourceScheduler()).getConfiguration().get("key"));
        mutableConfProvider.confirmPendingMutation(true);
        Assert.assertEquals("val", ((MutableCSConfigurationProvider) mutableConfProvider).getConfStore().retrieve().get("key"));
        schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
        mutableConfProvider.logAndApplyMutation(createUserForTesting, schedConfUpdateInfo);
        mockRM2.getRMContext().getRMAdminService().transitionToActive(stateChangeRequestInfo);
        Assert.assertEquals("RM with ZKStore didn't start", Service.STATE.STARTED, mockRM2.getServiceState());
        Assert.assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, mockRM2.getRMContext().getRMAdminService().getServiceStatus().getState());
        for (int i = 0; i < 200; i++) {
            if (HAServiceProtocol.HAServiceState.ACTIVE == mockRM.getRMContext().getRMAdminService().getServiceStatus().getState()) {
                Thread.sleep(100L);
            }
        }
        Assert.assertEquals("RM should have been fenced", HAServiceProtocol.HAServiceState.STANDBY, mockRM.getRMContext().getRMAdminService().getServiceStatus().getState());
        Assert.assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, mockRM2.getRMContext().getRMAdminService().getServiceStatus().getState());
        Assert.assertEquals("val", ((MutableCSConfigurationProvider) ((CapacityScheduler) mockRM2.getResourceScheduler()).getMutableConfProvider()).getConfStore().retrieve().get("key"));
        Assert.assertEquals("val", ((MutableConfScheduler) mockRM2.getResourceScheduler()).getConfiguration().get("key"));
        Thread.sleep(AllocationFileLoaderService.ALLOC_RELOAD_INTERVAL_MS);
        mockRM.close();
        mockRM2.close();
    }

    @Test
    public void testFailoverAfterRemoveQueue() throws Exception {
        HAServiceProtocol.StateChangeRequestInfo stateChangeRequestInfo = new HAServiceProtocol.StateChangeRequestInfo(HAServiceProtocol.RequestSource.REQUEST_BY_USER);
        Configuration createRMHAConf = createRMHAConf("rm1,rm2", "rm1", 1234);
        MockRM mockRM = new MockRM(createRMHAConf);
        mockRM.start();
        mockRM.getRMContext().getRMAdminService().transitionToActive(stateChangeRequestInfo);
        Assert.assertEquals("RM with ZKStore didn't start", Service.STATE.STARTED, mockRM.getServiceState());
        Assert.assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, mockRM.getRMContext().getRMAdminService().getServiceStatus().getState());
        MockRM mockRM2 = new MockRM(createRMHAConf("rm1,rm2", "rm2", 5678));
        mockRM2.start();
        Assert.assertEquals("RM should be Standby", HAServiceProtocol.HAServiceState.STANDBY, mockRM2.getRMContext().getRMAdminService().getServiceStatus().getState());
        UserGroupInformation createUserForTesting = UserGroupInformation.createUserForTesting("testUser", new String[0]);
        MutableConfigurationProvider mutableConfProvider = ((MutableConfScheduler) mockRM.getResourceScheduler()).getMutableConfProvider();
        SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
        HashMap hashMap = new HashMap();
        hashMap.put(CapacitySchedulerConfiguration.CAPACITY, "100");
        schedConfUpdateInfo.getAddQueueInfo().add(new QueueConfigInfo("root.a", hashMap));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("state", "STOPPED");
        hashMap2.put(CapacitySchedulerConfiguration.CAPACITY, RMWebServices.DEFAULT_START_TIME);
        schedConfUpdateInfo.getUpdateQueueInfo().add(new QueueConfigInfo("root.default", hashMap2));
        mutableConfProvider.logAndApplyMutation(createUserForTesting, schedConfUpdateInfo);
        mockRM.getResourceScheduler().reinitialize(createRMHAConf, mockRM.getRMContext());
        mutableConfProvider.confirmPendingMutation(true);
        Assert.assertTrue(Arrays.asList(((MutableConfScheduler) mockRM.getResourceScheduler()).getConfiguration().get("yarn.scheduler.capacity.root.queues").split(",")).contains("a"));
        schedConfUpdateInfo.getUpdateQueueInfo().clear();
        schedConfUpdateInfo.getAddQueueInfo().clear();
        schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
        mutableConfProvider.logAndApplyMutation(createUserForTesting, schedConfUpdateInfo);
        mockRM.getResourceScheduler().reinitialize(createRMHAConf, mockRM.getRMContext());
        mutableConfProvider.confirmPendingMutation(true);
        Assert.assertEquals("a", ((MutableConfScheduler) mockRM.getResourceScheduler()).getConfiguration().get("yarn.scheduler.capacity.root.queues"));
        mockRM2.getRMContext().getRMAdminService().transitionToActive(stateChangeRequestInfo);
        Assert.assertEquals("RM with ZKStore didn't start", Service.STATE.STARTED, mockRM2.getServiceState());
        Assert.assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, mockRM2.getRMContext().getRMAdminService().getServiceStatus().getState());
        for (int i = 0; i < 200; i++) {
            if (HAServiceProtocol.HAServiceState.ACTIVE == mockRM.getRMContext().getRMAdminService().getServiceStatus().getState()) {
                Thread.sleep(100L);
            }
        }
        Assert.assertEquals("RM should have been fenced", HAServiceProtocol.HAServiceState.STANDBY, mockRM.getRMContext().getRMAdminService().getServiceStatus().getState());
        Assert.assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, mockRM2.getRMContext().getRMAdminService().getServiceStatus().getState());
        Assert.assertEquals("a", ((MutableCSConfigurationProvider) ((CapacityScheduler) mockRM2.getResourceScheduler()).getMutableConfProvider()).getConfStore().retrieve().get("yarn.scheduler.capacity.root.queues"));
        Assert.assertEquals("a", ((MutableConfScheduler) mockRM2.getResourceScheduler()).getConfiguration().get("yarn.scheduler.capacity.root.queues"));
        Thread.sleep(AllocationFileLoaderService.ALLOC_RELOAD_INTERVAL_MS);
        mockRM.close();
        mockRM2.close();
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.ConfigurationStoreBaseTest
    public YarnConfigurationStore createConfStore() {
        return new ZKConfigurationStore();
    }
}
