package org.apache.iceberg.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iceberg/hadoop/TestHiveHadoopCommits.class */
class TestHiveHadoopCommits extends HiveHadoopTableTestBase {
    TestHiveHadoopCommits() {
    }

    @Test
    void testCommitFailedBeforeChangeVersionHint() {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        BaseTable baseTable = (BaseTable) this.table;
        HadoopTableOperations operations = baseTable.operations();
        HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(operations);
        ((HadoopTableOperations) Mockito.doReturn(10000).when(hadoopTableOperations)).findVersionWithOutVersionHint((FileSystem) Mockito.any());
        TableMetadata current = hadoopTableOperations.current();
        TableMetadata replaceSortOrder = current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build());
        Assertions.assertThatThrownBy(() -> {
            hadoopTableOperations.commit(current, replaceSortOrder);
        }).isInstanceOf(CommitFailedException.class).hasMessageContaining("Are there other clients running in parallel with the current task?");
        HadoopTableOperations hadoopTableOperations2 = (HadoopTableOperations) Mockito.spy(operations);
        ((HadoopTableOperations) Mockito.doReturn(false).when(hadoopTableOperations2)).nextVersionIsLatest(Mockito.anyInt(), Mockito.anyInt());
        assertCommitNotChangeVersion(baseTable, hadoopTableOperations2, CommitFailedException.class, "Are there other clients running in parallel with the current task?");
        HadoopTableOperations hadoopTableOperations3 = (HadoopTableOperations) Mockito.spy(operations);
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new RuntimeException("FileSystem crash!")}).when(hadoopTableOperations3)).renameMetaDataFileAndCheck((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), Mockito.anyBoolean());
        assertCommitNotChangeVersion(baseTable, hadoopTableOperations3, CommitFailedException.class, "FileSystem crash!");
    }

    @Test
    void testCommitFailedAndCheckFailed() throws IOException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        BaseTable baseTable = (BaseTable) this.table;
        HadoopTableOperations operations = baseTable.operations();
        HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(operations);
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new IOException("FileSystem crash!")}).when(hadoopTableOperations)).renameMetaDataFile((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any());
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new IOException("Can not check new Metadata!")}).when(hadoopTableOperations)).checkMetaDataFileRenameSuccess((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), Mockito.anyBoolean());
        assertCommitNotChangeVersion(baseTable, hadoopTableOperations, CommitStateUnknownException.class, "FileSystem crash!");
        HadoopTableOperations hadoopTableOperations2 = (HadoopTableOperations) Mockito.spy(operations);
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new OutOfMemoryError("Java heap space")}).when(hadoopTableOperations2)).renameMetaDataFile((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any());
        assertCommitFail(baseTable, hadoopTableOperations2, OutOfMemoryError.class, "Java heap space");
        HadoopTableOperations hadoopTableOperations3 = (HadoopTableOperations) Mockito.spy(operations);
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new RuntimeException("UNKNOWN ERROR")}).when(hadoopTableOperations3)).renameMetaDataFile((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any());
        assertCommitNotChangeVersion(baseTable, hadoopTableOperations3, CommitStateUnknownException.class, "UNKNOWN ERROR");
    }

    @Test
    void testCommitFailedAndRenameNotSuccess() throws IOException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        BaseTable baseTable = (BaseTable) this.table;
        HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new IOException("FileSystem crash!")}).when(hadoopTableOperations)).renameMetaDataFile((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any());
        assertCommitNotChangeVersion(baseTable, hadoopTableOperations, CommitFailedException.class, "Are there other clients running in parallel with the current task?");
    }

    @Test
    void testCommitFailedButActualSuccess() throws IOException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        BaseTable baseTable = this.table;
        HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new IOException("FileSystem crash!")}).when(hadoopTableOperations)).renameMetaDataFile((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any());
        ((HadoopTableOperations) Mockito.doReturn(true).when(hadoopTableOperations)).checkMetaDataFileRenameSuccess((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), Mockito.anyBoolean());
        int findVersion = hadoopTableOperations.findVersion();
        TableMetadata current = hadoopTableOperations.current();
        hadoopTableOperations.commit(current, current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build()));
        Assertions.assertThat(hadoopTableOperations.findVersion()).isEqualTo(findVersion + 1);
    }

    private void assertCommitNotChangeVersion(BaseTable baseTable, HadoopTableOperations hadoopTableOperations, Class<? extends Throwable> cls, String str) {
        int findVersion = hadoopTableOperations.findVersion();
        assertCommitFail(baseTable, hadoopTableOperations, cls, str);
        Assertions.assertThat(findVersion).isEqualTo(hadoopTableOperations.findVersion());
    }

    private void assertCommitFail(BaseTable baseTable, HadoopTableOperations hadoopTableOperations, Class<? extends Throwable> cls, String str) {
        TableMetadata current = hadoopTableOperations.current();
        TableMetadata replaceSortOrder = current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build());
        Assertions.assertThatThrownBy(() -> {
            hadoopTableOperations.commit(current, replaceSortOrder);
        }).isInstanceOf(cls).hasMessageContaining(str);
    }

    @Test
    void testCommitFailedAfterChangeVersionHintRepeatCommit() {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        BaseTable baseTable = this.table;
        HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
        ((HadoopTableOperations) Mockito.doThrow(new Throwable[]{new RuntimeException("FileSystem crash!")}).when(hadoopTableOperations)).deleteRemovedMetadataFiles((TableMetadata) Mockito.any(), (TableMetadata) Mockito.any());
        int findVersion = hadoopTableOperations.findVersion();
        TableMetadata current = hadoopTableOperations.current();
        TableMetadata replaceSortOrder = current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build());
        hadoopTableOperations.commit(current, replaceSortOrder);
        Assertions.assertThat(hadoopTableOperations.findVersion()).isEqualTo(findVersion + 1);
        hadoopTableOperations.commit(hadoopTableOperations.current(), replaceSortOrder.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).desc("id")).build()));
        Assertions.assertThat(hadoopTableOperations.findVersion()).isEqualTo(findVersion + 2);
    }

    @Test
    void testTwoClientCommitSameVersion() throws InterruptedException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(8);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicReference atomicReference2 = new AtomicReference(null);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        BaseTable baseTable = this.table;
        Assertions.assertThat(baseTable.operations().findVersion()).isEqualTo(2);
        newFixedThreadPool.execute(() -> {
            try {
                HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch.countDown();
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).renameMetaDataFileAndCheck((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), Mockito.anyBoolean());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock2 -> {
                    Path path = (Path) invocationOnMock2.getArgument(1);
                    Path path2 = (Path) invocationOnMock2.getArgument(2);
                    File file = new File(path.toUri());
                    File file2 = new File(path2.toUri());
                    Assertions.assertThat(Runtime.getRuntime().exec(String.format("mv -n %s  %s", file.getAbsolutePath(), file2.getAbsolutePath())).waitFor()).isZero();
                    return Boolean.valueOf(file2.exists() && !file.exists());
                }).when(hadoopTableOperations)).renameMetaDataFile((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any());
                TableMetadata current = hadoopTableOperations.current();
                hadoopTableOperations.commit(current, current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build()));
            } catch (CommitFailedException e) {
                atomicReference2.set(e);
            } catch (Throwable th) {
                atomicReference.set(th);
            }
        });
        newFixedThreadPool.execute(() -> {
            try {
                HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch.countDown();
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).renameMetaDataFileAndCheck((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), Mockito.anyBoolean());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock2 -> {
                    Path path = (Path) invocationOnMock2.getArgument(1);
                    Path path2 = (Path) invocationOnMock2.getArgument(2);
                    File file = new File(path.toUri());
                    File file2 = new File(path2.toUri());
                    Assertions.assertThat(Runtime.getRuntime().exec(String.format("mv -n %s  %s", file.getAbsolutePath(), file2.getAbsolutePath())).waitFor()).isZero();
                    return Boolean.valueOf(file2.exists() && !file.exists());
                }).when(hadoopTableOperations)).renameMetaDataFile((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any());
                TableMetadata current = hadoopTableOperations.current();
                hadoopTableOperations.commit(current, current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build()));
            } catch (CommitFailedException e) {
                atomicReference2.set(e);
            } catch (Throwable th) {
                atomicReference.set(th);
            }
        });
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(610L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        Assertions.assertThat((Throwable) atomicReference2.get()).isNotNull();
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
        Assertions.assertThatThrownBy(() -> {
            throw ((Throwable) atomicReference2.get());
        }).isInstanceOf(CommitFailedException.class).hasMessageContaining("Can not commit newMetaData because version [3] has already been committed.");
        Assertions.assertThat(baseTable.operations().findVersion()).isEqualTo(3);
    }

    @Test
    void testConcurrentCommitAndRejectCommitAlreadyExistsVersion() throws InterruptedException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(8);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int i = 20;
        CountDownLatch countDownLatch = new CountDownLatch(5);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        newFixedThreadPool.execute(() -> {
            try {
                BaseTable baseTable = (BaseTable) this.table;
                HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch2.countDown();
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).commitNewVersion((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), (Integer) Mockito.any(), Mockito.anyBoolean());
                assertCommitFail(baseTable, hadoopTableOperations, CommitFailedException.class, "Version 3 already exists");
            } catch (Throwable th) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        newFixedThreadPool.execute(() -> {
            try {
                countDownLatch2.await();
                while (atomicInteger.get() < i) {
                    this.table.newFastAppend().appendFile(FILE_A).commit();
                    countDownLatch.countDown();
                    atomicInteger.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
        Assertions.assertThat(atomicInteger.get()).isEqualTo(20);
    }

    @Test
    void testRejectCommitAlreadyExistsVersionWithUsingObjectStore() throws InterruptedException {
        this.table.updateProperties().set("write.object-storage.enabled", "true").commit();
        this.table.newFastAppend().appendFile(FILE_A).commit();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(8);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int i = 20;
        CountDownLatch countDownLatch = new CountDownLatch(5);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        newFixedThreadPool.execute(() -> {
            try {
                BaseTable baseTable = (BaseTable) this.table;
                HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch2.countDown();
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                assertCommitFail(baseTable, hadoopTableOperations, CommitFailedException.class, "Version 4 already exists");
            } catch (Throwable th) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        newFixedThreadPool.execute(() -> {
            try {
                countDownLatch2.await();
                while (atomicInteger.get() < i) {
                    this.table.newFastAppend().appendFile(FILE_A).commit();
                    countDownLatch.countDown();
                    atomicInteger.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
        Assertions.assertThat(atomicInteger.get()).isEqualTo(20);
    }

    @Test
    void testConcurrentCommitAndRejectTooOldCommit() throws InterruptedException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.updateProperties().set("write.metadata.previous-versions-max", "2").commit();
        this.table.updateProperties().set("write.metadata.delete-after-commit.enabled", "true").commit();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(8);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int i = 20;
        CountDownLatch countDownLatch = new CountDownLatch(5);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        newFixedThreadPool.execute(() -> {
            try {
                BaseTable baseTable = (BaseTable) this.table;
                HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch2.countDown();
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).commitNewVersion((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), (Integer) Mockito.any(), Mockito.anyBoolean());
                assertCommitFail(baseTable, hadoopTableOperations, CommitFailedException.class, "Cannot commit version [5] because it is smaller or much larger than the current latest version [9].");
                countDownLatch3.countDown();
            } catch (Throwable th) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        newFixedThreadPool.execute(() -> {
            try {
                countDownLatch2.await();
                while (atomicInteger.get() < i) {
                    this.table.newFastAppend().appendFile(FILE_A).commit();
                    countDownLatch.countDown();
                    if (countDownLatch.getCount() == 0) {
                        countDownLatch3.await();
                    }
                    atomicInteger.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
        Assertions.assertThat(atomicInteger.get()).isEqualTo(20);
    }

    @Test
    void testRejectTooOldCommitWithUsingObjectStore() throws InterruptedException {
        this.table.updateProperties().set("write.object-storage.enabled", "true").commit();
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.updateProperties().set("write.metadata.previous-versions-max", "2").commit();
        this.table.updateProperties().set("write.metadata.delete-after-commit.enabled", "true").commit();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(8);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int i = 20;
        CountDownLatch countDownLatch = new CountDownLatch(5);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        newFixedThreadPool.execute(() -> {
            try {
                BaseTable baseTable = (BaseTable) this.table;
                HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch2.countDown();
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                assertCommitFail(baseTable, hadoopTableOperations, CommitFailedException.class, "Cannot commit version [6] because it is smaller or much larger than the current latest version [10].");
            } catch (Throwable th) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        newFixedThreadPool.execute(() -> {
            try {
                countDownLatch2.await();
                while (atomicInteger.get() < i) {
                    this.table.newFastAppend().appendFile(FILE_A).commit();
                    countDownLatch.countDown();
                    atomicInteger.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
        Assertions.assertThat(atomicInteger.get()).isEqualTo(20);
    }

    @Test
    void testConcurrentCommitAndRejectDirtyCommit() throws InterruptedException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.updateProperties().set("write.metadata.previous-versions-max", "2").commit();
        this.table.updateProperties().set("write.metadata.delete-after-commit.enabled", "true").commit();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(8);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(5);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        int i = 20;
        newFixedThreadPool.execute(() -> {
            try {
                BaseTable baseTable = this.table;
                HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
                TableMetadata current = hadoopTableOperations.current();
                TableMetadata replaceSortOrder = current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).renameMetaDataFileAndCheck((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), Mockito.anyBoolean());
                countDownLatch2.countDown();
                Assertions.assertThatThrownBy(() -> {
                    hadoopTableOperations.commit(current, replaceSortOrder);
                }).isInstanceOf(CommitStateUnknownException.class).hasMessageContaining("Commit rejected by server!The current commit version [5] is much smaller than the latest version [9]");
            } catch (Throwable th) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        newFixedThreadPool.execute(() -> {
            try {
                countDownLatch2.await();
                while (atomicInteger.get() < i) {
                    this.table.newFastAppend().appendFile(FILE_A).commit();
                    countDownLatch.countDown();
                    atomicInteger.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        Assertions.assertThat(atomicInteger.get()).isEqualTo(20);
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
    }

    @Test
    void testCleanTooOldDirtyCommit() throws InterruptedException {
        this.table.newFastAppend().appendFile(FILE_A).commit();
        this.table.updateProperties().set("write.metadata.previous-versions-max", "2").commit();
        this.table.updateProperties().set("write.metadata.delete-after-commit.enabled", "true").commit();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(8);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int i = 20;
        BaseTable baseTable = this.table;
        HadoopTableOperations hadoopTableOperations = (HadoopTableOperations) Mockito.spy(baseTable.operations());
        CountDownLatch countDownLatch = new CountDownLatch(5);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicReference atomicReference2 = new AtomicReference(null);
        newFixedThreadPool.execute(() -> {
            try {
                TableMetadata current = hadoopTableOperations.current();
                int findVersion = hadoopTableOperations.findVersion() + 1;
                TableMetadata replaceSortOrder = current.replaceSortOrder(((SortOrder.Builder) SortOrder.builderFor(baseTable.schema()).asc("data")).build());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).tryLock((Path) Mockito.any(), (Path) Mockito.any());
                ((HadoopTableOperations) Mockito.doAnswer(invocationOnMock -> {
                    countDownLatch2.countDown();
                    countDownLatch.await();
                    return invocationOnMock.callRealMethod();
                }).when(hadoopTableOperations)).renameMetaDataFileAndCheck((FileSystem) Mockito.any(), (Path) Mockito.any(), (Path) Mockito.any(), Mockito.anyBoolean());
                ((HadoopTableOperations) Mockito.doNothing().when(hadoopTableOperations)).fastFailIfDirtyCommit(Mockito.anyInt(), Mockito.anyInt(), (FileSystem) Mockito.any(), (Path) Mockito.any());
                hadoopTableOperations.commit(current, replaceSortOrder);
                atomicReference2.set(new File(hadoopTableOperations.getMetadataFile(findVersion).toUri()));
            } catch (Exception e) {
                atomicReference.set(e);
                throw new RuntimeException(e);
            }
        });
        newFixedThreadPool.execute(() -> {
            try {
                countDownLatch2.await();
                while (atomicInteger.get() < i) {
                    this.table.newFastAppend().appendFile(FILE_A).commit();
                    countDownLatch.countDown();
                    atomicInteger.incrementAndGet();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
        Assertions.assertThat((File) atomicReference2.get()).isNotNull();
        Assertions.assertThat((File) atomicReference2.get()).exists();
        Assertions.assertThat(((File) atomicReference2.get()).setLastModified(System.currentTimeMillis() - 2592000000L)).isTrue();
        this.table.newFastAppend().appendFile(FILE_A).commit();
        Assertions.assertThat((File) atomicReference2.get()).doesNotExist();
    }
}
