package org.apache.hadoop.ipc;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.TestRpcBase;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestProtosLegacy;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtosLegacy;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.tools.TestCommandShell;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hadoop/ipc/TestProtoBufRpc.class */
public class TestProtoBufRpc extends TestRpcBase {
    private static RPC.Server server;
    private static final int SLEEP_DURATION = 1000;
    private boolean testWithLegacy;
    private boolean testWithLegacyFirst;

    /* loaded from: input_file:org/apache/hadoop/ipc/TestProtoBufRpc$PBServer2Impl.class */
    public static class PBServer2Impl implements TestRpcService2 {
        @Override // org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto.BlockingInterface
        public TestProtos.EmptyResponseProto ping2(RpcController rpcController, TestProtos.EmptyRequestProto emptyRequestProto) throws ServiceException {
            return TestProtos.EmptyResponseProto.newBuilder().m740build();
        }

        @Override // org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto.BlockingInterface
        public TestProtos.EchoResponseProto echo2(RpcController rpcController, TestProtos.EchoRequestProto echoRequestProto) throws ServiceException {
            return TestProtos.EchoResponseProto.newBuilder().setMessage(echoRequestProto.getMessage()).m598build();
        }

        @Override // org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto.BlockingInterface
        public TestProtos.SleepResponseProto sleep(RpcController rpcController, TestProtos.SleepRequestProto sleepRequestProto) throws ServiceException {
            try {
                Thread.sleep(sleepRequestProto.getMilliSeconds());
            } catch (InterruptedException e) {
            }
            return TestProtos.SleepResponseProto.newBuilder().m1069build();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/ipc/TestProtoBufRpc$PBServer2ImplLegacy.class */
    public static class PBServer2ImplLegacy implements TestRpcService2Legacy {
        @Override // org.apache.hadoop.ipc.protobuf.TestRpcServiceProtosLegacy.TestProtobufRpc2Proto.BlockingInterface
        public TestProtosLegacy.EmptyResponseProto ping2(com.google.protobuf.RpcController rpcController, TestProtosLegacy.EmptyRequestProto emptyRequestProto) throws com.google.protobuf.ServiceException {
            return TestProtosLegacy.EmptyResponseProto.newBuilder().m1465build();
        }

        @Override // org.apache.hadoop.ipc.protobuf.TestRpcServiceProtosLegacy.TestProtobufRpc2Proto.BlockingInterface
        public TestProtosLegacy.EchoResponseProto echo2(com.google.protobuf.RpcController rpcController, TestProtosLegacy.EchoRequestProto echoRequestProto) throws com.google.protobuf.ServiceException {
            return TestProtosLegacy.EchoResponseProto.newBuilder().setMessage(echoRequestProto.getMessage()).m1389build();
        }

        @Override // org.apache.hadoop.ipc.protobuf.TestRpcServiceProtosLegacy.TestProtobufRpc2Proto.BlockingInterface
        public TestProtosLegacy.SleepResponseProto sleep(com.google.protobuf.RpcController rpcController, TestProtosLegacy.SleepRequestProto sleepRequestProto) throws com.google.protobuf.ServiceException {
            try {
                Thread.sleep(sleepRequestProto.getMilliSeconds());
            } catch (InterruptedException e) {
            }
            return TestProtosLegacy.SleepResponseProto.newBuilder().m1640build();
        }
    }

    @ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
    /* loaded from: input_file:org/apache/hadoop/ipc/TestProtoBufRpc$TestRpcService2.class */
    public interface TestRpcService2 extends TestRpcServiceProtos.TestProtobufRpc2Proto.BlockingInterface {
    }

    @ProtocolInfo(protocolName = "testProtoLegacy", protocolVersion = 1)
    /* loaded from: input_file:org/apache/hadoop/ipc/TestProtoBufRpc$TestRpcService2Legacy.class */
    public interface TestRpcService2Legacy extends TestRpcServiceProtosLegacy.TestProtobufRpc2Proto.BlockingInterface {
    }

    public void initTestProtoBufRpc(Boolean bool, Boolean bool2) throws IOException {
        this.testWithLegacy = bool.booleanValue();
        this.testWithLegacyFirst = bool2.booleanValue();
        setUp();
    }

    public static Collection<Object[]> params() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Object[]{Boolean.TRUE, Boolean.TRUE});
        arrayList.add(new Object[]{Boolean.TRUE, Boolean.FALSE});
        arrayList.add(new Object[]{Boolean.FALSE, Boolean.FALSE});
        return arrayList;
    }

