package org.apache.flink.fs.s3.common.writer;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.SplittableRandom;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RefCountedBufferingFileStream;
import org.apache.flink.core.fs.RefCountedFSOutputStream;
import org.apache.flink.core.fs.RefCountedFileWithStream;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Description;
import org.hamcrest.MatcherAssert;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.class */
/**
 * Tests for {@link S3RecoverableFsDataOutputStream}.
 *
 * <p>The stream under test is wired to {@link TestMultipartUpload}, an in-memory stand-in for a
 * multipart upload that records uploaded parts in a list, and to {@link TestFileProvider}, which
 * hands out temporary files located in {@link #TEMP_FOLDER}. Each test writes to the stream,
 * optionally persists and/or simulates a recovery, commits, and then checks the bytes the fake
 * upload reports as published.
 */
public class S3RecoverableFsDataOutputStreamTest {
    /** Minimum part size passed to every {@link S3RecoverableFsDataOutputStream} built here. */
    private static final long USER_DEFINED_MIN_PART_SIZE = 10L;

    /** Folder in which {@link TestFileProvider} creates its temporary files. */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    private TestMultipartUpload multipartUploadUnderTest;
    private TestFileProvider fileProvider;
    private S3RecoverableFsDataOutputStream streamUnderTest;

    /**
     * File factory used by the stream under test: creates a fresh, uniquely named temporary file
     * when called with {@code null}, or re-opens the given file in append mode otherwise.
     */
    public static class TestFileProvider implements FunctionWithException<File, RefCountedFileWithStream, IOException> {
        private final TemporaryFolder folder;

        /**
         * @param temporaryFolder folder whose root receives newly created files; must not be null
         */
        TestFileProvider(TemporaryFolder temporaryFolder) {
            this.folder = (TemporaryFolder) Preconditions.checkNotNull(temporaryFolder);
        }

