package org.apache.hadoop.hdfs.server.federation.fairness;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.class */
public class TestRouterHandlersFairness {
    private static final Logger LOG = LoggerFactory.getLogger(TestRouterHandlersFairness.class);
    private StateStoreDFSCluster cluster;
    private Map<String, Integer> expectedHandlerPerNs;
    private Class<RouterRpcFairnessPolicyController> policyControllerClass;
    private int handlerCount;
    private Map<String, String> configuration;

    public TestRouterHandlersFairness(Class<RouterRpcFairnessPolicyController> cls, int i, Map<String, String> map, Map<String, Integer> map2) {
        this.expectedHandlerPerNs = map2;
        this.policyControllerClass = cls;
        this.handlerCount = i;
        this.configuration = map;
    }

    @Parameterized.Parameters
    public static Collection primes() {
        return Arrays.asList(new Object[]{StaticRouterRpcFairnessPolicyController.class, 3, setConfiguration(null), expectedHandlerPerNs("ns0:1, ns1:1, concurrent:1")}, new Object[]{ProportionRouterRpcFairnessPolicyController.class, 20, setConfiguration("dfs.federation.router.fairness.handler.proportion.ns0=0.5, dfs.federation.router.fairness.handler.proportion.ns1=0.8, dfs.federation.router.fairness.handler.proportion.concurrent=1"), expectedHandlerPerNs("ns0:10, ns1:16, concurrent:20")});
    }

    @After
    public void cleanup() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    private void setupCluster(boolean z, boolean z2) throws Exception {
        LOG.info("Test {}", this.policyControllerClass.getSimpleName());
        this.cluster = new StateStoreDFSCluster(z2, 2);
        Configuration build = new RouterConfigBuilder().stateStore().rpc().build();
        if (z) {
            build.setClass("dfs.federation.router.fairness.policy.controller.class", this.policyControllerClass, RouterRpcFairnessPolicyController.class);
        }
        build.setTimeDuration("dfs.federation.router.fairness.acquire.timeout", 10L, TimeUnit.MILLISECONDS);
        build.setInt("dfs.federation.router.handler.count", this.handlerCount);
        for (Map.Entry<String, String> entry : this.configuration.entrySet()) {
            build.set(entry.getKey(), entry.getValue());
        }
        this.cluster.setNumDatanodesPerNameservice(0);
        this.cluster.addRouterOverrides(build);
        this.cluster.startCluster();
        this.cluster.startRouters();
        this.cluster.waitClusterUp();
    }

    @Test
    public void testFairnessControlOff() throws Exception {
        setupCluster(false, false);
        startLoadTest(false);
    }

    @Test
    public void testFairnessControlOn() throws Exception {
        setupCluster(true, false);
        startLoadTest(true);
    }