    public void setUp() throws IOException {
        conf = new Configuration();
        conf.setInt("ipc.maximum.data.length", 1024);
        conf.setBoolean("ipc.server.log.slow.rpc", true);
        if (this.testWithLegacy) {
            RPC.setProtocolEngine(conf, TestRpcService2Legacy.class, ProtobufRpcEngine.class);
        }
        RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class, ProtobufRpcEngine2.class);
        RPC.setProtocolEngine(conf, TestRpcService2.class, ProtobufRpcEngine2.class);
        BlockingService newReflectiveBlockingService = TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(new TestRpcBase.PBServerImpl());
        if (this.testWithLegacy && this.testWithLegacyFirst) {
            server = new RPC.Builder(conf).setProtocol(TestRpcService2Legacy.class).setInstance(TestRpcServiceProtosLegacy.TestProtobufRpc2Proto.newReflectiveBlockingService(new PBServer2ImplLegacy())).setBindAddress("0.0.0.0").setPort(0).build();
            server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcBase.TestRpcService.class, newReflectiveBlockingService);
        } else {
            server = new RPC.Builder(conf).setProtocol(TestRpcBase.TestRpcService.class).setInstance(newReflectiveBlockingService).setBindAddress("0.0.0.0").setPort(0).build();
        }
        addr = NetUtils.getConnectAddress(server);
        server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, TestRpcServiceProtos.TestProtobufRpc2Proto.newReflectiveBlockingService(new PBServer2Impl()));
        if (this.testWithLegacy && !this.testWithLegacyFirst) {
            server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2Legacy.class, TestRpcServiceProtosLegacy.TestProtobufRpc2Proto.newReflectiveBlockingService(new PBServer2ImplLegacy()));
        }
        server.start();
    }

    @AfterEach
    public void tearDown() throws Exception {
        server.stop();
    }

    private TestRpcService2 getClient2() throws IOException {
        return (TestRpcService2) RPC.getProxy(TestRpcService2.class, 0L, addr, conf);
    }

    private TestRpcService2Legacy getClientLegacy() throws IOException {
        return (TestRpcService2Legacy) RPC.getProxy(TestRpcService2Legacy.class, 0L, addr, conf);
    }

    @MethodSource({"params"})
    @Timeout(5)
    @ParameterizedTest
    public void testProtoBufRpc(boolean z, boolean z2) throws Exception {
        initTestProtoBufRpc(Boolean.valueOf(z), Boolean.valueOf(z2));
        testProtoBufRpc(getClient(addr, conf));
    }

    public static void testProtoBufRpc(TestRpcBase.TestRpcService testRpcService) throws Exception {
        testRpcService.ping(null, newEmptyRequest());
        Assertions.assertThat(testRpcService.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(TestCommandShell.Example.HELLO).m503build()).getMessage()).isEqualTo(TestCommandShell.Example.HELLO);
        try {
            testRpcService.error(null, newEmptyRequest());
            org.junit.jupiter.api.Assertions.fail("Expected exception is not thrown");
        } catch (ServiceException e) {
            RemoteException cause = e.getCause();
            Assertions.assertThat(cause.unwrapRemoteException(new Class[]{RpcServerException.class})).isNotNull();
            Assertions.assertThat(cause.getErrorCode()).isEqualTo(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_RPC_SERVER);
        }
    }

    @MethodSource({"params"})
    @ParameterizedTest
    public void testProtoBufRpc2(boolean z, boolean z2) throws Exception {
        initTestProtoBufRpc(Boolean.valueOf(z), Boolean.valueOf(z2));
        TestRpcService2 client2 = getClient2();
        client2.ping2(null, newEmptyRequest());
        Assertions.assertThat(client2.echo2(null, newEchoRequest(TestCommandShell.Example.HELLO)).getMessage()).isEqualTo(TestCommandShell.Example.HELLO);
        MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(server.getRpcMetrics().name());
        MetricsAsserts.assertCounterGt("RpcQueueTimeNumOps", 0L, metrics);
        MetricsAsserts.assertCounterGt("RpcProcessingTimeNumOps", 0L, metrics);
        MetricsAsserts.assertCounterGt("Echo2NumOps", 0L, MetricsAsserts.getMetrics(server.getRpcDetailedMetrics().name()));
        if (this.testWithLegacy) {
            testProtobufLegacy();
        }
    }

    private void testProtobufLegacy() throws IOException, com.google.protobuf.ServiceException {
        TestRpcService2Legacy clientLegacy = getClientLegacy();
        clientLegacy.ping2(null, TestProtosLegacy.EmptyRequestProto.newBuilder().m1440build());
        Assertions.assertThat(clientLegacy.echo2(null, TestProtosLegacy.EchoRequestProto.newBuilder().setMessage(TestCommandShell.Example.HELLO).m1338build()).getMessage()).isEqualTo(TestCommandShell.Example.HELLO);
        MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(server.getRpcMetrics().name());
        MetricsAsserts.assertCounterGt("RpcQueueTimeNumOps", 0L, metrics);
        MetricsAsserts.assertCounterGt("RpcProcessingTimeNumOps", 0L, metrics);
        MetricsAsserts.assertCounterGt("Echo2NumOps", 0L, MetricsAsserts.getMetrics(server.getRpcDetailedMetrics().name()));
    }

    @MethodSource({"params"})
    @Timeout(5)
    @ParameterizedTest
    public void testProtoBufRandomException(boolean z, boolean z2) throws Exception {
        initTestProtoBufRpc(Boolean.valueOf(z), Boolean.valueOf(z2));
        Assume.assumeFalse(this.testWithLegacy);
        try {
            getClient(addr, conf).error2(null, newEmptyRequest());
        } catch (ServiceException e) {
            Assertions.assertThat(e.getCause()).isInstanceOf(RemoteException.class);
            RemoteException cause = e.getCause();
            Assertions.assertThat(cause.getClassName()).isEqualTo(URISyntaxException.class.getName());
            Assertions.assertThat(cause.getMessage()).contains(new CharSequence[]{"testException"});
            Assertions.assertThat(cause.getErrorCode()).isEqualTo(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_APPLICATION);
        }
    }

    @MethodSource({"params"})
    @Timeout(6)
    @ParameterizedTest
    public void testExtraLongRpc(boolean z, boolean z2) throws Exception {
        initTestProtoBufRpc(Boolean.valueOf(z), Boolean.valueOf(z2));
        Assume.assumeFalse(this.testWithLegacy);
        TestRpcService2 client2 = getClient2();
        String repeat = StringUtils.repeat("X", 4);
        Assertions.assertThat(client2.echo2(null, newEchoRequest(repeat)).getMessage()).isEqualTo(repeat);
        try {
            client2.echo2(null, newEchoRequest(StringUtils.repeat("X", 4096)));
            org.junit.jupiter.api.Assertions.fail("expected extra-long RPC to fail");
        } catch (ServiceException e) {
        }
    }

    @MethodSource({"params"})
    @Timeout(12)
    @ParameterizedTest
    public void testLogSlowRPC(boolean z, boolean z2) throws IOException, ServiceException, TimeoutException, InterruptedException {
        initTestProtoBufRpc(Boolean.valueOf(z), Boolean.valueOf(z2));
        Assume.assumeFalse(this.testWithLegacy);
        server.setLogSlowRPCThresholdTime(1000L);
        TestRpcService2 client2 = getClient2();
        for (int i = 0; i < 10000; i++) {
            try {
                client2.ping2(null, newEmptyRequest());
            } catch (Exception e) {
                throw e;
            }
        }
        RpcMetrics rpcMetrics = server.getRpcMetrics();
        Assertions.assertThat(rpcMetrics.getProcessingSampleCount()).isGreaterThan(999L);
        long rpcSlowCalls = rpcMetrics.getRpcSlowCalls();
        client2.sleep(null, newSleepRequest(500));
        Assertions.assertThat(rpcSlowCalls).isEqualTo(rpcMetrics.getRpcSlowCalls());
        client2.sleep(null, newSleepRequest(3000));
        GenericTestUtils.waitFor(() -> {
            return Boolean.valueOf(rpcMetrics.getRpcSlowCalls() == rpcSlowCalls + 1);
        }, 10L, 1000L);
    }

    @MethodSource({"params"})
    @Timeout(12)
    @ParameterizedTest
    public void testEnsureNoLogIfDisabled(boolean z, boolean z2) throws IOException, ServiceException {
        initTestProtoBufRpc(Boolean.valueOf(z), Boolean.valueOf(z2));
        Assume.assumeFalse(this.testWithLegacy);
        server.setLogSlowRPC(false);
        TestRpcService2 client2 = getClient2();
        for (int i = 0; i < 10000; i++) {
            client2.ping2(null, newEmptyRequest());
        }
        RpcMetrics rpcMetrics = server.getRpcMetrics();
        Assertions.assertThat(rpcMetrics.getProcessingSampleCount()).isGreaterThan(999L);
        long rpcSlowCalls = rpcMetrics.getRpcSlowCalls();
        client2.sleep(null, newSleepRequest(SLEEP_DURATION));
        Assertions.assertThat(rpcSlowCalls).isEqualTo(rpcMetrics.getRpcSlowCalls());
    }
}