        /**
         * Opens a file for writing and wraps it together with its output stream.
         *
         * @param file an existing file to re-open with {@link StandardOpenOption#APPEND}, or
         *     {@code null} to create a new {@code .tmp_<uuid>} file in the temporary folder
         * @return the file/stream pair for the restored or newly created file
         * @throws IOException if the file cannot be opened; a {@link FileAlreadyExistsException}
         *     is not propagated but triggers another attempt
         */
        public RefCountedFileWithStream apply(@Nullable File file) throws IOException {
            while (true) {
                try {
                    if (file == null) {
                        final File freshFile = new File(this.folder.getRoot(), ".tmp_" + UUID.randomUUID());
                        // CREATE_NEW fails if the name is already taken, which is what drives the retry.
                        return RefCountedFileWithStream.newFile(
                                freshFile,
                                Files.newOutputStream(freshFile.toPath(), StandardOpenOption.CREATE_NEW));
                    }
                    return RefCountedFileWithStream.restoredFile(
                            file,
                            Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND),
                            file.length());
                } catch (FileAlreadyExistsException ignored) {
                    // Intentionally ignored: loop around and try again (with a new random name
                    // in the file-creation case).
                }
            }
        }
    }

    /**
     * In-memory fake of a {@link RecoverableMultiPartUpload}.
     *
     * <p>Uploaded parts are appended to a list; {@code lastPersistedIndex} marks how many of them
     * were covered by the most recent snapshot. Committing copies the persisted parts into the
     * published buffer and clears the part list.
     */
    public static class TestMultipartUpload implements RecoverableMultiPartUpload {
        private final TestFileProvider fileProvider;
        private int numParts;
        private long numBytes;
        private List<byte[]> uploadedContent = new ArrayList<>();
        private final ByteArrayOutputStream publishedContents = new ByteArrayOutputStream();
        private Optional<byte[]> uncompleted = Optional.empty();
        private byte[] published = new byte[0];
        private int lastPersistedIndex = 0;

        /**
         * @param testFileProvider provider used to materialise the incomplete part as a file
         */
        TestMultipartUpload(TestFileProvider testFileProvider) {
            this.fileProvider = testFileProvider;
        }

        /** Drops every uploaded part that was added after the last snapshot. */
        public void discardUnpersistedData() {
            // Copy instead of keeping the subList view so repeated recoveries do not stack views.
            this.uploadedContent = new ArrayList<>(this.uploadedContent.subList(0, this.lastPersistedIndex));
        }

        /**
         * Writes the bytes captured by the last {@link #snapshotAndGetRecoverable} call into a new
         * temporary file.
         *
         * @return the file holding the incomplete part, or empty if no incomplete part was recorded
         * @throws RuntimeException wrapping any {@link IOException} raised while creating or
         *     writing the file
         */
        public Optional<File> getIncompletePart() {
            if (!this.uncompleted.isPresent()) {
                return Optional.empty();
            }
            final byte[] incompleteBytes = this.uncompleted.get();
            try {
                final File target = this.fileProvider.apply((File) null).getFile();
                Files.write(target.toPath(), incompleteBytes);
                return Optional.of(target);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Marks all parts uploaded so far as persisted and returns a committer for them.
         *
         * @return a committer whose {@code commit()} publishes the persisted parts and resets the
         *     part list; its {@code getRecoverable()} always returns {@code null}
         */
        public RecoverableFsDataOutputStream.Committer snapshotAndGetCommitter() throws IOException {
            this.lastPersistedIndex = this.uploadedContent.size();
            return new RecoverableFsDataOutputStream.Committer() {
                public void commit() throws IOException {
                    TestMultipartUpload.this.published = TestMultipartUpload.this.getPublishedContents();
                    TestMultipartUpload.this.uploadedContent.clear();
                    TestMultipartUpload.this.lastPersistedIndex = 0;
                }

                public void commitAfterRecovery() throws IOException {
                    // Only commit if nothing has been published yet.
                    if (TestMultipartUpload.this.published.length == 0) {
                        commit();
                    }
                }

                public RecoverableWriter.CommitRecoverable getRecoverable() {
                    return null;
                }
            };
        }

        /**
         * Marks all parts uploaded so far as persisted and remembers the current contents of the
         * given in-progress part file, unless its position is negative.
         *
         * @param refCountedFSOutputStream the part file currently being written
         * @return always {@code null}; the tests do not use the recoverable
         */
        public RecoverableWriter.ResumeRecoverable snapshotAndGetRecoverable(RefCountedFSOutputStream refCountedFSOutputStream) throws IOException {
            this.lastPersistedIndex = this.uploadedContent.size();
            if (refCountedFSOutputStream.getPos() >= 0) {
                this.uncompleted = Optional.of(S3RecoverableFsDataOutputStreamTest.readFileContents(refCountedFSOutputStream));
            }
            return null;
        }

        /**
         * Records the contents of the given part file as an uploaded part and updates the part
         * and byte counters.
         */
        public void uploadPart(RefCountedFSOutputStream refCountedFSOutputStream) throws IOException {
            this.numParts++;
            this.numBytes += refCountedFSOutputStream.getPos();
            this.uploadedContent.add(S3RecoverableFsDataOutputStreamTest.readFileContents(refCountedFSOutputStream));
        }

        /**
         * Appends all currently persisted parts to the published buffer and returns the buffer's
         * full contents.
         *
         * <p>Note that this method has a side effect: every call appends the first {@code
         * lastPersistedIndex} parts again. After a commit that index is reset to zero, so later
         * calls simply return what has been accumulated.
         *
         * @return a copy of everything accumulated in the published buffer
         */
        public byte[] getPublishedContents() {
            for (int i = 0; i < this.lastPersistedIndex; i++) {
                final byte[] part = this.uploadedContent.get(i);
                // The three-argument write on ByteArrayOutputStream declares no IOException.
                this.publishedContents.write(part, 0, part.length);
            }
            return this.publishedContents.toByteArray();
        }

        public String toString() {
            return "TestMultipartUpload{contents=" + Arrays.toString(this.published) + '}';
        }
    }

    /** Creates a fresh file provider, fake upload and stream before every test. */
    @Before
    public void beforeTest() throws IOException {
        this.fileProvider = new TestFileProvider(TEMP_FOLDER);
        this.multipartUploadUnderTest = new TestMultipartUpload(this.fileProvider);
        this.streamUnderTest = new S3RecoverableFsDataOutputStream(
                this.multipartUploadUnderTest,
                this.fileProvider,
                RefCountedBufferingFileStream.openNew(this.fileProvider),
                USER_DEFINED_MIN_PART_SIZE,
                0L);
    }

    @Test
    public void simpleUsage() throws IOException {
        this.streamUnderTest.write(bytesOf("hello world"));
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("hello world")));
    }

    @Test
    public void noWritesShouldResolveInAnEmptyFile() throws IOException {
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(new byte[0]));
    }

    @Test
    public void closingWithoutCommittingDiscardsTheData() throws IOException {
        this.streamUnderTest.write(bytesOf("hello world"));
        this.streamUnderTest.close();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("")));
    }

    @Test
    public void twoWritesAreConcatenated() throws IOException {
        this.streamUnderTest.write(bytesOf("hello"));
        this.streamUnderTest.write(bytesOf(" "));
        this.streamUnderTest.write(bytesOf("world"));
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("hello world")));
    }

    @Test
    public void writeLargeFile() throws IOException {
        final List<byte[]> testDataBuffers = createRandomLargeTestDataBuffers();
        for (byte[] buffer : testDataBuffers) {
            this.streamUnderTest.write(buffer);
        }
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(testDataBuffers));
    }

    @Test
    public void simpleRecovery() throws IOException {
        this.streamUnderTest.write(bytesOf("hello"));
        this.streamUnderTest.persist();
        this.streamUnderTest = reopenStreamUnderTestAfterRecovery();
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("hello")));
    }

    @Test
    public void multiplePersistsDoesNotIntroduceJunk() throws IOException {
        this.streamUnderTest.write(bytesOf("hello"));
        this.streamUnderTest.persist();
        this.streamUnderTest.persist();
        this.streamUnderTest.persist();
        this.streamUnderTest.persist();
        this.streamUnderTest.write(bytesOf(" "));
        this.streamUnderTest.write(bytesOf("world"));
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("hello world")));
    }

    @Test
    public void multipleWritesAndPersists() throws IOException {
        this.streamUnderTest.write(bytesOf("a"));
        this.streamUnderTest.persist();
        this.streamUnderTest.write(bytesOf("b"));
        this.streamUnderTest.persist();
        this.streamUnderTest.write(bytesOf("c"));
        this.streamUnderTest.persist();
        this.streamUnderTest.write(bytesOf("d"));
        this.streamUnderTest.persist();
        this.streamUnderTest.write(bytesOf("e"));
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("abcde")));
    }

    @Test
    public void multipleWritesAndPersistsWithBigChunks() throws IOException {
        final List<byte[]> testDataBuffers = createRandomLargeTestDataBuffers();
        for (byte[] buffer : testDataBuffers) {
            this.streamUnderTest.write(buffer);
            this.streamUnderTest.persist();
        }
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(testDataBuffers));
    }

    @Test
    public void addDataAfterRecovery() throws IOException {
        this.streamUnderTest.write(bytesOf("hello"));
        this.streamUnderTest.persist();
        this.streamUnderTest = reopenStreamUnderTestAfterRecovery();
        this.streamUnderTest.write(bytesOf(" "));
        this.streamUnderTest.write(bytesOf("world"));
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("hello world")));
    }

    @Test
    public void discardingUnpersistedNotYetUploadedData() throws IOException {
        this.streamUnderTest.write(bytesOf("hello"));
        this.streamUnderTest.persist();
        // Written after the last persist, so it must not survive the recovery below.
        this.streamUnderTest.write(bytesOf("goodbye"));
        this.streamUnderTest = reopenStreamUnderTestAfterRecovery();
        this.streamUnderTest.write(bytesOf(" world"));
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("hello world")));
    }

    @Test
    public void discardingUnpersistedUploadedData() throws IOException {
        this.streamUnderTest.write(bytesOf("hello"));
        this.streamUnderTest.persist();
        // NOTE(review): 4097 is presumably "local buffer size + 1", chosen so this write is
        // uploaded as a part before the recovery discards it - confirm against the stream's buffer size.
        this.streamUnderTest.write(randomBuffer(4097));
        this.streamUnderTest = reopenStreamUnderTestAfterRecovery();
        this.streamUnderTest.write(bytesOf(" world"));
        this.streamUnderTest.closeForCommit().commit();
        MatcherAssert.assertThat(this.multipartUploadUnderTest, hasContent(bytesOf("hello world")));
    }

    @Test
    public void commitEmptyStreamShouldBeSuccessful() throws IOException {
        this.streamUnderTest.closeForCommit().commit();
    }

    @Test(expected = IOException.class)
    public void closeForCommitOnClosedStreamShouldFail() throws IOException {
        this.streamUnderTest.closeForCommit().commit();
        this.streamUnderTest.closeForCommit().commit();
    }

    /**
     * Simulates a recovery: restores the incomplete part recorded by the fake upload into a new
     * buffering file stream, discards all parts uploaded after the last snapshot, and builds a new
     * stream on top of the same fake upload.
     *
     * @return a new stream that continues from the last persisted state
     * @throws IllegalStateException if the fake upload holds no incomplete part, i.e. the stream
     *     was never persisted before the simulated recovery
     */
    private S3RecoverableFsDataOutputStream reopenStreamUnderTestAfterRecovery() throws IOException {
        // Read the byte counter before anything else touches the fake upload.
        final long bytesUploadedSoFar = this.multipartUploadUnderTest.numBytes;
        final File incompletePart = this.multipartUploadUnderTest.getIncompletePart()
                .orElseThrow(() -> new IllegalStateException(
                        "No incomplete part available; persist() must be called before recovery"));
        final RefCountedBufferingFileStream restoredStream =
                RefCountedBufferingFileStream.restore(this.fileProvider, incompletePart);
        this.multipartUploadUnderTest.discardUnpersistedData();
        return new S3RecoverableFsDataOutputStream(
                this.multipartUploadUnderTest,
                this.fileProvider,
                restoredStream,
                USER_DEFINED_MIN_PART_SIZE,
                bytesUploadedSoFar);
    }

    /**
     * Generates random buffers until their combined length reaches a random target between 5x
     * (inclusive) and 100x (exclusive) the minimum part size. Each buffer has a random length
     * between 0 (inclusive) and 2x the minimum part size (exclusive).
     *
     * @return the generated buffers, in generation order
     */
    private static List<byte[]> createRandomLargeTestDataBuffers() {
        final List<byte[]> buffers = new ArrayList<>();
        final SplittableRandom random = new SplittableRandom();
        final int targetSize =
                (int) random.nextLong(USER_DEFINED_MIN_PART_SIZE * 5L, USER_DEFINED_MIN_PART_SIZE * 100L);
        final int maxBufferLength = (int) (USER_DEFINED_MIN_PART_SIZE * 2L);
        long totalSize = 0L;
        while (totalSize < targetSize) {
            final byte[] buffer = randomBuffer(random, random.nextInt(0, maxBufferLength));
            totalSize += buffer.length;
            buffers.add(buffer);
        }
        return buffers;
    }

    /** Returns {@code i} random bytes drawn from a newly created random generator. */
    private static byte[] randomBuffer(int i) {
        return randomBuffer(new SplittableRandom(), i);
    }

    /** Returns {@code i} random bytes drawn from the given generator. */
    private static byte[] randomBuffer(SplittableRandom splittableRandom, int i) {
        final byte[] buffer = new byte[i];
        for (int index = 0; index < buffer.length; index++) {
            buffer[index] = (byte) (splittableRandom.nextInt() & 255);
        }
        return buffer;
    }

    /** Encodes the given string as UTF-8 bytes. */
    private static byte[] bytesOf(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Reads the bytes written so far to the given stream back from its backing file.
     *
     * <p>Asserts that a single read returns exactly {@code getPos()} bytes.
     *
     * @param refCountedFSOutputStream stream whose backing file is read
     * @return an array of {@code getPos()} bytes holding the file contents
     * @throws IOException if the backing file cannot be opened or read
     */
    public static byte[] readFileContents(RefCountedFSOutputStream refCountedFSOutputStream) throws IOException {
        final long position = refCountedFSOutputStream.getPos();
        final byte[] contents = new byte[MathUtils.checkedDownCast(position)];
        final File inputFile = refCountedFSOutputStream.getInputFile();
        // try-with-resources so the file handle is released even if the read or assertion fails.
        try (FileInputStream in = new FileInputStream(inputFile)) {
            final int bytesRead = in.read(contents, 0, MathUtils.checkedDownCast(inputFile.length()));
            Assert.assertEquals(position, bytesRead);
        }
        return contents;
    }

    /**
     * Matches a {@link TestMultipartUpload} whose published contents equal the given bytes.
     *
     * <p>Evaluating the matcher calls {@link TestMultipartUpload#getPublishedContents()}.
     */
    private static TypeSafeMatcher<TestMultipartUpload> hasContent(final byte[] bArr) {
        return new TypeSafeMatcher<TestMultipartUpload>() {
            public boolean matchesSafely(TestMultipartUpload testMultipartUpload) {
                return Arrays.equals(testMultipartUpload.getPublishedContents(), bArr);
            }

            public void describeTo(Description description) {
                description.appendText("a TestMultipartUpload with contents='").appendValue(bArr).appendText("'");
            }
        };
    }

    /**
     * Matches a {@link TestMultipartUpload} whose published contents equal the concatenation of
     * the given buffers, in iteration order.
     */
    private static TypeSafeMatcher<TestMultipartUpload> hasContent(Collection<byte[]> collection) throws IOException {
        final ByteArrayOutputStream concatenated = new ByteArrayOutputStream();
        for (byte[] chunk : collection) {
            concatenated.write(chunk);
        }
        return hasContent(concatenated.toByteArray());
    }
}
