package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azure.ITestWasbRemoteCallHelper;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.http.HttpResponse;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.class */
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
    // Timeout budget for long-running tests; JUnit's @Test(timeout) is expressed in milliseconds.
    private static final int TEST_EXECUTION_TIMEOUT = 120000;
    // Name of the file created by the request-count/queue-capacity tests.
    private static final String TEST_FILE_PATH = "testfile";
    // Named constant for the value ten.
    private static final int TEN = 10;

    // Injected by the Parameterized runner from params(); selects the networking
    // library that getFileSystem(Configuration) writes into the configuration.
    @Parameterized.Parameter
    public HttpOperationType httpOperationType;

    /**
     * Supplies the parameter sets for the Parameterized runner.
     *
     * @return one single-element argument array per {@link HttpOperationType}
     *         the suite is executed with.
     */
    @Parameterized.Parameters(name = "{0}")
    public static Iterable<Object[]> params() {
        Object[][] networkingLibraries = {
            {HttpOperationType.JDK_HTTP_URL_CONNECTION},
            {HttpOperationType.APACHE_HTTP_CLIENT},
        };
        return Arrays.asList(networkingLibraries);
    }

    /**
     * Creates a new file system instance configured to use the networking
     * library selected by the current test parameter.
     *
     * <p>The supplied configuration is copied before it is modified, so the
     * caller's instance is left untouched.
     *
     * @param configuration base configuration to copy.
     * @return a new, non-cached file system instance.
     * @throws Exception if the file system cannot be created.
     */
    @Override // org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest
    public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception {
        Configuration conf = new Configuration(configuration);
        conf.set("fs.azure.networking.library", httpOperationType.toString());
        // FileSystem.newInstance is declared to return FileSystem; narrow it explicitly.
        return (AzureBlobFileSystem) FileSystem.newInstance(conf);
    }

    /**
     * Verifies that, without any overrides, the output stream picks up the
     * configured write concurrency and queue capacity. For an append-blob
     * stream the expected concurrency is 1.
     *
     * @throws Exception on any failure while creating or closing the stream.
     */
    @Test
    public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
        // Both the file system (a fresh, non-cached instance) and the stream are
        // closed by try-with-resources, stream first.
        try (AzureBlobFileSystem fs = getFileSystem(getRawConfiguration());
             FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
            AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();

            int expectedMaxConcurrentRequests = getConfiguration().getWriteMaxConcurrentRequestCount();
            if (stream.isAppendBlobStream()) {
                expectedMaxConcurrentRequests = 1;
            }
            int expectedMaxRequestsToQueue = getConfiguration().getMaxWriteRequestsToQueue();

            Assertions.assertThat(stream.getMaxConcurrentRequestCount())
                .describedAs("maxConcurrentRequests should be " + expectedMaxConcurrentRequests)
                .isEqualTo(expectedMaxConcurrentRequests);
            Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
                .describedAs("maxRequestsToQueue should be " + expectedMaxRequestsToQueue)
                .isEqualTo(expectedMaxRequestsToQueue);
        }
    }

    /**
     * Verifies that explicitly configured write concurrency and queue capacity
     * are honoured by the output stream. For an append-blob stream the expected
     * concurrency is 1 regardless of the configured value.
     *
     * @throws Exception on any failure while creating or closing the stream.
     */
    @Test
    public void testMaxRequestsAndQueueCapacity() throws Exception {
        final int configuredMaxConcurrentRequests = 6;
        final int configuredMaxRequestsToQueue = TEN;

        // Work on a copy so the shared raw configuration is not mutated.
        Configuration conf = new Configuration(getRawConfiguration());
        conf.setInt("fs.azure.write.max.concurrent.requests", configuredMaxConcurrentRequests);
        conf.setInt("fs.azure.write.max.requests.to.queue", configuredMaxRequestsToQueue);

        // Both the file system (a fresh, non-cached instance) and the stream are
        // closed by try-with-resources, stream first.
        try (AzureBlobFileSystem fs = getFileSystem(conf);
             FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
            AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();

            int expectedMaxConcurrentRequests = configuredMaxConcurrentRequests;
            if (stream.isAppendBlobStream()) {
                expectedMaxConcurrentRequests = 1;
            }

            Assertions.assertThat(stream.getMaxConcurrentRequestCount())
                .describedAs("maxConcurrentRequests should be " + expectedMaxConcurrentRequests)
                .isEqualTo(expectedMaxConcurrentRequests);
            Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
                .describedAs("maxRequestsToQueue should be " + configuredMaxRequestsToQueue)
                .isEqualTo(configuredMaxRequestsToQueue);
        }
    }

    /**
     * Writes and flushes several 5 KB blocks, hinting a GC after each one, and
     * asserts that the stream's executor service is still running and that its
     * file-system back reference is still set.
     *
     * @throws Exception on any write, flush or close failure.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testAzureBlobFileSystemBackReferenceInOutputStream() throws Exception {
        byte[] testBytes = new byte[5 * 1024];
        // The stream is obtained from a helper so this method holds no reference
        // to the file system that created it.
        try (AbfsOutputStream out = getStream()) {
            for (int i = 0; i < 5; i++) {
                out.write(testBytes);
                out.flush();
                System.gc();
                Assertions.assertThat(out.getExecutorService().isShutdown() || out.getExecutorService().isTerminated())
                    .describedAs("Executor Service should not be closed before OutputStream while writing")
                    .isFalse();
                Assertions.assertThat(out.getFsBackRef().isNull())
                    .describedAs("BackReference in output stream should not be null")
                    .isFalse();
            }
        }
    }

    /**
     * Closes the file system while an output stream created from it still has
     * unflushed data, then asserts that closing the stream raises a
     * {@link PathIOException} whose text contains the test method name (which
     * is also the name of the file being written).
     *
     * @throws Exception on any unexpected failure.
     */
    @Test
    public void testAbfsOutputStreamClosingFsBeforeStream() throws Exception {
        AzureBlobFileSystem fs = new AzureBlobFileSystem();
        fs.initialize(new URI(getTestUrl()), new Configuration());
        Path pathFs = path(getMethodName());
        byte[] inputBytes = new byte[5 * 1024];
        try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, pathFs)) {
            out.write(inputBytes);
            fs.close();
            // Closing the stream after the file system is expected to fail.
            LambdaTestUtils.intercept(PathIOException.class, getMethodName(), out::close);
        }
    }

    /**
     * Forces the first append attempt (the one carrying the Expect header) to
     * fail and checks that the retry is issued without that failure.
     *
     * <p>The file is deleted before the write, so closing the stream is
     * expected to fail: with a {@link FileNotFoundException} when the client is
     * an {@code AbfsDfsClient}, otherwise with an {@link IOException} whose
     * cause's cause is an {@link AbfsRestOperationException}.
     *
     * @throws Exception on any unexpected failure.
     */
    @Test
    public void testExpect100ContinueFailureInAppend() throws Exception {
        if (!getIsNamespaceEnabled(getFileSystem())) {
            Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        }
        Configuration configuration = new Configuration(getRawConfiguration());
        configuration.set("fs.azure.account.expect.header.enabled", "true");
        AzureBlobFileSystem fs = getFileSystem(configuration);
        Path path = new Path("/testFile");

        // Spy the stream, its ingress handler and the client so that the HTTP
        // operations created for the append can be intercepted.
        AbfsOutputStream os = Mockito.spy((AbfsOutputStream) fs.create(path).getWrappedStream());
        AzureIngressHandler ingressHandler = Mockito.spy(os.getIngressHandler());
        Mockito.doReturn(ingressHandler).when(os).getIngressHandler();
        AbfsClient spiedClient = Mockito.spy(ingressHandler.getClient());
        Mockito.doReturn(spiedClient).when(ingressHandler).getClient();

        // Slot 0 receives the first (failing) operation, slot 1 the retry.
        AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
        mockSetupForAppend(httpOpForAppendTest, spiedClient);
        Mockito.doReturn(spiedClient).when(os).getClient();

        fs.delete(path, true);
        os.write(1);
        if (spiedClient instanceof AbfsDfsClient) {
            LambdaTestUtils.intercept(FileNotFoundException.class, os::close);
        } else {
            IOException closeFailure = LambdaTestUtils.intercept(IOException.class, os::close);
            Assertions.assertThat(closeFailure.getCause().getCause())
                .isInstanceOf(AbfsRestOperationException.class);
        }

        Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
            .describedAs("First try from AbfsClient will have expect-100 header and should fail with expect-100 error.")
            .isTrue();
        if (httpOpForAppendTest[0] instanceof AbfsJdkHttpOperation) {
            // The failed attempt must not have reached response processing.
            Mockito.verify((AbfsJdkHttpOperation) httpOpForAppendTest[0], Mockito.times(0))
                .processConnHeadersAndInputStreams(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
        }

        Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
            .describedAs("The retried operation from AbfsClient should not fail with expect-100 error. The retried operation does not have expect-100 header.")
            .isFalse();
        if (httpOpForAppendTest[1] instanceof AbfsJdkHttpOperation) {
            // The retry must have processed the response exactly once.
            Mockito.verify((AbfsJdkHttpOperation) httpOpForAppendTest[1], Mockito.times(1))
                .processConnHeadersAndInputStreams(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
        }
    }

    /**
     * Stubs {@code getAbfsRestOperation} on the spied client so that every HTTP
     * operation created by the returned rest operation is itself spied and
     * recorded, in creation order, into {@code abfsHttpOperationArr}.
     *
     * <p>When the rest operation's request headers include "Expect", each HTTP
     * operation it creates is rigged to fail: a JDK operation throws a
     * {@link ProtocolException} from {@code getConnOutputStream} (after writing
     * one byte to and closing the real stream), any other operation throws an
     * {@link AbfsApacheHttpExpect100Exception} from {@code executeRequest}.
     *
     * @param abfsHttpOperationArr receives the spied HTTP operations; must be
     *                             large enough for every operation created.
     * @param abfsClient           spied client to install the stubbing on.
     */
    private void mockSetupForAppend(AbfsHttpOperation[] abfsHttpOperationArr, AbfsClient abfsClient) {
        // Single-element array so the counter can be advanced from inside the lambdas.
        int[] nextSlot = {0};
        Mockito.doAnswer(restOpInvocation -> {
            AbfsRestOperation restOp = Mockito.spy((AbfsRestOperation) restOpInvocation.callRealMethod());

            boolean expectHeaderFound = false;
            for (AbfsHttpHeader header : restOp.getRequestHeaders()) {
                if (header.getName().equals("Expect")) {
                    expectHeaderFound = true;
                }
            }
            final boolean failWithExpect100 = expectHeaderFound;

            Mockito.doAnswer(httpOpInvocation -> {
                AbfsHttpOperation httpOp = Mockito.spy((AbfsHttpOperation) httpOpInvocation.callRealMethod());
                abfsHttpOperationArr[nextSlot[0]] = httpOp;
                if (failWithExpect100) {
                    if (httpOp instanceof AbfsJdkHttpOperation) {
                        Mockito.doAnswer(outputStreamInvocation -> {
                            OutputStream outputStream = (OutputStream) outputStreamInvocation.callRealMethod();
                            outputStream.write(1);
                            outputStream.close();
                            throw new ProtocolException("Server rejected operation");
                        }).when((AbfsJdkHttpOperation) httpOp).getConnOutputStream();
                    } else {
                        Mockito.doAnswer(executeInvocation -> {
                            throw new AbfsApacheHttpExpect100Exception((HttpResponse) executeInvocation.callRealMethod());
                        }).when((AbfsAHCHttpOperation) httpOp).executeRequest();
                    }
                }
                nextSlot[0]++;
                return httpOp;
            }).when(restOp).createHttpOperation();
            return restOp;
        }).when(abfsClient).getAbfsRestOperation(
            Mockito.any(AbfsRestOperationType.class),
            Mockito.anyString(),
            Mockito.any(URL.class),
            Mockito.anyList(),
            Mockito.any(byte[].class),
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.nullable(String.class));
    }

    /**
     * Builds an output stream backed by a file system instance that is created
     * and initialised locally and is not returned to the caller.
     *
     * @return a flush-enabled output stream on a path derived from the current
     *         test method name with "1" appended.
     * @throws URISyntaxException if the test URL is not a valid URI.
     * @throws IOException if the file system or the stream cannot be set up.
     */
    private AbfsOutputStream getStream() throws URISyntaxException, IOException {
        // NOTE(review): presumably kept in its own method so the caller never
        // holds the file system reference - confirm against the GC-based test.
        final AzureBlobFileSystem localFs = new AzureBlobFileSystem();
        final URI testUri = new URI(getTestUrl());
        localFs.initialize(testUri, new Configuration());
        final Path target = path(getMethodName() + "1");
        return createAbfsOutputStreamWithFlushEnabled(localFs, target);
    }

    /**
     * Makes {@code getBlockList} on the spied blob client throw an
     * {@link AbfsRestOperationException} built with status 409, writes and
     * closes a file, and then expects {@code openFileForWrite} on the spied
     * store to fail with an {@link AzureBlobFileSystemException}.
     *
     * <p>Skipped when the account is namespace-enabled; also guarded by
     * {@code assumeBlobServiceType()}.
     *
     * @throws Exception on any unexpected failure.
     */
    @Test
    public void testValidateGetBlockList() throws Exception {
        AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
        Assume.assumeTrue(!getIsNamespaceEnabled(spiedFs));
        AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
        assumeBlobServiceType();

        // Route the store and file system through spied handler/client instances.
        AbfsClientHandler spiedHandler = Mockito.spy(spiedStore.getClientHandler());
        AbfsBlobClient spiedBlobClient = Mockito.spy(spiedHandler.getBlobClient());
        Mockito.doReturn(spiedHandler).when(spiedStore).getClientHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getBlobClient();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getIngressClient();
        Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();

        Path testPath = new Path("/testFile");
        AbfsOutputStream spiedStream =
            Mockito.spy((AbfsOutputStream) spiedFs.create(testPath).getWrappedStream());
        Mockito.doReturn(spiedHandler).when(spiedStream).getClientHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getBlobClient();
        Mockito.doThrow(getMockAbfsRestOperationException(409))
            .when(spiedBlobClient)
            .getBlockList(Mockito.anyString(), Mockito.any(TracingContext.class));

        spiedStream.write(10);
        spiedStream.hsync();
        spiedStream.close();

        Mockito.doCallRealMethod()
            .when(spiedStore)
            .openFileForWrite(
                Mockito.any(Path.class),
                (FileSystem.Statistics) Mockito.any(),
                Mockito.anyBoolean(),
                Mockito.any(TracingContext.class));
        LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> {
            return spiedStore.openFileForWrite(
                testPath, (FileSystem.Statistics) null, false, getTestTracingContext(spiedFs, true));
        });
    }

    /**
     * Calls {@code hsync()} on a freshly created stream with nothing written
     * and verifies that neither {@code append} nor {@code flush} was invoked on
     * the spied blob client.
     *
     * <p>Skipped when the account is namespace-enabled; also guarded by
     * {@code assumeBlobServiceType()}.
     *
     * @throws Exception on any unexpected failure.
     */
    @Test
    public void testNoNetworkCallsForFlush() throws Exception {
        AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
        Assume.assumeTrue(!getIsNamespaceEnabled(spiedFs));
        AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
        assumeBlobServiceType();

        // Route the store and file system through spied handler/client instances.
        AbfsClientHandler spiedHandler = Mockito.spy(spiedStore.getClientHandler());
        AbfsBlobClient spiedBlobClient = Mockito.spy(spiedHandler.getBlobClient());
        Mockito.doReturn(spiedHandler).when(spiedStore).getClientHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getBlobClient();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getIngressClient();
        Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();

        // Spy the stream and its ingress handler so both hand out the spied client.
        AbfsOutputStream spiedStream =
            Mockito.spy((AbfsOutputStream) spiedFs.create(new Path("/testFile")).getWrappedStream());
        AzureIngressHandler spiedIngressHandler = Mockito.spy(spiedStream.getIngressHandler());
        Mockito.doReturn(spiedIngressHandler).when(spiedStream).getIngressHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedIngressHandler).getClient();
        Mockito.doReturn(spiedHandler).when(spiedStream).getClientHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getBlobClient();

        spiedStream.hsync();

        Mockito.verify(spiedBlobClient, Mockito.times(0)).append(
            (String) Mockito.any(),
            (byte[]) Mockito.any(),
            (AppendRequestParameters) Mockito.any(),
            (String) Mockito.any(),
            (ContextEncryptionAdapter) Mockito.any(),
            Mockito.any(TracingContext.class));
        Mockito.verify(spiedBlobClient, Mockito.times(0)).flush(
            Mockito.any(byte[].class),
            Mockito.anyString(),
            Mockito.anyBoolean(),
            Mockito.anyString(),
            Mockito.anyString(),
            Mockito.anyString(),
            (ContextEncryptionAdapter) Mockito.any(),
            Mockito.any(TracingContext.class));
    }

    /**
     * Builds an {@link AbfsRestOperationException} for use in stubbing.
     *
     * @param i value passed as the first constructor argument (presumably the
     *          HTTP status code - verify against the exception's constructor).
     * @return a new exception with two {@code EMPTY_STRING} arguments and a
     *         fresh {@link Exception} as its last argument.
     */
    private AbfsRestOperationException getMockAbfsRestOperationException(int i) {
        final String blank = ITestWasbRemoteCallHelper.EMPTY_STRING;
        final Exception inner = new Exception();
        return new AbfsRestOperationException(i, blank, blank, inner);
    }

    /**
     * Writes one byte, calls {@code hsync()} and then {@code close()}, and
     * verifies that the spied blob client saw exactly one {@code append} and
     * exactly one {@code flush}.
     *
     * <p>Skipped when the account is namespace-enabled or append blob is
     * enabled; also guarded by {@code assumeBlobServiceType()}.
     *
     * @throws Exception on any unexpected failure.
     */
    @Test
    public void testNoNetworkCallsForSecondFlush() throws Exception {
        AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
        Assume.assumeTrue(!getIsNamespaceEnabled(spiedFs));
        AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
        assumeBlobServiceType();
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());

        // Route the store and file system through spied handler/client instances.
        AbfsClientHandler spiedHandler = Mockito.spy(spiedStore.getClientHandler());
        AbfsBlobClient spiedBlobClient = Mockito.spy(spiedHandler.getBlobClient());
        Mockito.doReturn(spiedHandler).when(spiedStore).getClientHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getBlobClient();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getIngressClient();
        Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();

        // Spy the stream and its ingress handler so both hand out the spied client.
        AbfsOutputStream spiedStream =
            Mockito.spy((AbfsOutputStream) spiedFs.create(new Path("/testFile")).getWrappedStream());
        AzureIngressHandler spiedIngressHandler = Mockito.spy(spiedStream.getIngressHandler());
        Mockito.doReturn(spiedIngressHandler).when(spiedStream).getIngressHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedIngressHandler).getClient();
        Mockito.doReturn(spiedHandler).when(spiedStream).getClientHandler();
        Mockito.doReturn(spiedBlobClient).when(spiedHandler).getBlobClient();

        spiedStream.write(10);
        spiedStream.hsync();
        spiedStream.close();

        Mockito.verify(spiedBlobClient, Mockito.times(1)).append(
            Mockito.anyString(),
            Mockito.any(byte[].class),
            Mockito.any(AppendRequestParameters.class),
            (String) Mockito.any(),
            (ContextEncryptionAdapter) Mockito.any(),
            Mockito.any(TracingContext.class));
        Mockito.verify(spiedBlobClient, Mockito.times(1)).flush(
            Mockito.any(byte[].class),
            Mockito.anyString(),
            Mockito.anyBoolean(),
            (String) Mockito.any(),
            (String) Mockito.any(),
            Mockito.anyString(),
            (ContextEncryptionAdapter) Mockito.any(),
            Mockito.any(TracingContext.class));
    }
}
