package org.apache.hadoop.hive.llap;

import com.google.protobuf.Message;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.class */
public class AsyncResponseHandlerTest {
    private AsyncResponseHandler responseHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncResponseHandlerTest$AssertTask.class */
    public interface AssertTask {
        void call() throws AssertionError;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncResponseHandlerTest$DummyAsyncRequest.class */
    public final class DummyAsyncRequest extends AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> {
        private volatile boolean isFinished;
        private Message returnValue;
        private Exception remoteException;

        protected DummyAsyncRequest(Message message, Exception exc, AsyncPbRpcProxy.ExecuteRequestCallback<Message> executeRequestCallback) {
            super((LlapNodeId) Mockito.mock(LlapNodeId.class), (Message) Mockito.mock(Message.class), executeRequestCallback);
            this.isFinished = false;
            this.returnValue = message;
            this.remoteException = exc;
        }

        public void callInternal() throws Exception {
        }

        public AsyncGet<Message, Exception> getResponseFuture() {
            return new AsyncGet<Message, Exception>() { // from class: org.apache.hadoop.hive.llap.AsyncResponseHandlerTest.DummyAsyncRequest.1
                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public Message m0get(long j, TimeUnit timeUnit) throws Exception {
                    if (DummyAsyncRequest.this.remoteException != null) {
                        throw DummyAsyncRequest.this.remoteException;
                    }
                    return DummyAsyncRequest.this.returnValue;
                }

                public boolean isDone() {
                    return DummyAsyncRequest.this.isFinished;
                }
            };
        }

        public void finish() {
            this.isFinished = true;
        }
    }

    @Before
    public void setup() {
        this.responseHandler = new AsyncResponseHandler((AsyncPbRpcProxy.RequestManager) Mockito.mock(AsyncPbRpcProxy.RequestManager.class));
        this.responseHandler.start();
    }

    @After
    public void teardown() {
        this.responseHandler.shutdownNow();
    }

    @Test
    public void testAck() throws InterruptedException {
        AsyncPbRpcProxy.ExecuteRequestCallback<Message> executeRequestCallback = (AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.mock(AsyncPbRpcProxy.ExecuteRequestCallback.class);
        Message message = (Message) Mockito.mock(Message.class);
        DummyAsyncRequest createAsyncRequest = createAsyncRequest(message, null, executeRequestCallback);
        this.responseHandler.addToAsyncResponseFutureQueue(createAsyncRequest);
        ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(0))).setResponse((Message) Matchers.any());
        ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(0))).indicateError((Throwable) Matchers.any());
        createAsyncRequest.finish();
        assertTrueEventually(() -> {
            ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(1))).setResponse(message);
            ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(0))).indicateError((Throwable) Matchers.any());
        });
    }

    @Test
    public void testRemoteFail() throws InterruptedException {
        AsyncPbRpcProxy.ExecuteRequestCallback<Message> executeRequestCallback = (AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.mock(AsyncPbRpcProxy.ExecuteRequestCallback.class);
        Exception exc = new Exception();
        DummyAsyncRequest createAsyncRequest = createAsyncRequest(null, exc, executeRequestCallback);
        this.responseHandler.addToAsyncResponseFutureQueue(createAsyncRequest);
        ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(0))).setResponse((Message) Matchers.any());
        ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(0))).indicateError((Throwable) Matchers.any());
        createAsyncRequest.finish();
        assertTrueEventually(() -> {
            ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(1))).indicateError(exc);
            ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallback, Mockito.times(0))).setResponse((Message) Matchers.any());
        });
    }

    @Test
    public void testStress() throws InterruptedException {
        AsyncPbRpcProxy.ExecuteRequestCallback<Message>[] executeRequestCallbackArr = new AsyncPbRpcProxy.ExecuteRequestCallback[200000];
        final DummyAsyncRequest[] dummyAsyncRequestArr = new DummyAsyncRequest[200000];
        for (int i = 0; i < 200000; i++) {
            executeRequestCallbackArr[i] = (AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.mock(AsyncPbRpcProxy.ExecuteRequestCallback.class);
            dummyAsyncRequestArr[i] = createAsyncRequest(null, null, executeRequestCallbackArr[i]);
        }
        Thread[] threadArr = new Thread[10];
        for (int i2 = 0; i2 < 10; i2++) {
            final int i3 = i2 * 20000;
            final int i4 = (i2 + 1) * 20000;
            threadArr[i2] = new Thread(new Runnable() { // from class: org.apache.hadoop.hive.llap.AsyncResponseHandlerTest.1
                @Override // java.lang.Runnable
                public void run() {
                    for (int i5 = i3; i5 < i4; i5++) {
                        AsyncResponseHandlerTest.this.responseHandler.addToAsyncResponseFutureQueue(dummyAsyncRequestArr[i5]);
                    }
                }
            });
        }
        Thread thread = new Thread(new Runnable() { // from class: org.apache.hadoop.hive.llap.AsyncResponseHandlerTest.2
            @Override // java.lang.Runnable
            public void run() {
                Random random = new Random();
                int[] iArr = new int[200000];
                for (int i5 = 0; i5 < 200000; i5++) {
                    iArr[i5] = i5;
                }
                for (int i6 = 0; i6 < 200000; i6++) {
                    int nextInt = random.nextInt(200000);
                    int nextInt2 = random.nextInt(200000);
                    int i7 = iArr[nextInt];
                    iArr[nextInt] = iArr[nextInt2];
                    iArr[nextInt2] = i7;
                }
                for (int i8 = 0; i8 < 200000; i8++) {
                    dummyAsyncRequestArr[i8].finish();
                }
            }
        });
        for (int i5 = 0; i5 < 10; i5++) {
            threadArr[i5].start();
        }
        thread.start();
        for (int i6 = 0; i6 < 10; i6++) {
            threadArr[i6].join();
        }
        thread.join();
        assertTrueEventually(() -> {
            for (int i7 = 0; i7 < 200000; i7++) {
                ((AsyncPbRpcProxy.ExecuteRequestCallback) Mockito.verify(executeRequestCallbackArr[i7], Mockito.times(1))).setResponse((Message) null);
            }
        });
    }

    private DummyAsyncRequest createAsyncRequest(Message message, Exception exc, AsyncPbRpcProxy.ExecuteRequestCallback<Message> executeRequestCallback) {
        return new DummyAsyncRequest(message, exc, executeRequestCallback);
    }

    private void assertTrueEventually(AssertTask assertTask) throws InterruptedException {
        assertTrueEventually(assertTask, 100000);
    }

    private void assertTrueEventually(AssertTask assertTask, int i) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis() + i;
        AssertionError assertionError = null;
        while (System.currentTimeMillis() < currentTimeMillis) {
            try {
                assertTask.call();
                return;
            } catch (AssertionError e) {
                assertionError = e;
                Thread.sleep(50L);
            }
        }
        throw assertionError;
    }
}
