package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.class */
public class TimeoutCallStackTest {
    private static ActorSystem actorSystem;
    private static RpcService rpcService;
    private final List<RpcEndpoint> endpointsToStop = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest$TestingGateway.class */
    public interface TestingGateway extends RpcGateway {
        CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time time);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest$TestingRpcEndpoint.class */
    public static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {
        TestingRpcEndpoint(RpcService rpcService, String str) {
            super(rpcService, str);
        }

        @Override // org.apache.flink.runtime.rpc.akka.TimeoutCallStackTest.TestingGateway
        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time time) {
            return new CompletableFuture<>();
        }
    }

    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterClass
    public static void teardown() throws Exception {
        FutureUtils.waitForAll(Arrays.asList(rpcService.stopService(), AkkaFutureUtils.toJava(actorSystem.terminate()))).get(10000L, TimeUnit.MILLISECONDS);
    }

    @After
    public void stopTestEndpoints() {
        this.endpointsToStop.forEach((v0) -> {
            IOUtils.closeQuietly(v0);
        });
    }

    @Test
    public void testTimeoutException() throws Exception {
        Throwable th = null;
        try {
            createTestingGateway().callThatTimesOut(Time.milliseconds(1L)).get();
            Assert.fail("test buggy: the call should never have completed");
        } catch (ExecutionException e) {
            th = e.getCause();
        }
        Assert.assertThat(th, Matchers.instanceOf(TimeoutException.class));
        Assert.assertThat(th.getMessage(), Matchers.containsString("callThatTimesOut"));
        Assert.assertThat(th.getStackTrace()[0].getMethodName(), Matchers.equalTo("testTimeoutException"));
    }

    private TestingGateway createTestingGateway() throws Exception {
        TestingRpcEndpoint testingRpcEndpoint = new TestingRpcEndpoint(rpcService, "test_name");
        this.endpointsToStop.add(testingRpcEndpoint);
        testingRpcEndpoint.start();
        return (TestingGateway) rpcService.connect(testingRpcEndpoint.getAddress(), TestingGateway.class).get();
    }
}