    @Test
    public void testReleasedWhenExceptionOccurs() throws Exception {
        setupCluster(true, false);
        MiniRouterDFSCluster.RouterContext randomRouter = this.cluster.getRandomRouter();
        RouterRpcClient rPCClient = randomRouter.getRouter().getRpcServer().getRPCClient();
        ActiveNamenodeResolver activeNamenodeResolver = (ActiveNamenodeResolver) Mockito.mock(ActiveNamenodeResolver.class);
        Field declaredField = rPCClient.getClass().getDeclaredField("namenodeResolver");
        declaredField.setAccessible(true);
        declaredField.set(rPCClient, activeNamenodeResolver);
        DFSClient client = randomRouter.getClient();
        int availablePermits = rPCClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0");
        LambdaTestUtils.intercept(IOException.class, () -> {
            LOG.info("Use getFileInfo test invokeSequential.");
            client.getFileInfo("/test.txt");
        });
        Assert.assertEquals(availablePermits, rPCClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new RemoteLocation("ns0", "/", "/"));
        RemoteMethod remoteMethod = new RemoteMethod("renewLease", new Class[]{String.class, List.class}, new Object[]{null, null});
        int availablePermits2 = rPCClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0");
        LambdaTestUtils.intercept(IOException.class, () -> {
            LOG.info("Use renewLease test invokeConcurrent.");
            rPCClient.invokeConcurrent(arrayList, remoteMethod);
        });
        Assert.assertEquals(availablePermits2, rPCClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"));
    }

    private void startLoadTest(boolean z) throws Exception {
        startLoadTest(true, z);
        startLoadTest(false, z);
    }

    private void startLoadTest(boolean z, boolean z2) throws Exception {
        MiniRouterDFSCluster.RouterContext randomRouter = this.cluster.getRandomRouter();
        URI fileSystemURI = randomRouter.getFileSystemURI();
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        AtomicInteger atomicInteger = new AtomicInteger();
        if (z2) {
            if (z) {
                LOG.info("Taking fanout lock first");
                for (int i = 0; i < this.expectedHandlerPerNs.get("concurrent").intValue(); i++) {
                    Assert.assertTrue(randomRouter.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().acquirePermit("concurrent"));
                }
            } else {
                for (String str : this.cluster.getNameservices()) {
                    LOG.info("Taking lock first for ns: {}", str);
                    for (int i2 = 0; i2 < this.expectedHandlerPerNs.get(str).intValue(); i2++) {
                        Assert.assertTrue(randomRouter.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().acquirePermit(str));
                    }
                }
            }
        }
        int totalRejectedPermits = getTotalRejectedPermits(randomRouter);
        innerCalls(fileSystemURI, 10, z, hdfsConfiguration, atomicInteger);
        Assert.assertEquals(getTotalRejectedPermits(randomRouter) - totalRejectedPermits, atomicInteger.get());
        if (z2) {
            Assert.assertTrue(atomicInteger.get() > 0);
            if (z) {
                LOG.info("Release fanout lock that was taken before test");
                for (int i3 = 0; i3 < this.expectedHandlerPerNs.get("concurrent").intValue(); i3++) {
                    randomRouter.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().releasePermit("concurrent");
                }
            } else {
                for (String str2 : this.cluster.getNameservices()) {
                    for (int i4 = 0; i4 < this.expectedHandlerPerNs.get(str2).intValue(); i4++) {
                        randomRouter.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().releasePermit(str2);
                    }
                }
            }
        } else {
            Assert.assertEquals("Number of failed RPCs without fairness configured", 0L, atomicInteger.get());
        }
        int totalAcceptedPermits = getTotalAcceptedPermits(randomRouter);
        innerCalls(fileSystemURI, 10, z, hdfsConfiguration, new AtomicInteger());
        Assert.assertEquals(getTotalAcceptedPermits(randomRouter) - totalAcceptedPermits, 10L);
        Assert.assertEquals(r0.get(), 0L);
    }

    private void invokeSequential(ClientProtocol clientProtocol) throws IOException {
        clientProtocol.getFileInfo("/test.txt");
    }

    private void invokeConcurrent(ClientProtocol clientProtocol, String str) throws IOException {
        clientProtocol.renewLease(str, (List) null);
    }

    private int getTotalRejectedPermits(MiniRouterDFSCluster.RouterContext routerContext) {
        int i = 0;
        Iterator<String> it = this.cluster.getNameservices().iterator();
        while (it.hasNext()) {
            i = (int) (i + routerContext.getRouterRpcClient().getRejectedPermitForNs(it.next()).longValue());
        }
        return (int) (i + routerContext.getRouterRpcClient().getRejectedPermitForNs("concurrent").longValue());
    }

    private int getTotalAcceptedPermits(MiniRouterDFSCluster.RouterContext routerContext) {
        int i = 0;
        Iterator<String> it = this.cluster.getNameservices().iterator();
        while (it.hasNext()) {
            i = (int) (i + routerContext.getRouterRpcClient().getAcceptedPermitForNs(it.next()).longValue());
        }
        return (int) (i + routerContext.getRouterRpcClient().getAcceptedPermitForNs("concurrent").longValue());
    }

    private void innerCalls(URI uri, int i, boolean z, Configuration configuration, AtomicInteger atomicInteger) throws IOException {
        for (int i2 = 0; i2 < i; i2++) {
            DFSClient dFSClient = null;
            try {
                try {
                    dFSClient = new DFSClient(uri, configuration);
                    String clientName = dFSClient.getClientName();
                    ClientProtocol namenode = dFSClient.getNamenode();
                    if (z) {
                        invokeConcurrent(namenode, clientName);
                    } else {
                        invokeSequential(namenode);
                    }
                    if (dFSClient != null) {
                        try {
                            dFSClient.close();
                        } catch (IOException e) {
                            LOG.error("Cannot close the client");
                        }
                    }
                } catch (Throwable th) {
                    if (dFSClient != null) {
                        try {
                            dFSClient.close();
                        } catch (IOException e2) {
                            LOG.error("Cannot close the client");
                        }
                    }
                    throw th;
                }
            } catch (RemoteException e3) {
                IOException unwrapRemoteException = e3.unwrapRemoteException();
                Assert.assertTrue("Wrong exception: " + unwrapRemoteException, unwrapRemoteException instanceof StandbyException);
                GenericTestUtils.assertExceptionContains("is overloaded for NS", unwrapRemoteException);
                atomicInteger.incrementAndGet();
                if (dFSClient != null) {
                    try {
                        dFSClient.close();
                    } catch (IOException e4) {
                        LOG.error("Cannot close the client");
                    }
                }
            }
            atomicInteger.get();
        }
    }

    private static Map<String, Integer> expectedHandlerPerNs(String str) {
        HashMap hashMap = new HashMap();
        if (str == null) {
            return hashMap;
        }
        for (String str2 : str.split(", ")) {
            String[] split = str2.split(":");
            if (split.length == 2) {
                hashMap.put(split[0], Integer.valueOf(split[1]));
            }
        }
        return hashMap;
    }

    private static Map<String, String> setConfiguration(String str) {
        HashMap hashMap = new HashMap();
        if (str == null) {
            return hashMap;
        }
        for (String str2 : str.split(", ")) {
            String[] split = str2.split("=");
            if (split.length == 2) {
                hashMap.put(split[0], split[1]);
            }
        }
        return hashMap;
    }
}
