package org.apache.hadoop.fs.azurebfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.ITestWasbRemoteCallHelper;
import org.apache.hadoop.fs.azure.integration.Sizes;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AzureBlobIngressHandler;
import org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler;
import org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.store.BlockUploadStatistics;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.class */
public class ITestAzureBlobFileSystemAppend extends AbstractAbfsIntegrationTest {
    /** Relative name of the file that most tests in this class create and append to. */
    private static final String TEST_FILE_PATH = "testfile";
    /** Relative name of the directory used by the directory-related append tests. */
    private static final String TEST_FOLDER_PATH = "testFolder";
    /** Small integer constant shared by the tests. */
    private static final int TEN = 10;
    /** Small integer constant; used both as a single-byte payload and as a buffer offset. */
    private static final int TWENTY = 20;
    /** Small integer constant; used both as a single-byte payload and as a buffer offset. */
    private static final int THIRTY = 30;
    /** Small integer constant shared by the tests. */
    private static final int HUNDRED = 100;

    /**
     * Creates a directory at the test path and then tries to open it for
     * append with a buffer size of 0. The test passes only if a
     * {@link FileNotFoundException} is raised.
     */
    @Test(expected = FileNotFoundException.class)
    public void testAppendDirShouldFail() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path dirPath = path(TEST_FILE_PATH);
        fs.mkdirs(dirPath);
        // Only reached if append() unexpectedly succeeds on a directory.
        final FSDataOutputStream appendStream = fs.append(dirPath, 0);
        appendStream.close();
    }

    /**
     * Writes a zero-length slice of a 1 KB random buffer to a freshly created
     * file and verifies that the stream position stays at 0.
     *
     * <p>The stream is managed by try-with-resources so it is closed exactly
     * once, whether or not the assertion succeeds.
     */
    @Test
    public void testAppendWithLength0() throws Exception {
        try (FSDataOutputStream out = getFileSystem().create(path(TEST_FILE_PATH))) {
            final byte[] payload = new byte[Sizes.S_1K];
            new Random().nextBytes(payload);
            // Offset 1000 with length 0: nothing must actually be written.
            out.write(payload, 1000, 0);
            assertEquals(0L, out.getPos());
        }
    }

    /**
     * Touches a file, deletes it (non-recursively) and then tries to append
     * to the deleted path. The test passes only if a
     * {@link FileNotFoundException} is raised.
     */
    @Test(expected = FileNotFoundException.class)
    public void testAppendFileAfterDelete() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path deletedFile = path(TEST_FILE_PATH);
        ContractTestUtils.touch(fs, deletedFile);
        fs.delete(deletedFile, false);
        // Only reached if append() unexpectedly succeeds on the deleted path.
        final FSDataOutputStream appendStream = fs.append(deletedFile);
        appendStream.close();
    }

    /**
     * Creates a directory at the test folder path and then tries to open it
     * for append. The test passes only if a {@link FileNotFoundException} is
     * raised.
     */
    @Test(expected = FileNotFoundException.class)
    public void testAppendDirectory() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path folder = path(TEST_FOLDER_PATH);
        fs.mkdirs(folder);
        // Only reached if append() unexpectedly succeeds on a directory.
        final FSDataOutputStream appendStream = fs.append(folder);
        appendStream.close();
    }

    /**
     * Creates an empty file, registers a {@link TracingHeaderValidator}
     * configured for {@link FSOperationType#APPEND} on the file system and
     * then opens the file for append with a buffer size of 10.
     */
    @Test
    public void testTracingForAppend() throws IOException {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path filePath = path(TEST_FILE_PATH);
        fs.create(filePath).close();

        final String correlationId =
                fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId();
        final TracingHeaderValidator appendValidator = new TracingHeaderValidator(
                correlationId, fs.getFileSystemId(), FSOperationType.APPEND, false, 0);
        fs.registerListener(appendValidator);

        // NOTE(review): the returned stream is intentionally left as in the
        // original test (not closed); only the append call itself is exercised.
        fs.append(filePath, 10);
    }

    /**
     * For each data-block buffer type ("disk", "bytebuffer", "array"),
     * verifies that the data block backing an output stream is in state
     * {@code Writing} after a write, and that closing the stream closes the
     * block exactly once and moves it to state {@code Closed}.
     *
     * <p>The block is captured by spying on the store's block factory: every
     * block the factory creates is wrapped in a Mockito spy and stored in a
     * one-element holder array.
     */
    @Test
    public void testCloseOfDataBlockOnAppendComplete() throws Exception {
        HashSet<String> blockBufferTypes = new HashSet<>();
        blockBufferTypes.add("disk");
        blockBufferTypes.add("bytebuffer");
        blockBufferTypes.add("array");
        for (String blockBufferType : blockBufferTypes) {
            Configuration configuration = new Configuration(getRawConfiguration());
            configuration.set("fs.azure.data.blocks.buffer", blockBufferType);
            try (AzureBlobFileSystem fs =
                         (AzureBlobFileSystem) Mockito.spy(FileSystem.newInstance(configuration))) {
                AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
                Mockito.doReturn(store).when(fs).getAbfsStore();

                // One-element holder so the lambdas below can publish the spied block.
                DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
                Mockito.doAnswer(getBlockFactoryInvocation -> {
                    DataBlocks.BlockFactory factory = Mockito.spy(
                            (DataBlocks.BlockFactory) getBlockFactoryInvocation.callRealMethod());
                    Mockito.doAnswer(factoryCreateInvocation -> {
                        dataBlock[0] = Mockito.spy(
                                (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
                        return dataBlock[0];
                    }).when(factory).create(Mockito.anyLong(), Mockito.anyInt(),
                            Mockito.any(BlockUploadStatistics.class));
                    return factory;
                }).when(store).getBlockFactory();

                // try-with-resources guarantees the stream is released even if an
                // assertion fails before the explicit close() below.
                try (FSDataOutputStream out =
                             fs.create(new Path(getMethodName() + "_" + blockBufferType))) {
                    out.write(new byte[1]);
                    Assertions.assertThat(dataBlock[0].getState())
                            .describedAs("On write of data in outputStream, state should become Writing")
                            .isEqualTo(DataBlocks.DataBlock.DestState.Writing);
                    // Explicit close is the operation under test.
                    out.close();
                    Mockito.verify(dataBlock[0], Mockito.times(1)).close();
                    Assertions.assertThat(dataBlock[0].getState())
                            .describedAs("On close of outputStream, state should become Closed")
                            .isEqualTo(DataBlocks.DataBlock.DestState.Closed);
                }
            }
        }
    }

    /**
     * Creates a file through the DFS client, switches the configured ingress
     * service type to BLOB and appends to the file. Asserts that the stream's
     * ingress handler uses an {@link AbfsBlobClient} right after opening and
     * an {@link AbfsDfsClient} after three write+hsync rounds.
     *
     * <p>Skipped when append blob is enabled.
     */
    @Test
    public void testCreateOverDfsAppendOverBlob() throws IOException {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        AzureBlobFileSystem fs = getFileSystem();
        Path testPath = path(TEST_FILE_PATH);
        AzureBlobFileSystemStore.Permissions permissions = new AzureBlobFileSystemStore.Permissions(
                false, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
        fs.getAbfsStore().getClientHandler().getDfsClient().createPath(
                makeQualified(testPath).toUri().getPath(), true, false, permissions, false,
                (String) null, (ContextEncryptionAdapter) null, getTestTracingContext(fs, true));
        fs.getAbfsStore().getAbfsConfiguration()
                .set("fs.azure.ingress.service.type", AbfsServiceType.BLOB.name());

        FSDataOutputStream outputStream = fs.append(testPath);
        // getWrappedStream() is typed as OutputStream; the ABFS stream must be cast
        // before its ingress handler can be inspected.
        AbfsClient client =
                ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler().getClient();
        Assertions.assertThat(client)
                .as("Blob client was not used before fallback")
                .isInstanceOf(AbfsBlobClient.class);

        outputStream.write(10);
        outputStream.hsync();
        outputStream.write(TWENTY);
        outputStream.hsync();
        outputStream.write(THIRTY);
        outputStream.hsync();

        // Re-read the handler: it is expected to have been swapped by now.
        AbfsClient clientAfterFallback =
                ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler().getClient();
        Assertions.assertThat(clientAfterFallback)
                .as("DFS client was not used after fallback")
                .isInstanceOf(AbfsDfsClient.class);
    }

    /**
     * Creates a file through the Blob client on a new file system instance
     * configured with DFS ingress and DFS-to-Blob fallback enabled, then
     * performs three write+hsync rounds on an append stream. The test passes
     * if none of these operations throws.
     *
     * <p>Skipped when append blob is enabled. The file system instance is
     * managed by try-with-resources so it is closed exactly once.
     */
    @Test
    public void testCreateOverBlobAppendOverDfs() throws IOException {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        Configuration conf = getRawConfiguration();
        conf.setBoolean("fs.azure.enable.dfstoblob.fallback", true);
        conf.set("fs.azure.ingress.service.type", String.valueOf(AbfsServiceType.DFS));
        try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(conf)) {
            Path testPath = path(TEST_FILE_PATH);
            AzureBlobFileSystemStore.Permissions permissions = new AzureBlobFileSystemStore.Permissions(
                    false, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
            fs.getAbfsStore().getAbfsConfiguration()
                    .setBoolean("fs.azure.enable.dfstoblob.fallback", true);
            fs.getAbfsStore().getAbfsConfiguration()
                    .set("fs.azure.ingress.service.type", String.valueOf(AbfsServiceType.DFS));
            fs.getAbfsStore().getClientHandler().getBlobClient().createPath(
                    makeQualified(testPath).toUri().getPath(), true, false, permissions, false,
                    (String) null, (ContextEncryptionAdapter) null, getTestTracingContext(fs, true));

            FSDataOutputStream outputStream = fs.append(testPath);
            outputStream.write(10);
            outputStream.hsync();
            outputStream.write(TWENTY);
            outputStream.hsync();
            outputStream.write(THIRTY);
            outputStream.hsync();
        }
    }

    /**
     * Same scenario as creating over the Blob client and appending with DFS
     * ingress, but with the store spied so that {@code isAppendBlobKey}
     * returns {@code true} for every path and with the file created as an
     * append blob. The test passes if the three write+hsync rounds complete
     * without throwing.
     *
     * <p>The spied store is injected into the file system's private
     * {@code abfsStore} field via reflection. The file system instance is
     * managed by try-with-resources so it is closed exactly once.
     */
    @Test
    public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() throws IOException, NoSuchFieldException, IllegalAccessException {
        Configuration conf = getRawConfiguration();
        conf.setBoolean("fs.azure.enable.dfstoblob.fallback", true);
        conf.set("fs.azure.ingress.service.type", String.valueOf(AbfsServiceType.DFS));
        try (AzureBlobFileSystem fs =
                     (AzureBlobFileSystem) Mockito.spy(FileSystem.newInstance(conf))) {
            AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
            Mockito.doReturn(true).when(store).isAppendBlobKey(ArgumentMatchers.anyString());

            // Swap the real store for the spy inside the file system.
            Field storeField = AzureBlobFileSystem.class.getDeclaredField("abfsStore");
            storeField.setAccessible(true);
            storeField.set(fs, store);

            Path testPath = path(TEST_FILE_PATH);
            AzureBlobFileSystemStore.Permissions permissions = new AzureBlobFileSystemStore.Permissions(
                    false, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
            fs.getAbfsStore().getAbfsConfiguration()
                    .setBoolean("fs.azure.enable.dfstoblob.fallback", true);
            fs.getAbfsStore().getAbfsConfiguration()
                    .set("fs.azure.ingress.service.type", String.valueOf(AbfsServiceType.DFS));
            fs.getAbfsStore().getClientHandler().getBlobClient().createPath(
                    makeQualified(testPath).toUri().getPath(), true, false, permissions, true,
                    (String) null, (ContextEncryptionAdapter) null, getTestTracingContext(fs, true));

            FSDataOutputStream outputStream = fs.append(testPath);
            outputStream.write(10);
            outputStream.hsync();
            outputStream.write(TWENTY);
            outputStream.hsync();
            outputStream.write(THIRTY);
            outputStream.hsync();
        }
    }

    /**
     * Creates an append blob through the DFS client (store spied so that
     * {@code isAppendBlobKey} is always {@code true}), switches the configured
     * ingress service type to BLOB and appends. Asserts that the stream's
     * ingress handler uses an {@link AbfsBlobClient} right after opening and
     * an {@link AbfsDfsClient} after the writes.
     *
     * <p>Requires an HNS-enabled account.
     */
    @Test
    public void testCreateAppendBlobOverDfsEndpointAppendOverBlob() throws IOException, NoSuchFieldException, IllegalAccessException {
        assumeHnsEnabled("FNS does not support append blob creation for DFS endpoint");
        AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
        AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
        Mockito.doReturn(true).when(store).isAppendBlobKey(ArgumentMatchers.anyString());

        // Swap the real store for the spy inside the file system.
        Field storeField = AzureBlobFileSystem.class.getDeclaredField("abfsStore");
        storeField.setAccessible(true);
        storeField.set(fs, store);

        Path testPath = path(TEST_FILE_PATH);
        AzureBlobFileSystemStore.Permissions permissions = new AzureBlobFileSystemStore.Permissions(
                false, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
        fs.getAbfsStore().getClientHandler().getDfsClient().createPath(
                makeQualified(testPath).toUri().getPath(), true, false, permissions, true,
                (String) null, (ContextEncryptionAdapter) null, getTestTracingContext(fs, true));
        fs.getAbfsStore().getAbfsConfiguration()
                .set("fs.azure.ingress.service.type", AbfsServiceType.BLOB.name());

        FSDataOutputStream outputStream = fs.append(testPath);
        // getWrappedStream() is typed as OutputStream; the ABFS stream must be cast
        // before its ingress handler can be inspected.
        AbfsClient client =
                ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler().getClient();
        Assertions.assertThat(client)
                .as("Blob client was not used before fallback")
                .isInstanceOf(AbfsBlobClient.class);

        outputStream.write(10);
        outputStream.hsync();
        outputStream.write(TWENTY);
        outputStream.hsync();
        outputStream.write(THIRTY);
        outputStream.flush();

        // Re-read the handler: it is expected to have been swapped by now.
        AbfsClient clientAfterFallback =
                ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler().getClient();
        Assertions.assertThat(clientAfterFallback)
                .as("DFS client was not used after fallback")
                .isInstanceOf(AbfsDfsClient.class);
    }

    /**
     * Verifies that the ingress handler and client chosen for an append
     * stream follow the configured ingress service type: with BLOB configured
     * the handler is an {@link AzureBlobIngressHandler} backed by an
     * {@link AbfsBlobClient}; after switching the configuration to DFS a new
     * append stream gets an {@link AzureDFSIngressHandler} backed by an
     * {@link AbfsDfsClient}.
     *
     * <p>The file system instance is managed by try-with-resources so it is
     * closed exactly once.
     */
    @Test
    public void testValidateIngressHandler() throws IOException {
        Configuration conf = getRawConfiguration();
        conf.set("fs.azure.ingress.service.type", AbfsServiceType.BLOB.name());
        try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(conf)) {
            Path blobIngressPath = path(TEST_FILE_PATH);
            AzureBlobFileSystemStore.Permissions permissions = new AzureBlobFileSystemStore.Permissions(
                    false, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
            fs.getAbfsStore().getClientHandler().getBlobClient().createPath(
                    makeQualified(blobIngressPath).toUri().getPath(), true, false, permissions, false,
                    (String) null, (ContextEncryptionAdapter) null, getTestTracingContext(fs, true));

            // getWrappedStream() is typed as OutputStream, hence the cast.
            AzureIngressHandler blobHandler =
                    ((AbfsOutputStream) fs.append(blobIngressPath).getWrappedStream()).getIngressHandler();
            Assertions.assertThat(blobHandler)
                    .as("Blob Ingress handler instance is not correct")
                    .isInstanceOf(AzureBlobIngressHandler.class);
            Assertions.assertThat(blobHandler.getClient())
                    .as("Blob client was not used correctly")
                    .isInstanceOf(AbfsBlobClient.class);

            Path dfsIngressPath = new Path("testFile1");
            fs.getAbfsStore().getClientHandler().getBlobClient().createPath(
                    makeQualified(dfsIngressPath).toUri().getPath(), true, false, permissions, false,
                    (String) null, (ContextEncryptionAdapter) null, getTestTracingContext(fs, true));
            fs.getAbfsStore().getAbfsConfiguration()
                    .set("fs.azure.ingress.service.type", AbfsServiceType.DFS.name());

            AzureIngressHandler dfsHandler =
                    ((AbfsOutputStream) fs.append(dfsIngressPath).getWrappedStream()).getIngressHandler();
            Assertions.assertThat(dfsHandler)
                    .as("DFS Ingress handler instance is not correct")
                    .isInstanceOf(AzureDFSIngressHandler.class);
            Assertions.assertThat(dfsHandler.getClient())
                    .as("Dfs client was not used correctly")
                    .isInstanceOf(AbfsDfsClient.class);
        }
    }

    /**
     * Creates a directory and then tries to append to the parent of that
     * directory's path. The test passes only if a
     * {@link FileNotFoundException} is raised.
     */
    @Test(expected = FileNotFoundException.class)
    public void testAppendImplicitDirectory() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path folderPath = new Path(TEST_FOLDER_PATH);
        fs.mkdirs(folderPath);
        final Path parentOfFolder = folderPath.getParent();
        fs.append(parentOfFolder);
    }

    /**
     * Tries to append to a path that this test never creates. The test passes
     * only if a {@link FileNotFoundException} is raised.
     */
    @Test(expected = FileNotFoundException.class)
    public void testAppendFileNotExists() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path missingPath = new Path(TEST_FOLDER_PATH);
        fs.append(missingPath);
    }

    /**
     * Creates a directory through the DFS client ({@code createPath} is
     * called with its second argument {@code false}), then tries to append to
     * it and flush. The test passes only if an {@link IOException} is raised.
     */
    @Test(expected = IOException.class)
    public void testCreateExplicitDirectoryOverDfsAppendOverBlob() throws IOException {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path folderPath = path(TEST_FOLDER_PATH);
        final AzureBlobFileSystemStore.Permissions permissions =
                new AzureBlobFileSystemStore.Permissions(
                        false, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
        final String qualifiedFolder = makeQualified(folderPath).toUri().getPath();
        fs.getAbfsStore().getClientHandler().getDfsClient().createPath(
                qualifiedFolder, false, false, permissions, false,
                (String) null, (ContextEncryptionAdapter) null, getTestTracingContext(fs, true));

        final FSDataOutputStream appendStream = fs.append(folderPath);
        appendStream.write(10);
        appendStream.hsync();
    }

    /**
     * Opens an append stream on a file, buffers one byte, then re-creates the
     * same file through a second file system instance and finally flushes
     * the first stream. The test passes only if an {@link IOException} is
     * raised.
     *
     * <p>Skipped when append blob is enabled or the ingress service type is
     * not BLOB. The second file system and its stream are managed by
     * try-with-resources so both are released on the expected exception path.
     */
    @Test(expected = IOException.class)
    public void testRecreateAppendAndFlush() throws IOException {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        AzureBlobFileSystem fs = getFileSystem();
        Path filePath = path(TEST_FILE_PATH);
        fs.create(filePath);
        Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
        FSDataOutputStream appendStream = fs.append(filePath);
        appendStream.write(10);
        try (AzureBlobFileSystem secondFs =
                     (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration());
             FSDataOutputStream recreated = secondFs.create(filePath)) {
            // The file was replaced underneath the first stream; this flush is
            // the call expected to fail.
            appendStream.hsync();
        }
    }

    /**
     * Opens an append stream on a file, buffers one byte, then calls
     * {@code mkdirs} on the same path through a second file system instance
     * and finally flushes the first stream. The test passes only if an
     * {@link IOException} is raised.
     *
     * <p>The second file system instance is managed by try-with-resources so
     * it is closed exactly once.
     */
    @Test(expected = IOException.class)
    public void testRecreateDirectoryAppendAndFlush() throws IOException {
        AzureBlobFileSystem fs = getFileSystem();
        Path filePath = path(TEST_FILE_PATH);
        fs.create(filePath);
        FSDataOutputStream appendStream = fs.append(filePath);
        appendStream.write(10);
        try (AzureBlobFileSystem secondFs =
                     (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration())) {
            secondFs.mkdirs(filePath);
            appendStream.hsync();
        }
    }

    /**
     * Opens one create stream and two append streams on the same file and
     * writes the same 200-byte slice (offset 10) to each from a separate
     * pool thread. {@code checkFuturesForExceptions} is called with an
     * expected count of 0.
     *
     * <p>Conditional create-overwrite is disabled for the file system
     * instance used. The file system is managed by try-with-resources and
     * the thread pool is shut down in a {@code finally} block so no worker
     * threads outlive the test.
     */
    @Test
    public void testParallelWriteSameOffsetDifferentOutputStreams() throws Exception {
        Configuration conf = getRawConfiguration();
        conf.set("fs.azure.enable.conditional.create.overwrite", "false");
        try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(conf)) {
            ExecutorService executor = Executors.newFixedThreadPool(5);
            try {
                ArrayList<Future<?>> futures = new ArrayList<>();
                byte[] data = new byte[8388608];
                new Random().nextBytes(data);
                Path filePath = path(TEST_FILE_PATH);
                FSDataOutputStream out1 = fs.create(filePath);
                FSDataOutputStream out2 = fs.append(filePath);
                FSDataOutputStream out3 = fs.append(filePath);
                futures.add(executor.submit(() -> {
                    try {
                        out1.write(data, 10, 200);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));
                futures.add(executor.submit(() -> {
                    try {
                        out2.write(data, 10, 200);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));
                futures.add(executor.submit(() -> {
                    try {
                        out3.write(data, 10, 200);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));
                checkFuturesForExceptions(futures, 0);
            } finally {
                executor.shutdown();
            }
        }
    }

    /**
     * Opens one create stream and two append streams on the same file and
     * writes slices of different offsets and lengths (10/200, 20/300, 30/400)
     * to them from separate pool threads. {@code checkFuturesForExceptions}
     * is called with an expected count of 0.
     *
     * <p>Conditional create-overwrite is disabled for the file system
     * instance used. The file system is managed by try-with-resources and
     * the thread pool is shut down in a {@code finally} block so no worker
     * threads outlive the test.
     */
    @Test
    public void testParallelWriteDifferentContentLength() throws Exception {
        Configuration conf = getRawConfiguration();
        conf.set("fs.azure.enable.conditional.create.overwrite", "false");
        try (FileSystem fs = FileSystem.newInstance(conf)) {
            ExecutorService executor = Executors.newFixedThreadPool(5);
            try {
                ArrayList<Future<?>> futures = new ArrayList<>();
                Path filePath = path(TEST_FILE_PATH);
                FSDataOutputStream out1 = fs.create(filePath);
                byte[] data = new byte[8388608];
                new Random().nextBytes(data);
                FSDataOutputStream out2 = fs.append(filePath);
                FSDataOutputStream out3 = fs.append(filePath);
                futures.add(executor.submit(() -> {
                    try {
                        out1.write(data, 10, 200);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));
                futures.add(executor.submit(() -> {
                    try {
                        out2.write(data, TWENTY, 300);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));
                futures.add(executor.submit(() -> {
                    try {
                        out3.write(data, THIRTY, 400);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));
                checkFuturesForExceptions(futures, 0);
            } finally {
                executor.shutdown();
            }
        }
    }

    /**
     * Writes to and closes a create stream (200 bytes) and an append stream
     * (400 bytes) on the same file from two pool threads, with
     * {@code checkFuturesForExceptions} called with an expected count of 1.
     * Afterwards the ETag recorded on each stream's ingress handler is
     * compared with the ETag captured right after creation to decide which
     * stream's content to expect, and the file content is read back and
     * compared against that stream's buffer.
     *
     * <p>Skipped when append blob is enabled or the ingress service type is
     * not BLOB.
     */
    @Test
    public void testParallelWriteOutputStreamClose() throws Exception {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        AzureBlobFileSystem fs = getFileSystem();
        Path secondaryFilePath = new Path("secondarytestfile");
        FSDataOutputStream out1 = fs.create(secondaryFilePath);
        Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
        // getWrappedStream() is typed as OutputStream, hence the cast.
        String originalEtag =
                ((AbfsOutputStream) out1.getWrappedStream()).getIngressHandler().getETag();
        byte[] data1 = new byte[8388608];
        new Random().nextBytes(data1);
        byte[] data2 = new byte[8388608];
        new Random().nextBytes(data2);
        FSDataOutputStream out2 = fs.append(secondaryFilePath);

        // The pool is created only once all assumptions have passed and is always
        // shut down, so no worker threads outlive the test.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            ArrayList<Future<?>> futures = new ArrayList<>();
            futures.add(executor.submit(() -> {
                try {
                    out1.write(data1, 0, 200);
                    out1.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }));
            futures.add(executor.submit(() -> {
                try {
                    out2.write(data2, 0, 400);
                    out2.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }));
            checkFuturesForExceptions(futures, 1);
        } finally {
            executor.shutdown();
        }

        byte[] readBuffer = new byte[8388608];
        try (FSDataInputStream in = fs.open(secondaryFilePath)) {
            in.seek(0L);
            String out1Etag =
                    ((AbfsOutputStream) out1.getWrappedStream()).getIngressHandler().getETag();
            String out2Etag =
                    ((AbfsOutputStream) out2.getWrappedStream()).getIngressHandler().getETag();
            if (!originalEtag.equals(out1Etag)) {
                // out1's ETag changed: expect out1's 200 bytes in the file.
                int bytesRead = in.read(readBuffer, 0, 4194304);
                assertEquals(200L, bytesRead);
                assertArrayEquals(Arrays.copyOfRange(data1, 0, bytesRead),
                        Arrays.copyOfRange(readBuffer, 0, bytesRead));
            } else if (!originalEtag.equals(out2Etag)) {
                // out2's ETag changed: expect out2's 400 bytes in the file.
                int bytesRead = in.read(readBuffer, 0, 4194304);
                assertEquals(400L, bytesRead);
                assertArrayEquals(Arrays.copyOfRange(data2, 0, bytesRead),
                        Arrays.copyOfRange(readBuffer, 0, bytesRead));
            } else {
                fail("Neither out1 nor out2 was flushed successfully.");
            }
        }
    }

    /**
     * Opens two create streams on the same path, writes and flushes through
     * the second one, then writes through the first one and expects its
     * {@code hsync()} to fail with an {@link IOException}.
     *
     * <p>Skipped when append blob is enabled or the ingress service type is
     * not BLOB.
     */
    @Test
    public void testEtagMismatch() throws Exception {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        final AzureBlobFileSystem fs = getFileSystem();
        final Path filePath = path(TEST_FILE_PATH);
        final FSDataOutputStream firstStream = fs.create(filePath);
        final FSDataOutputStream secondStream = fs.create(filePath);
        Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);

        // The second stream commits first ...
        secondStream.write(10);
        secondStream.hsync();

        // ... so flushing the first stream afterwards is expected to be rejected.
        firstStream.write(10);
        LambdaTestUtils.intercept(IOException.class, () -> firstStream.hsync());
    }

    /**
     * Creates a file under a directory configured as an infinite-lease
     * directory, writes one byte, closes the stream and asserts that the
     * stream reports a non-null lease id.
     *
     * <p>The dedicated file system instance is managed by try-with-resources
     * so it is closed when the test finishes.
     */
    @Test
    public void testAppendWithLease() throws Exception {
        Path testFilePath = new Path(path(this.methodName.getMethodName()), TEST_FILE_PATH);
        try (AzureBlobFileSystem fs = Mockito.spy(getCustomFileSystem(testFilePath.getParent(), 1))) {
            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
            FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE);
            // The explicit cast is a no-op if createFile already returns
            // AbfsOutputStream and required if it is declared as OutputStream.
            AbfsOutputStream outputStream = (AbfsOutputStream) fs.getAbfsStore().createFile(
                    testFilePath, (FileSystem.Statistics) null, true, permission, umask,
                    getTestTracingContext(fs, true));
            outputStream.write(10);
            outputStream.close();
            assertNotNull(outputStream.getLeaseId());
        }
    }

    /**
     * Builds a new, uncached file system instance whose configuration marks
     * the given directory as an infinite-lease directory.
     *
     * @param path directory to register under {@code fs.azure.infinite-lease.directories}
     * @param i value for {@code fs.azure.lease.threads}
     * @return a new {@link AzureBlobFileSystem} instance; the caller is responsible for closing it
     * @throws Exception if the file system cannot be instantiated
     */
    private AzureBlobFileSystem getCustomFileSystem(Path path, int i) throws Exception {
        Configuration conf = getRawConfiguration();
        conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true);
        conf.set("fs.azure.infinite-lease.directories", path.toUri().getPath());
        conf.setInt("fs.azure.lease.threads", i);
        // FileSystem.newInstance is declared to return FileSystem; narrow it explicitly.
        return (AzureBlobFileSystem) FileSystem.newInstance(conf);
    }

    /**
     * Sets up {@code /src} and {@code /src/file} through the AzCopy helper
     * methods and expects an append on {@code /src} to fail with a
     * {@link FileNotFoundException}.
     */
    @Test
    public void testAppendImplicitDirectoryAzcopy() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path azCopyFolder = new Path("/src");
        createAzCopyFolder(azCopyFolder);
        final Path azCopyFile = new Path("/src/file");
        createAzCopyFile(azCopyFile);
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
            final Path appendTarget = new Path("/src");
            return fs.append(appendTarget);
        });
    }

    /**
     * Checks that a failing append surfaces to the writer as an {@link IOException}.
     *
     * <p>The blob client's {@code append} is stubbed to always throw an
     * {@link AbfsRestOperationException} with status 503. Five write patterns are
     * then exercised, each expected to raise an {@code IOException}:
     * write inside try-with-resources, write + close, write + hsync,
     * write + hflush, and write followed by a second write once the stream
     * reports that its write tasks are done.
     *
     * <p>The test is skipped when append blob is enabled, when HNS is enabled,
     * or when the service type is not blob (see the {@code assume*} calls).
     *
     * @throws Exception if setup fails or an expected exception is not raised
     */
    @Test
    public void testIntermittentAppendFailureToBeReported() throws Exception {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        try (AzureBlobFileSystem fs = Mockito.spy((AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
            assumeHnsDisabled();
            AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
            assumeBlobServiceType();

            AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
            AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
            Mockito.doReturn(clientHandler).when(store).getClientHandler();
            Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
            Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

            // Every append issued through the spied blob client fails with HTTP 503.
            Mockito.doThrow(new AbfsRestOperationException(503, ITestWasbRemoteCallHelper.EMPTY_STRING,
                    ITestWasbRemoteCallHelper.EMPTY_STRING, new Exception()))
                .when(blobClient)
                .append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(AppendRequestParameters.class),
                    Mockito.any(), Mockito.any(), Mockito.any(TracingContext.class));

            byte[] data = new byte[8 * 1024 * 1024];
            new Random().nextBytes(data);

            // write, then implicit close via try-with-resources
            LambdaTestUtils.intercept(IOException.class, () -> {
                try (FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file"), blobClient)) {
                    os.write(data);
                }
            });

            // write, then explicit close
            LambdaTestUtils.intercept(IOException.class, () -> {
                FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file/file1"), blobClient);
                os.write(data);
                os.close();
            });

            // write, then hsync
            LambdaTestUtils.intercept(IOException.class, () -> {
                FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file/file2"), blobClient);
                os.write(data);
                os.hsync();
            });

            // write, then hflush
            LambdaTestUtils.intercept(IOException.class, () -> {
                FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file/file3"), blobClient);
                os.write(data);
                os.hflush();
            });

            // write, wait for the stream's write tasks, then write again
            LambdaTestUtils.intercept(IOException.class, () -> {
                // getWrappedStream() is declared to return OutputStream, so the cast is required.
                AbfsOutputStream out = (AbfsOutputStream) createMockedOutputStream(
                        fs, new Path("/test/file/file4"), blobClient).getWrappedStream();
                out.write(data);
                while (!out.areWriteOperationsTasksDone()) {
                    // Spin until the stream reports its write tasks done before writing again.
                }
                out.write(data);
            });
        }
    }

    /**
     * Creates {@code path} on the given file system and wraps the resulting
     * stream in Mockito spies so that its ingress handler returns the supplied
     * client.
     *
     * @param azureBlobFileSystem file system used to create the file
     * @param path                path of the file to create
     * @param abfsClient          client the spied ingress handler returns from {@code getClient()}
     * @return a spied {@link FSDataOutputStream} wrapping the spied {@link AbfsOutputStream}
     * @throws IOException if creating the file fails
     */
    private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem azureBlobFileSystem, Path path, AbfsClient abfsClient) throws IOException {
        // Spy the underlying ABFS stream so its ingress handler can be swapped out.
        AbfsOutputStream spiedStream =
            Mockito.spy((AbfsOutputStream) azureBlobFileSystem.create(path).getWrappedStream());

        // Spy the real handler and make the stream hand out the spy instead.
        AzureIngressHandler spiedHandler = Mockito.spy(spiedStream.getIngressHandler());
        Mockito.doReturn(spiedHandler).when(spiedStream).getIngressHandler();

        // Route the handler's client lookup to the caller-provided client.
        Mockito.doReturn(abfsClient).when(spiedHandler).getClient();

        FSDataOutputStream wrapper = new FSDataOutputStream(spiedStream, (FileSystem.Statistics) null);
        return Mockito.spy(wrapper);
    }

    /**
     * Checks that a delayed append failure is reported by {@code close()},
     * {@code hsync()} and {@code hflush()}.
     *
     * <p>The ingress client's {@code append} is stubbed so that each call
     * increments a shared counter, spins until the counter reaches 2, sleeps for
     * one second and then throws an {@link AbfsRestOperationException} with
     * status 503. For each of three files the test issues two 8 MiB writes and
     * expects the subsequent close / hsync / hflush to raise an
     * {@link IOException}. The counter is reset between scenarios.
     *
     * <p>The spied client (blob or DFS) is selected according to the type of the
     * handler's ingress client. The test is skipped when append blob is enabled.
     *
     * @throws Exception if setup fails or an expected exception is not raised
     */
    @Test
    public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        try (AzureBlobFileSystem fs = Mockito.spy((AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
            AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
            AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
            AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
            AbfsDfsClient dfsClient = Mockito.spy(clientHandler.getDfsClient());

            // Decide once, before any stubbing, which spy stands in for the ingress client.
            final AbfsClient client;
            if (clientHandler.getIngressClient() instanceof AbfsBlobClient) {
                Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
                client = blobClient;
            } else {
                Mockito.doReturn(dfsClient).when(clientHandler).getDfsClient();
                client = dfsClient;
            }
            Mockito.doReturn(client).when(clientHandler).getIngressClient();
            Mockito.doReturn(clientHandler).when(store).getClientHandler();

            byte[] data = new byte[8 * 1024 * 1024];
            new Random().nextBytes(data);

            AtomicInteger appendCalls = new AtomicInteger(0);
            Mockito.doAnswer(invocation -> {
                appendCalls.incrementAndGet();
                while (appendCalls.get() < 2) {
                    // Spin until a second append call has arrived.
                    // NOTE(review): unbounded; hangs if a second append is never issued.
                }
                Thread.sleep(1000L);
                throw new AbfsRestOperationException(503, ITestWasbRemoteCallHelper.EMPTY_STRING,
                    ITestWasbRemoteCallHelper.EMPTY_STRING, new Exception());
            }).when(client).append(Mockito.anyString(), Mockito.any(byte[].class),
                Mockito.any(AppendRequestParameters.class), Mockito.any(), Mockito.any(),
                Mockito.any(TracingContext.class));

            // Scenario 1: failure must surface on close().
            FSDataOutputStream closeStream = createMockedOutputStream(fs, new Path("/test/file"), client);
            closeStream.write(data);
            closeStream.write(data);
            LambdaTestUtils.intercept(IOException.class, closeStream::close);

            // Scenario 2: failure must surface on hsync().
            appendCalls.set(0);
            FSDataOutputStream hsyncStream = createMockedOutputStream(fs, new Path("/test/file1"), client);
            hsyncStream.write(data);
            hsyncStream.write(data);
            LambdaTestUtils.intercept(IOException.class, hsyncStream::hsync);

            // Scenario 3: failure must surface on hflush().
            appendCalls.set(0);
            FSDataOutputStream hflushStream = createMockedOutputStream(fs, new Path("/test/file2"), client);
            hflushStream.write(data);
            hflushStream.write(data);
            LambdaTestUtils.intercept(IOException.class, hflushStream::hflush);
        }
    }

    /**
     * Builds a Base64 block id from a position and the stream's id.
     *
     * <p>The raw id is {@code "<j>_<hashCode of the stream id>"}. Its bytes are
     * copied into a zero-filled 60-byte buffer (truncated if longer) and the
     * whole buffer is Base64-encoded.
     *
     * @param abfsOutputStream stream whose {@code getStreamID()} contributes the hash part
     * @param j                numeric prefix of the raw id
     *                         (NOTE(review): presumably the block's offset; confirm against callers)
     * @return the Base64 text of the 60-byte padded id
     */
    private String generateBlockId(AbfsOutputStream abfsOutputStream, long j) {
        String rawId = String.format("%d_%s", j, Integer.toString(abfsOutputStream.getStreamID().hashCode()));
        // Encode with an explicit charset and bound the copy by the encoded byte
        // count, rather than relying on the platform default and the char count.
        byte[] rawBytes = rawId.getBytes(StandardCharsets.UTF_8);
        byte[] padded = new byte[60];
        System.arraycopy(rawBytes, 0, padded, 0, Math.min(padded.length, rawBytes.length));
        return new String(Base64.encodeBase64(padded), StandardCharsets.UTF_8);
    }

    /**
     * Exercises a flush whose first response processing fails with a simulated
     * connection reset.
     *
     * <p>After one 8 MiB write, the blob client's {@code flush} is stubbed to
     * install a mocked rest operation (via
     * {@code AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation}) built
     * from the stream's eTag and the block-list XML of the single expected block,
     * and then to call the real {@code flush}. On the first
     * {@code processResponse} call the HTTP operation is made to report status
     * 500 / {@code "CONNECTION_RESET"} and an {@link IOException} is thrown;
     * later calls pass through. The test then runs {@code hsync()} and
     * {@code close()} and verifies that {@code flush} was invoked exactly once
     * on the client.
     *
     * <p>The test is skipped when append blob is enabled, when HNS is enabled,
     * or when the service type is not blob.
     *
     * @throws Exception if setup fails or verification does not hold
     */
    @Test
    public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Exception {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        try (AzureBlobFileSystem fs = Mockito.spy((AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
            assumeHnsDisabled();
            AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
            assumeBlobServiceType();

            AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
            AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
            Mockito.doReturn(clientHandler).when(store).getClientHandler();
            Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
            Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

            AtomicInteger responseCount = new AtomicInteger(0);
            AbfsOutputStream out = (AbfsOutputStream) createMockedOutputStream(
                    fs, new Path("/test/file"), blobClient).getWrappedStream();
            String eTag = out.getIngressHandler().getETag();

            byte[] data = new byte[8 * 1024 * 1024];
            new Random().nextBytes(data);
            out.write(data);

            // Block list the flush is expected to commit: the single block at position 0.
            ArrayList<String> blockIds = new ArrayList<>();
            blockIds.add(generateBlockId(out, 0L));
            String blockListXml = AbfsBlobClient.generateBlockListXml(blockIds);

            Mockito.doAnswer(flushInvocation -> {
                AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient, eTag, blockListXml, httpOperation -> {
                    // Inner lambda parameter must not reuse the enclosing lambda's name.
                    Mockito.doAnswer(responseInvocation -> {
                        responseInvocation.callRealMethod();
                        if (responseCount.incrementAndGet() == 1) {
                            // First response only: report a 500 and fail like a reset connection.
                            Mockito.when(httpOperation.getStatusCode()).thenReturn(500);
                            Mockito.when(httpOperation.getStorageErrorMessage()).thenReturn("CONNECTION_RESET");
                            throw new IOException("Connection Reset");
                        }
                        return null;
                    }).when(httpOperation).processResponse(Mockito.nullable(byte[].class), Mockito.anyInt(), Mockito.anyInt());
                    return httpOperation;
                });
                return flushInvocation.callRealMethod();
            }).when(blobClient).flush(Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(),
                Mockito.nullable(String.class), Mockito.nullable(String.class), Mockito.anyString(),
                Mockito.nullable(ContextEncryptionAdapter.class), Mockito.any(TracingContext.class));

            out.hsync();
            out.close();

            Mockito.verify(blobClient, Mockito.times(1)).flush(Mockito.any(byte[].class), Mockito.anyString(),
                Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.nullable(String.class),
                Mockito.anyString(), Mockito.nullable(ContextEncryptionAdapter.class),
                Mockito.any(TracingContext.class));
        }
    }

    /**
     * Exercises a flush that hits a simulated connection reset after a second
     * stream on the same path has already flushed.
     *
     * <p>A first stream writes 8 MiB to {@code /test/file}. The blob client's
     * {@code flush} is stubbed to install a mocked rest operation built from the
     * first stream's eTag and block-list XML, then call the real {@code flush}.
     * The mocked HTTP operation's {@code processResponse} reports
     * 500 / {@code "CONNECTION_RESET"} and throws an {@link IOException} on its
     * first call, reports 200 / {@code "HTTP_OK"} on its second, and passes
     * through afterwards.
     *
     * <p>A second stream is then created on the same path, written to, and
     * flushed with {@code hsync()} on a separate thread that is joined before
     * continuing; any {@code IOException} from that flush is ignored. Finally
     * the first stream's {@code hsync()} is expected to fail with an
     * {@code IOException} containing
     * "The condition specified using HTTP conditional header(s) is not met."
     *
     * <p>The test is skipped when append blob is enabled, when HNS is enabled,
     * or when the service type is not blob.
     *
     * @throws Exception if setup fails or the expected exception is not raised
     */
    @Test
    public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exception {
        Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
        try (AzureBlobFileSystem fs = Mockito.spy((AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
            assumeHnsDisabled();
            AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
            assumeBlobServiceType();

            AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
            AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
            Mockito.doReturn(clientHandler).when(store).getClientHandler();
            Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
            Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

            AtomicInteger responseCount = new AtomicInteger(0);
            AbfsOutputStream firstOut = (AbfsOutputStream) createMockedOutputStream(
                    fs, new Path("/test/file"), blobClient).getWrappedStream();
            String eTag = firstOut.getIngressHandler().getETag();

            byte[] firstData = new byte[8 * 1024 * 1024];
            new Random().nextBytes(firstData);
            firstOut.write(firstData);

            ArrayList<String> blockIds = new ArrayList<>();
            blockIds.add(generateBlockId(firstOut, 0L));
            String blockListXml = AbfsBlobClient.generateBlockListXml(blockIds);

            Mockito.doAnswer(flushInvocation -> {
                AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient, eTag, blockListXml, httpOperation -> {
                    // Inner lambda parameter must not reuse the enclosing lambda's name.
                    Mockito.doAnswer(responseInvocation -> {
                        responseInvocation.callRealMethod();
                        int count = responseCount.incrementAndGet();
                        if (count == 1) {
                            // First response: report a 500 and fail like a reset connection.
                            Mockito.when(httpOperation.getStatusCode()).thenReturn(500);
                            Mockito.when(httpOperation.getStorageErrorMessage()).thenReturn("CONNECTION_RESET");
                            throw new IOException("Connection Reset");
                        }
                        if (count == 2) {
                            // Second response: report success.
                            Mockito.when(httpOperation.getStatusCode()).thenReturn(200);
                            Mockito.when(httpOperation.getStorageErrorMessage()).thenReturn("HTTP_OK");
                        }
                        return null;
                    }).when(httpOperation).processResponse(Mockito.nullable(byte[].class), Mockito.anyInt(), Mockito.anyInt());
                    return httpOperation;
                });
                return flushInvocation.callRealMethod();
            }).when(blobClient).flush(Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(),
                Mockito.nullable(String.class), Mockito.nullable(String.class), Mockito.anyString(),
                Mockito.nullable(ContextEncryptionAdapter.class), Mockito.any(TracingContext.class));

            // Second writer on the same path; getWrappedStream() returns OutputStream, hence the cast.
            AbfsOutputStream secondOut = (AbfsOutputStream) createMockedOutputStream(
                    fs, new Path("/test/file"), blobClient).getWrappedStream();
            byte[] secondData = new byte[8 * 1024 * 1024];
            new Random().nextBytes(secondData);
            secondOut.write(secondData);

            Thread parallelFlush = new Thread(() -> {
                try {
                    secondOut.hsync();
                } catch (IOException ignored) {
                    // Deliberately ignored: only the first stream's flush result is asserted below.
                }
            });
            parallelFlush.start();
            parallelFlush.join();

            LambdaTestUtils.intercept(IOException.class,
                "The condition specified using HTTP conditional header(s) is not met.",
                firstOut::hsync);
        }
    }
}
