package alluxio.client.block.stream;

import alluxio.ClientContext;
import alluxio.ConfigurationRule;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.Chunk;
import alluxio.grpc.RequestType;
import alluxio.grpc.WriteRequest;
import alluxio.resource.CloseableResource;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.BufferUtils;
import alluxio.wire.WorkerNetAddress;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PrepareForTest({FileSystemContext.class, WorkerNetAddress.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:alluxio/client/block/stream/GrpcDataWriterTest.class */
public final class GrpcDataWriterTest {
    /** Value, in bytes, installed for {@code USER_STREAMING_WRITER_CHUNK_SIZE_BYTES} by {@link #mConfigurationRule}. */
    private static final int CHUNK_SIZE = 1024;
    /** Block id passed to every writer created by this test and expected back in write commands. */
    private static final long BLOCK_ID = 1;
    /** Write tier set on the {@link OutStreamOptions} used when creating writers. */
    private static final int TIER = 0;
    /** Mocked file system context; stubbed in {@link #before()}. */
    private FileSystemContext mContext;
    /** Mocked worker address handed to the writer factory. */
    private WorkerNetAddress mAddress;
    /** Mocked block worker client that {@link #mContext} is stubbed to hand out. */
    private BlockWorkerClient mClient;
    /** Mocked request stream returned by {@code mClient.writeBlock(...)}; captures the requests a writer sends. */
    private ClientCallStreamObserver<WriteRequest> mRequestObserver;
    /** Client context built from {@link #mConf} in {@link #before()}. */
    private ClientContext mClientContext;
    private static final Logger LOG = LoggerFactory.getLogger(GrpcDataWriterTest.class);
    /**
     * Pool of four threads on which {@code writeFile} runs its write loop.
     * NOTE(review): the {@code true} argument presumably marks the threads as daemon threads — confirm
     * against {@code ThreadFactoryUtils.build}.
     */
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("test-executor-%d", true));
    /** Source of the random payload bytes written by {@code writeFile}. */
    private static final Random RANDOM = new Random();
    /** Per-test-instance copy of the global configuration; must be initialized before {@link #mConfigurationRule}. */
    private InstancedConfiguration mConf = Configuration.copyGlobal();

    /** Sets the streaming writer chunk size to {@link #CHUNK_SIZE} on {@link #mConf} for each test. */
    @Rule
    public ConfigurationRule mConfigurationRule = new ConfigurationRule(PropertyKey.USER_STREAMING_WRITER_CHUNK_SIZE_BYTES, String.valueOf(CHUNK_SIZE), this.mConf);

    /**
     * Builds the mocks shared by all tests and wires them together.
     *
     * <p>The mocked context hands out the mocked block worker client for {@code mAddress}, the
     * client returns the mocked request observer from {@code writeBlock}, and that observer
     * always reports itself as ready.
     */
    @Before
    public void before() throws Exception {
        mClientContext = ClientContext.create(mConf);

        // Create the collaborators first, then stub them.
        mContext = PowerMockito.mock(FileSystemContext.class);
        mAddress = Mockito.mock(WorkerNetAddress.class);
        mClient = Mockito.mock(BlockWorkerClient.class);
        mRequestObserver = Mockito.mock(ClientCallStreamObserver.class);

        PowerMockito.when(mContext.acquireBlockWorkerClient(mAddress))
                .thenReturn(new NoopClosableResource(mClient));
        PowerMockito.when(mContext.getClientContext())
                .thenReturn(mClientContext);
        PowerMockito.when(mContext.getClusterConf())
                .thenReturn(mConf);
        PowerMockito.when(mClient.writeBlock(ArgumentMatchers.any(StreamObserver.class)))
                .thenReturn(mRequestObserver);
        PowerMockito.when(mRequestObserver.isReady())
                .thenReturn(true);
    }

    /** Closes the (mocked) block worker client once a test has finished. */
    @After
    public void after() throws Exception {
        mClient.close();
    }

    /**
     * Creates a writer, writes no payload, and checks that the requests captured on the
     * mocked stream carry a zero checksum.
     *
     * <p>The writer is closed by try-with-resources before the assertion runs.
     */
    @Test(timeout = 60000)
    public void writeEmptyFile() throws Exception {
        long checksumActual;
        try (DataWriter writer = create(10L)) {
            // No chunk is written; the helper still requires at least one captured request.
            checksumActual = verifyWriteRequests(this.mClient, 0L, 10L);
        }
        Assert.assertEquals(0L, checksumActual);
    }

    /**
     * Writes a payload of 1024 full chunks plus a partial chunk and checks that the checksum of
     * the bytes captured on the mocked stream equals the checksum of the bytes written, taken
     * over every position of the payload.
     *
     * <p>The writer is closed by try-with-resources before the assertion runs.
     */
    @Test(timeout = 60000)
    public void writeFullFile() throws Exception {
        // 1024 full chunks plus a partial one, so the last chunk is not chunk-aligned.
        final long length = CHUNK_SIZE * 1024 + CHUNK_SIZE / 3;
        long checksumActual;
        Future<Long> checksumExpected;
        try (DataWriter writer = create(length)) {
            checksumExpected = writeFile(writer, length, 0L, length - 1);
            // Wait for the background write to finish before inspecting the captured requests.
            checksumExpected.get();
            checksumActual = verifyWriteRequests(this.mClient, 0L, length - 1);
        }
        Assert.assertEquals(checksumExpected.get().longValue(), checksumActual);
    }

    /**
     * Writes a payload of 1024 full chunks plus a partial chunk, but compares checksums only
     * over the inclusive position range {@code [10, length / 3]}.
     *
     * <p>The writer is closed by try-with-resources before the assertion runs.
     */
    @Test(timeout = 60000)
    public void writeFileChecksumOfPartialFile() throws Exception {
        final long length = CHUNK_SIZE * 1024 + CHUNK_SIZE / 3;
        long checksumActual;
        Future<Long> checksumExpected;
        try (DataWriter writer = create(length)) {
            checksumExpected = writeFile(writer, length, 10L, length / 3);
            // Wait for the background write to finish before inspecting the captured requests.
            checksumExpected.get();
            checksumActual = verifyWriteRequests(this.mClient, 10L, length / 3);
        }
        Assert.assertEquals(checksumExpected.get().longValue(), checksumActual);
    }

    /**
     * Creates the writer with {@code Long.MAX_VALUE} as its length argument, writes 1024 full
     * chunks, and compares checksums over the inclusive position range {@code [10, length / 3]}.
     *
     * <p>The writer is closed by try-with-resources before the assertion runs.
     */
    @Test(timeout = 60000)
    public void writeFileUnknownLength() throws Exception {
        final long length = CHUNK_SIZE * 1024;
        long checksumActual;
        Future<Long> checksumExpected;
        try (DataWriter writer = create(Long.MAX_VALUE)) {
            checksumExpected = writeFile(writer, length, 10L, length / 3);
            // Wait for the background write to finish before inspecting the captured requests.
            checksumExpected.get();
            checksumActual = verifyWriteRequests(this.mClient, 10L, length / 3);
        }
        Assert.assertEquals(checksumExpected.get().longValue(), checksumActual);
    }

    /**
     * Creates the writer with {@code Long.MAX_VALUE} as its length argument, writes 30000 full
     * chunks plus a partial chunk, and compares checksums over the inclusive position range
     * {@code [10, length / 3]}.
     *
     * <p>The writer is closed by try-with-resources before the assertion runs.
     */
    @Test(timeout = 60000)
    public void writeFileManyChunks() throws Exception {
        final long length = CHUNK_SIZE * 30000 + CHUNK_SIZE / 3;
        long checksumActual;
        Future<Long> checksumExpected;
        try (DataWriter writer = create(Long.MAX_VALUE)) {
            checksumExpected = writeFile(writer, length, 10L, length / 3);
            // Wait for the background write to finish before inspecting the captured requests.
            checksumExpected.get();
            checksumActual = verifyWriteRequests(this.mClient, 10L, length / 3);
        }
        Assert.assertEquals(checksumExpected.get().longValue(), checksumActual);
    }

    /**
     * Checks that the block worker client resource acquired by a writer is not closed while the
     * writer is open and is closed exactly once when the writer is closed.
     *
     * <p>Uses a dedicated mocked context so the acquired resource itself is a mock whose
     * {@code close()} calls can be counted.
     *
     * @throws IOException if creating or closing the writer fails
     */
    @Test
    public void closedBlockWorkerClientTest() throws IOException {
        CloseableResource resource = Mockito.mock(CloseableResource.class);
        PowerMockito.when(resource.get()).thenReturn(mClient);

        FileSystemContext context = PowerMockito.mock(FileSystemContext.class);
        PowerMockito.when(context.acquireBlockWorkerClient(ArgumentMatchers.any(WorkerNetAddress.class)))
                .thenReturn(resource);
        PowerMockito.when(context.getClusterConf()).thenReturn(mConf);
        mConf.set(PropertyKey.USER_STREAMING_WRITER_CLOSE_TIMEOUT, "1");

        GrpcDataWriter writer = GrpcDataWriter.create(context, mAddress, BLOCK_ID, 0L,
                RequestType.ALLUXIO_BLOCK, OutStreamOptions.defaults(mContext).setWriteTier(TIER));

        // The expected call count is independent of the write tier, so it is spelled out
        // rather than borrowed from the TIER constant.
        Mockito.verify(resource, Mockito.never()).close();
        verifyWriteRequests(mClient, 0L, 0L);

        writer.close();
        Mockito.verify(resource, Mockito.times(1)).close();
    }

    /**
     * Creates a block writer against the mocked context and worker address, using
     * {@code BLOCK_ID}, {@code RequestType.ALLUXIO_BLOCK} and options whose write tier is
     * {@code TIER}.
     *
     * @param j value forwarded as the fourth argument of {@code GrpcDataWriter.create}
     *          (presumably the length of the block to write — confirm against that factory)
     * @return the newly created writer
     */
    private DataWriter create(long j) throws Exception {
        final OutStreamOptions options = OutStreamOptions.defaults(mContext).setWriteTier(TIER);
        return GrpcDataWriter.create(
                mContext, mAddress, BLOCK_ID, j, RequestType.ALLUXIO_BLOCK, options);
    }

    /**
     * Writes {@code length} random bytes to {@code writer} on the shared executor, in chunks of
     * at most {@code CHUNK_SIZE} bytes.
     *
     * <p>Any failure inside the task is logged and rethrown as an {@link AssertionError} that
     * keeps the original throwable as its cause, so it surfaces from {@link Future#get()}.
     *
     * @param writer the writer to send chunks to
     * @param length total number of bytes to write
     * @param start first byte position (inclusive) included in the checksum
     * @param end last byte position (inclusive) included in the checksum
     * @return a future holding the sum of the unsigned-int values of the written bytes whose
     *         position lies in {@code [start, end]}
     */
    private Future<Long> writeFile(final DataWriter writer, final long length, final long start, final long end) throws Exception {
        return EXECUTOR.submit(new Callable<Long>() {
            @Override
            public Long call() {
                try {
                    long checksum = 0;
                    long pos = 0;
                    long remaining = length;
                    while (remaining > 0) {
                        int bytesToWrite = (int) Math.min(remaining, CHUNK_SIZE);
                        byte[] data = new byte[bytesToWrite];
                        RANDOM.nextBytes(data);
                        writer.writeChunk(Unpooled.wrappedBuffer(data));
                        remaining -= bytesToWrite;
                        // Accumulate only the bytes inside the requested checksum window.
                        for (byte b : data) {
                            if (pos >= start && pos <= end) {
                                checksum += BufferUtils.byteToInt(b);
                            }
                            pos++;
                        }
                    }
                    return checksum;
                } catch (Throwable t) {
                    // Log the original throwable (with its stack trace) and keep it as the cause.
                    LOG.error("Failed to write file.", t);
                    throw new AssertionError("Failed to write file.", t);
                }
            }
        });
    }

    /**
     * Verifies the requests captured on the mocked request stream and computes their checksum.
     *
     * <p>Requires that at least one request was sent to {@code mRequestObserver} and that
     * {@code mClient.writeBlock} was called once. The response observer passed to
     * {@code writeBlock} is then completed. Each captured request is checked with
     * {@link #validateWriteRequest}, and chunk bytes are summed by position.
     *
     * <p>Any failure is logged and rethrown as an {@link AssertionError} that keeps the original
     * throwable as its cause.
     *
     * @param blockWorkerClient not consulted; verification always goes through the
     *        {@code mClient} and {@code mRequestObserver} fields
     * @param start first byte position (inclusive) included in the checksum
     * @param end last byte position (inclusive) included in the checksum
     * @return the sum of the unsigned-int values of the captured bytes whose position lies in
     *         {@code [start, end]}
     */
    private long verifyWriteRequests(BlockWorkerClient blockWorkerClient, long start, long end) {
        try {
            ArgumentCaptor<WriteRequest> requestCaptor = ArgumentCaptor.forClass(WriteRequest.class);
            Mockito.verify(mRequestObserver, Mockito.atLeastOnce()).onNext(requestCaptor.capture());

            ArgumentCaptor<StreamObserver> responseObserverCaptor = ArgumentCaptor.forClass(StreamObserver.class);
            Mockito.verify(mClient).writeBlock(responseObserverCaptor.capture());
            // Signal completion on the response observer the writer registered.
            responseObserverCaptor.getValue().onCompleted();

            long checksum = 0;
            long pos = 0;
            for (WriteRequest request : requestCaptor.getAllValues()) {
                validateWriteRequest(request, pos);
                if (request.hasChunk()) {
                    Chunk chunk = request.getChunk();
                    Assert.assertTrue(chunk.hasData());
                    for (byte b : chunk.getData().toByteArray()) {
                        if (pos >= start && pos <= end) {
                            checksum += BufferUtils.byteToInt(b);
                        }
                        pos++;
                    }
                }
            }
            return checksum;
        } catch (Throwable t) {
            // Log the original throwable (with its stack trace) and keep it as the cause.
            LOG.error("Failed to verify write requests.", t);
            throw new AssertionError("Failed to verify write requests.", t);
        }
    }

    /**
     * Checks a single captured write request.
     *
     * <p>A request carrying a command must reference {@code BLOCK_ID} and the given offset;
     * any other request must carry a chunk.
     *
     * @param writeRequest the captured request to check
     * @param j the offset a command request is expected to carry
     */
    private void validateWriteRequest(WriteRequest writeRequest, long j) {
        if (writeRequest.hasCommand()) {
            Assert.assertEquals(BLOCK_ID, writeRequest.getCommand().getId());
            Assert.assertEquals(j, writeRequest.getCommand().getOffset());
            return;
        }
        // Not a command, so it has to be a data chunk.
        Assert.assertTrue(writeRequest.hasChunk());
    }
}
