package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Checks that serialized in-progress and pending file recoverables written by earlier serializer
 * versions can still be deserialized and handed back to the local {@link RecoverableWriter}.
 *
 * <p>Fixtures are read from {@code
 * src/test/resources/recoverable-serializer-migration/<scenario>-v<version>/}. The two {@code
 * prepare*} methods are disabled; they exist only to (re)generate those fixtures for {@link
 * #CURRENT_VERSION}.
 */
public class OutputStreamBasedPartFileRecoverableMigrationTest {
    private static final int CURRENT_VERSION = 1;
    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";

    private static final String IN_PROGRESS_SCENARIO = "in-progress";
    private static final String PENDING_SCENARIO = "pending";
    private static final String CONTENT_FILE = "content";
    private static final String RECOVERABLE_FILE = "recoverable";

    private static final Path BASE_PATH =
            Paths.get("src/test/resources/").resolve("recoverable-serializer-migration");

    /** Serializer versions for which a fixture directory is expected to exist. */
    static Stream<Integer> previousVersions() {
        return Stream.of(CURRENT_VERSION);
    }

    /**
     * Writes the in-progress fixture for {@link #CURRENT_VERSION}: a content file holding {@link
     * #IN_PROGRESS_CONTENT} and the serialized recoverable obtained from {@code persist()}.
     *
     * @throws IOException if the content or the serialized recoverable cannot be written
     */
    @Disabled("Only used to generate the fixture files for the current serializer version")
    @Test
    void prepareDeserializationInProgress() throws IOException {
        Path scenarioDir = resolveVersionPath(CURRENT_VERSION, IN_PROGRESS_SCENARIO);
        RecoverableWriter writer = FileSystem.getLocalFileSystem().createRecoverableWriter();
        OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
                serializer =
                        new OutputStreamBasedPartFileWriter
                                .OutputStreamBasedInProgressFileRecoverableSerializer(
                                writer.getResumeRecoverableSerializer());

        // The stream was previously never closed; release it once the recoverable is on disk.
        try (RecoverableFsDataOutputStream stream =
                writer.open(
                        new org.apache.flink.core.fs.Path(
                                scenarioDir.resolve(CONTENT_FILE).toString()))) {
            stream.write(IN_PROGRESS_CONTENT.getBytes(StandardCharsets.UTF_8));
            byte[] serialized =
                    serializer.serialize(
                            new OutputStreamBasedPartFileWriter
                                    .OutputStreamBasedInProgressFileRecoverable(stream.persist()));
            Files.write(scenarioDir.resolve(RECOVERABLE_FILE), serialized);
        }
    }

    /**
     * Deserializes the in-progress fixture of the given version and resumes writing from it.
     *
     * @param previousVersion serializer version whose fixture is read
     * @throws IOException if the fixture cannot be read, deserialized or recovered
     */
    @MethodSource({"previousVersions"})
    @ParameterizedTest(name = "Previous Version = {0}")
    void testSerializationInProgress(int previousVersion) throws IOException {
        Path scenarioDir = resolveVersionPath(previousVersion, IN_PROGRESS_SCENARIO);
        RecoverableWriter writer = FileSystem.getLocalFileSystem().createRecoverableWriter();
        OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
                serializer =
                        new OutputStreamBasedPartFileWriter
                                .OutputStreamBasedInProgressFileRecoverableSerializer(
                                writer.getResumeRecoverableSerializer());

        OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable recoverable =
                serializer.deserialize(
                        previousVersion, Files.readAllBytes(scenarioDir.resolve(RECOVERABLE_FILE)));

        Assertions.assertThat(recoverable)
                .isInstanceOf(
                        OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable
                                .class);

        // recover() hands back an open stream; close it so the test does not leak a file handle.
        try (RecoverableFsDataOutputStream ignored =
                writer.recover(recoverable.getResumeRecoverable())) {
            // Successfully resuming is the assertion; nothing is written.
        }
    }

    /**
     * Writes the pending fixture for {@link #CURRENT_VERSION}: a content file holding {@link
     * #PENDING_CONTENT} and the serialized recoverable obtained from {@code closeForCommit()}.
     *
     * @throws IOException if the content or the serialized recoverable cannot be written
     */
    @Disabled("Only used to generate the fixture files for the current serializer version")
    @Test
    void prepareDeserializationPending() throws IOException {
        Path scenarioDir = resolveVersionPath(CURRENT_VERSION, PENDING_SCENARIO);
        RecoverableWriter writer = FileSystem.getLocalFileSystem().createRecoverableWriter();
        OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
                serializer =
                        new OutputStreamBasedPartFileWriter
                                .OutputStreamBasedPendingFileRecoverableSerializer(
                                writer.getCommitRecoverableSerializer());

        RecoverableFsDataOutputStream stream =
                writer.open(
                        new org.apache.flink.core.fs.Path(
                                scenarioDir.resolve(CONTENT_FILE).toString()));
        stream.write(PENDING_CONTENT.getBytes(StandardCharsets.UTF_8));

        // closeForCommit() is documented to close the stream, so no explicit close is added here.
        byte[] serialized =
                serializer.serialize(
                        new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(
                                stream.closeForCommit().getRecoverable()));
        Files.write(scenarioDir.resolve(RECOVERABLE_FILE), serialized);
    }

    /**
     * Deserializes the pending fixture of the given version and recovers a committer from it.
     *
     * @param previousVersion serializer version whose fixture is read
     * @throws IOException if the fixture cannot be read, deserialized or recovered
     */
    @MethodSource({"previousVersions"})
    @ParameterizedTest(name = "Previous Version = {0}")
    void testSerializationPending(int previousVersion) throws IOException {
        Path scenarioDir = resolveVersionPath(previousVersion, PENDING_SCENARIO);
        RecoverableWriter writer = FileSystem.getLocalFileSystem().createRecoverableWriter();
        OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
                serializer =
                        new OutputStreamBasedPartFileWriter
                                .OutputStreamBasedPendingFileRecoverableSerializer(
                                writer.getCommitRecoverableSerializer());

        OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable recoverable =
                serializer.deserialize(
                        previousVersion, Files.readAllBytes(scenarioDir.resolve(RECOVERABLE_FILE)));

        Assertions.assertThat(recoverable)
                .isInstanceOf(
                        OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable
                                .class);
        writer.recoverForCommit(recoverable.getCommitRecoverable());
    }

    /**
     * Builds the fixture directory for a scenario and version, i.e. {@code
     * BASE_PATH/<scenario>-v<version>}.
     */
    private Path resolveVersionPath(long version, String scenario) {
        return BASE_PATH.resolve(scenario + "-v" + version);
    }
}
