package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/operation/FileStoreCommitImpl.class */
public class FileStoreCommitImpl implements FileStoreCommit {
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
    private final FileIO fileIO;
    private final SchemaManager schemaManager;
    private final String commitUser;
    private final RowType partitionType;
    private final RowDataToObjectArrayConverter partitionObjectConverter;
    private final FileStorePathFactory pathFactory;
    private final SnapshotManager snapshotManager;
    private final ManifestFile manifestFile;
    private final ManifestList manifestList;
    private final IndexManifestFile indexManifestFile;
    private final FileStoreScan scan;
    private final int numBucket;
    private final MemorySize manifestTargetSize;
    private final MemorySize manifestFullCompactionSize;
    private final int manifestMergeMinCount;
    private final boolean dynamicPartitionOverwrite;

    @Nullable
    private final Comparator<InternalRow> keyComparator;

    @Nullable
    private Lock lock = null;
    private boolean ignoreEmptyCommit = true;
    private CommitMetrics commitMetrics = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/operation/FileStoreCommitImpl$LevelIdentifier.class */
    public static class LevelIdentifier {
        private final BinaryRow partition;
        private final int bucket;
        private final int level;

        private LevelIdentifier(BinaryRow binaryRow, int i, int i2) {
            this.partition = binaryRow;
            this.bucket = i;
            this.level = i2;
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof LevelIdentifier)) {
                return false;
            }
            LevelIdentifier levelIdentifier = (LevelIdentifier) obj;
            return Objects.equals(this.partition, levelIdentifier.partition) && this.bucket == levelIdentifier.bucket && this.level == levelIdentifier.level;
        }

        public int hashCode() {
            return Objects.hash(this.partition, Integer.valueOf(this.bucket), Integer.valueOf(this.level));
        }
    }

    public FileStoreCommitImpl(FileIO fileIO, SchemaManager schemaManager, String str, RowType rowType, FileStorePathFactory fileStorePathFactory, SnapshotManager snapshotManager, ManifestFile.Factory factory, ManifestList.Factory factory2, IndexManifestFile.Factory factory3, FileStoreScan fileStoreScan, int i, MemorySize memorySize, MemorySize memorySize2, int i2, boolean z, @Nullable Comparator<InternalRow> comparator) {
        this.fileIO = fileIO;
        this.schemaManager = schemaManager;
        this.commitUser = str;
        this.partitionType = rowType;
        this.partitionObjectConverter = new RowDataToObjectArrayConverter(rowType);
        this.pathFactory = fileStorePathFactory;
        this.snapshotManager = snapshotManager;
        this.manifestFile = factory.create();
        this.manifestList = factory2.create();
        this.indexManifestFile = factory3.create();
        this.scan = fileStoreScan;
        this.numBucket = i;
        this.manifestTargetSize = memorySize;
        this.manifestFullCompactionSize = memorySize2;
        this.manifestMergeMinCount = i2;
        this.dynamicPartitionOverwrite = z;
        this.keyComparator = comparator;
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public FileStoreCommit withLock(Lock lock) {
        this.lock = lock;
        return this;
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public FileStoreCommit ignoreEmptyCommit(boolean z) {
        this.ignoreEmptyCommit = z;
        return this;
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public Set<Long> filterCommitted(Set<Long> set) {
        if (set.isEmpty()) {
            return set;
        }
        Optional<Snapshot> latestSnapshotOfUser = this.snapshotManager.latestSnapshotOfUser(this.commitUser);
        if (!latestSnapshotOfUser.isPresent()) {
            return set;
        }
        HashSet hashSet = new HashSet();
        for (Long l : set) {
            if (l.longValue() > latestSnapshotOfUser.get().commitIdentifier()) {
                hashSet.add(l);
            }
        }
        return hashSet;
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public void commit(ManifestCommittable manifestCommittable, Map<String, String> map) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit\n" + manifestCommittable.toString());
        }
        long nanoTime = System.nanoTime();
        int i = 0;
        int i2 = 0;
        Snapshot snapshot = null;
        Long l = null;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        ArrayList arrayList5 = new ArrayList();
        ArrayList arrayList6 = new ArrayList();
        collectChanges(manifestCommittable.fileCommittables(), arrayList2, arrayList3, arrayList4, arrayList5, arrayList6);
        try {
            if (!this.ignoreEmptyCommit || !arrayList2.isEmpty() || !arrayList3.isEmpty() || !arrayList6.isEmpty()) {
                snapshot = this.snapshotManager.latestSnapshot();
                if (snapshot != null) {
                    arrayList.addAll(readAllEntriesFromChangedPartitions(snapshot, arrayList2, arrayList4));
                    noConflictsOrFail(snapshot.commitUser(), arrayList, arrayList2);
                    l = Long.valueOf(snapshot.id());
                }
                i2 = 0 + tryCommit(arrayList2, arrayList3, arrayList6, manifestCommittable.identifier(), manifestCommittable.watermark(), manifestCommittable.logOffsets(), Snapshot.CommitKind.APPEND, l);
                i = 0 + 1;
            }
            if (!arrayList4.isEmpty() || !arrayList5.isEmpty()) {
                if (l != null) {
                    arrayList.addAll(arrayList2);
                    noConflictsOrFail(snapshot.commitUser(), arrayList, arrayList4);
                    l = Long.valueOf(l.longValue() + 1);
                }
                i2 += tryCommit(arrayList4, arrayList5, Collections.emptyList(), manifestCommittable.identifier(), manifestCommittable.watermark(), manifestCommittable.logOffsets(), Snapshot.CommitKind.COMPACT, l);
                i++;
            }
            long nanoTime2 = (System.nanoTime() - nanoTime) / 1000000;
            if (this.commitMetrics != null) {
                reportCommit(arrayList2, arrayList3, arrayList4, arrayList5, nanoTime2, i, i2);
            }
        } catch (Throwable th) {
            long nanoTime3 = (System.nanoTime() - nanoTime) / 1000000;
            if (this.commitMetrics != null) {
                reportCommit(arrayList2, arrayList3, arrayList4, arrayList5, nanoTime3, i, i2);
            }
            throw th;
        }
    }

    private void reportCommit(List<ManifestEntry> list, List<ManifestEntry> list2, List<ManifestEntry> list3, List<ManifestEntry> list4, long j, int i, int i2) {
        this.commitMetrics.reportCommit(new CommitStats(list, list2, list3, list4, j, i, i2));
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public void overwrite(Map<String, String> map, ManifestCommittable manifestCommittable, Map<String, String> map2) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to overwrite partition {}\nManifestCommittable: {}\nProperties: {}", new Object[]{map, manifestCommittable, map2});
        }
        long nanoTime = System.nanoTime();
        int i = 0;
        int i2 = 0;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        ArrayList arrayList5 = new ArrayList();
        collectChanges(manifestCommittable.fileCommittables(), arrayList, arrayList2, arrayList3, arrayList4, arrayList5);
        if (!arrayList2.isEmpty() || !arrayList4.isEmpty()) {
            StringBuilder sb = new StringBuilder("Overwrite mode currently does not commit any changelog.\nPlease make sure that the partition you're overwriting is not being consumed by a streaming reader.\nIgnored changelog files are:\n");
            Iterator<ManifestEntry> it = arrayList2.iterator();
            while (it.hasNext()) {
                sb.append("  * ").append(it.next().toString()).append("\n");
            }
            Iterator<ManifestEntry> it2 = arrayList4.iterator();
            while (it2.hasNext()) {
                sb.append("  * ").append(it2.next().toString()).append("\n");
            }
            LOG.warn(sb.toString());
        }
        try {
            boolean z = false;
            Predicate predicate = null;
            if (!this.dynamicPartitionOverwrite) {
                predicate = PredicateBuilder.partition(map, this.partitionType);
                if (predicate != null) {
                    for (ManifestEntry manifestEntry : arrayList) {
                        if (!predicate.test(this.partitionObjectConverter.convert(manifestEntry.partition()))) {
                            throw new IllegalArgumentException("Trying to overwrite partition " + map + ", but the changes in " + this.pathFactory.getPartitionString(manifestEntry.partition()) + " does not belong to this partition");
                        }
                    }
                }
            } else if (arrayList.isEmpty()) {
                z = true;
            } else {
                predicate = (Predicate) arrayList.stream().map((v0) -> {
                    return v0.partition();
                }).distinct().map(binaryRow -> {
                    return PredicateBuilder.equalPartition(binaryRow, this.partitionType);
                }).reduce((predicate2, predicate3) -> {
                    return PredicateBuilder.or(new Predicate[]{predicate2, predicate3});
                }).orElseThrow(() -> {
                    return new RuntimeException("Failed to get dynamic partition filter. This is unexpected.");
                });
            }
            if (!z) {
                i2 = 0 + tryOverwrite(predicate, arrayList, arrayList5, manifestCommittable.identifier(), manifestCommittable.watermark(), manifestCommittable.logOffsets());
                i = 0 + 1;
            }
            if (!arrayList3.isEmpty()) {
                i2 += tryCommit(arrayList3, Collections.emptyList(), Collections.emptyList(), manifestCommittable.identifier(), manifestCommittable.watermark(), manifestCommittable.logOffsets(), Snapshot.CommitKind.COMPACT, null);
                i++;
            }
            long nanoTime2 = (System.nanoTime() - nanoTime) / 1000000;
            if (this.commitMetrics != null) {
                reportCommit(arrayList, Collections.emptyList(), arrayList3, Collections.emptyList(), nanoTime2, i, i2);
            }
        } catch (Throwable th) {
            long nanoTime3 = (System.nanoTime() - nanoTime) / 1000000;
            if (this.commitMetrics != null) {
                reportCommit(arrayList, Collections.emptyList(), arrayList3, Collections.emptyList(), nanoTime3, 0, 0);
            }
            throw th;
        }
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public void dropPartitions(List<Map<String, String>> list, long j) {
        Preconditions.checkArgument(!list.isEmpty(), "Partitions list cannot be empty.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to drop partitions {}", list.stream().map((v0) -> {
                return Objects.toString(v0);
            }).collect(Collectors.joining(FieldListaggAgg.DELIMITER)));
        }
        tryOverwrite((Predicate) list.stream().map(map -> {
            return PredicateBuilder.partition(map, this.partitionType);
        }).reduce((predicate, predicate2) -> {
            return PredicateBuilder.or(new Predicate[]{predicate, predicate2});
        }).orElseThrow(() -> {
            return new RuntimeException("Failed to get partition filter.");
        }), Collections.emptyList(), Collections.emptyList(), j, null, new HashMap());
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public void purgeTable(long j) {
        tryOverwrite(null, Collections.emptyList(), Collections.emptyList(), j, null, new HashMap());
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public void abort(List<CommitMessage> list) {
        HashMap hashMap = new HashMap();
        for (CommitMessage commitMessage : list) {
            DataFilePathFactory dataFilePathFactory = (DataFilePathFactory) hashMap.computeIfAbsent(Pair.of(commitMessage.partition(), Integer.valueOf(commitMessage.bucket())), pair -> {
                return this.pathFactory.createDataFilePathFactory((BinaryRow) pair.getKey(), ((Integer) pair.getValue()).intValue());
            });
            CommitMessageImpl commitMessageImpl = (CommitMessageImpl) commitMessage;
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(commitMessageImpl.newFilesIncrement().newFiles());
            arrayList.addAll(commitMessageImpl.newFilesIncrement().changelogFiles());
            arrayList.addAll(commitMessageImpl.compactIncrement().compactAfter());
            arrayList.addAll(commitMessageImpl.compactIncrement().changelogFiles());
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.fileIO.deleteQuietly(dataFilePathFactory.toPath(((DataFileMeta) it.next()).fileName()));
            }
        }
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public FileStoreCommit withMetrics(CommitMetrics commitMetrics) {
        this.commitMetrics = commitMetrics;
        return this;
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public FileStorePathFactory pathFactory() {
        return this.pathFactory;
    }

    @Override // org.apache.paimon.operation.FileStoreCommit
    public FileIO fileIO() {
        return this.fileIO;
    }

    private void collectChanges(List<CommitMessage> list, List<ManifestEntry> list2, List<ManifestEntry> list3, List<ManifestEntry> list4, List<ManifestEntry> list5, List<IndexManifestEntry> list6) {
        Iterator<CommitMessage> it = list.iterator();
        while (it.hasNext()) {
            CommitMessageImpl commitMessageImpl = (CommitMessageImpl) it.next();
            commitMessageImpl.newFilesIncrement().newFiles().forEach(dataFileMeta -> {
                list2.add(makeEntry(FileKind.ADD, commitMessageImpl, dataFileMeta));
            });
            commitMessageImpl.newFilesIncrement().changelogFiles().forEach(dataFileMeta2 -> {
                list3.add(makeEntry(FileKind.ADD, commitMessageImpl, dataFileMeta2));
            });
            commitMessageImpl.compactIncrement().compactBefore().forEach(dataFileMeta3 -> {
                list4.add(makeEntry(FileKind.DELETE, commitMessageImpl, dataFileMeta3));
            });
            commitMessageImpl.compactIncrement().compactAfter().forEach(dataFileMeta4 -> {
                list4.add(makeEntry(FileKind.ADD, commitMessageImpl, dataFileMeta4));
            });
            commitMessageImpl.compactIncrement().changelogFiles().forEach(dataFileMeta5 -> {
                list5.add(makeEntry(FileKind.ADD, commitMessageImpl, dataFileMeta5));
            });
            commitMessageImpl.indexIncrement().newIndexFiles().forEach(indexFileMeta -> {
                list6.add(new IndexManifestEntry(FileKind.ADD, commitMessageImpl.partition(), commitMessageImpl.bucket(), indexFileMeta));
            });
        }
    }

    private ManifestEntry makeEntry(FileKind fileKind, CommitMessage commitMessage, DataFileMeta dataFileMeta) {
        return new ManifestEntry(fileKind, commitMessage.partition(), commitMessage.bucket(), this.numBucket, dataFileMeta);
    }

    private int tryCommit(List<ManifestEntry> list, List<ManifestEntry> list2, List<IndexManifestEntry> list3, long j, @Nullable Long l, Map<Integer, Long> map, Snapshot.CommitKind commitKind, Long l2) {
        int i = 0;
        do {
            i++;
        } while (!tryCommitOnce(list, list2, list3, j, l, map, commitKind, this.snapshotManager.latestSnapshot(), l2));
        return i;
    }

    private int tryOverwrite(Predicate predicate, List<ManifestEntry> list, List<IndexManifestEntry> list2, long j, @Nullable Long l, Map<Integer, Long> map) {
        Snapshot latestSnapshot;
        ArrayList arrayList;
        ArrayList arrayList2;
        int i = 0;
        do {
            latestSnapshot = this.snapshotManager.latestSnapshot();
            i++;
            arrayList = new ArrayList();
            arrayList2 = new ArrayList();
            if (latestSnapshot != null) {
                for (ManifestEntry manifestEntry : this.scan.withSnapshot(latestSnapshot).withPartitionFilter(predicate).plan().files()) {
                    arrayList.add(new ManifestEntry(FileKind.DELETE, manifestEntry.partition(), manifestEntry.bucket(), manifestEntry.totalBuckets(), manifestEntry.file()));
                }
                if (latestSnapshot.indexManifest() != null) {
                    RowDataToObjectArrayConverter rowDataToObjectArrayConverter = new RowDataToObjectArrayConverter(this.partitionType);
                    for (IndexManifestEntry indexManifestEntry : this.indexManifestFile.read(latestSnapshot.indexManifest())) {
                        if (predicate == null || predicate.test(rowDataToObjectArrayConverter.convert(indexManifestEntry.partition()))) {
                            arrayList2.add(indexManifestEntry.toDeleteEntry());
                        }
                    }
                }
            }
            arrayList.addAll(list);
            arrayList2.addAll(list2);
        } while (!tryCommitOnce(arrayList, Collections.emptyList(), arrayList2, j, l, map, Snapshot.CommitKind.OVERWRITE, latestSnapshot, null));
        return i;
    }

    @VisibleForTesting
    public boolean tryCommitOnce(List<ManifestEntry> list, List<ManifestEntry> list2, List<IndexManifestEntry> list3, long j, @Nullable Long l, Map<Integer, Long> map, Snapshot.CommitKind commitKind, Snapshot snapshot, Long l2) {
        long id = snapshot == null ? 1L : snapshot.id() + 1;
        Path snapshotPath = this.snapshotManager.snapshotPath(id);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit table files to snapshot #" + id);
            Iterator<ManifestEntry> it = list.iterator();
            while (it.hasNext()) {
                LOG.debug("  * " + it.next().toString());
            }
            LOG.debug("Ready to commit changelog to snapshot #" + id);
            Iterator<ManifestEntry> it2 = list2.iterator();
            while (it2.hasNext()) {
                LOG.debug("  * " + it2.next().toString());
            }
        }
        if (snapshot != null && !Objects.equals(Long.valueOf(snapshot.id()), l2)) {
            noConflictsOrFail(snapshot.commitUser(), snapshot, list);
        }
        String str = null;
        String str2 = null;
        String str3 = null;
        String str4 = null;
        List<ManifestFileMeta> arrayList = new ArrayList<>();
        List<ManifestFileMeta> arrayList2 = new ArrayList<>();
        List<ManifestFileMeta> arrayList3 = new ArrayList<>();
        long j2 = 0;
        Long l3 = l;
        String str5 = null;
        if (snapshot != null) {
            try {
                j2 = snapshot.totalRecordCount(this.scan).longValue();
                arrayList.addAll(snapshot.dataManifests(this.manifestList));
                Map<Integer, Long> logOffsets = snapshot.logOffsets();
                map.getClass();
                logOffsets.forEach((v1, v2) -> {
                    r1.putIfAbsent(v1, v2);
                });
                Long watermark = snapshot.watermark();
                if (watermark != null) {
                    l3 = Long.valueOf(l3 == null ? watermark.longValue() : Math.max(l3.longValue(), watermark.longValue()));
                }
                str5 = snapshot.indexManifest();
            } catch (Throwable th) {
                cleanUpTmpManifests(str, str2, str3, str4, arrayList, arrayList2, arrayList3);
                throw new RuntimeException(String.format("Exception occurs when preparing snapshot #%d (path %s) by user %s with hash %s and kind %s. Clean up.", Long.valueOf(id), snapshotPath.toString(), this.commitUser, Long.valueOf(j), commitKind.name()), th);
            }
        }
        arrayList2.addAll(ManifestFileMeta.merge(arrayList, this.manifestFile, this.manifestTargetSize.getBytes(), this.manifestMergeMinCount, this.manifestFullCompactionSize.getBytes(), this.partitionType));
        str = this.manifestList.write(arrayList2);
        long recordCountAdd = Snapshot.recordCountAdd(list) - Snapshot.recordCountDelete(list);
        long j3 = j2 + recordCountAdd;
        List<ManifestFileMeta> write = this.manifestFile.write(list);
        arrayList2.addAll(write);
        str2 = this.manifestList.write(write);
        if (!list2.isEmpty()) {
            arrayList3.addAll(this.manifestFile.write(list2));
            str3 = this.manifestList.write(arrayList3);
        }
        String merge = this.indexManifestFile.merge(str5, list3);
        if (!Objects.equals(merge, str5)) {
            str4 = merge;
        }
        Snapshot snapshot2 = new Snapshot(id, this.schemaManager.latest().get().id(), str, str2, str3, merge, this.commitUser, j, commitKind, System.currentTimeMillis(), map, Long.valueOf(j3), Long.valueOf(recordCountAdd), Long.valueOf(Snapshot.recordCount(list2)), l3);
        try {
            Callable callable = () -> {
                boolean writeFileUtf8 = this.fileIO.writeFileUtf8(snapshotPath, snapshot2.toJson());
                if (writeFileUtf8) {
                    this.snapshotManager.commitLatestHint(id);
                }
                return Boolean.valueOf(writeFileUtf8);
            };
            if (!(this.lock != null ? ((Boolean) this.lock.runWithLock(() -> {
                return Boolean.valueOf(!this.fileIO.exists(snapshotPath) && ((Boolean) callable.call()).booleanValue());
            })).booleanValue() : ((Boolean) callable.call()).booleanValue())) {
                LOG.warn(String.format("Atomic commit failed for snapshot #%d (path %s) by user %s with identifier %s and kind %s. Clean up and try again.", Long.valueOf(id), snapshotPath, this.commitUser, Long.valueOf(j), commitKind.name()));
                cleanUpTmpManifests(str, str2, str3, str4, arrayList, arrayList2, arrayList3);
                return false;
            }
            if (!LOG.isDebugEnabled()) {
                return true;
            }
            LOG.debug(String.format("Successfully commit snapshot #%d (path %s) by user %s with identifier %s and kind %s.", Long.valueOf(id), snapshotPath, this.commitUser, Long.valueOf(j), commitKind.name()));
            return true;
        } catch (Throwable th2) {
            throw new RuntimeException(String.format("Exception occurs when committing snapshot #%d (path %s) by user %s with identifier %s and kind %s. Cannot clean up because we can't determine the success.", Long.valueOf(id), snapshotPath, this.commitUser, Long.valueOf(j), commitKind.name()), th2);
        }
    }

    @SafeVarargs
    private final List<ManifestEntry> readAllEntriesFromChangedPartitions(Snapshot snapshot, List<ManifestEntry>... listArr) {
        try {
            return this.scan.withSnapshot(snapshot).withPartitionFilter((List<BinaryRow>) Arrays.stream(listArr).flatMap((v0) -> {
                return v0.stream();
            }).map((v0) -> {
                return v0.partition();
            }).distinct().collect(Collectors.toList())).plan().files();
        } catch (Throwable th) {
            throw new RuntimeException("Cannot read manifest entries from changed partitions.", th);
        }
    }

    private void noConflictsOrFail(String str, Snapshot snapshot, List<ManifestEntry> list) {
        noConflictsOrFail(str, readAllEntriesFromChangedPartitions(snapshot, list), list);
    }

    private void noConflictsOrFail(String str, List<ManifestEntry> list, List<ManifestEntry> list2) {
        ArrayList arrayList = new ArrayList(list);
        arrayList.addAll(list2);
        try {
            Collection<ManifestEntry> mergeEntries = ManifestEntry.mergeEntries(arrayList);
            ManifestEntry.assertNoDelete(mergeEntries);
            if (this.keyComparator == null) {
                return;
            }
            HashMap hashMap = new HashMap();
            for (ManifestEntry manifestEntry : mergeEntries) {
                int level = manifestEntry.file().level();
                if (level >= 1) {
                    ((List) hashMap.computeIfAbsent(new LevelIdentifier(manifestEntry.partition(), manifestEntry.bucket(), level), levelIdentifier -> {
                        return new ArrayList();
                    })).add(manifestEntry);
                }
            }
            for (List list3 : hashMap.values()) {
                list3.sort((manifestEntry2, manifestEntry3) -> {
                    return this.keyComparator.compare(manifestEntry2.file().minKey(), manifestEntry3.file().minKey());
                });
                for (int i = 0; i + 1 < list3.size(); i++) {
                    ManifestEntry manifestEntry4 = (ManifestEntry) list3.get(i);
                    ManifestEntry manifestEntry5 = (ManifestEntry) list3.get(i + 1);
                    if (this.keyComparator.compare(manifestEntry4.file().maxKey(), manifestEntry5.file().minKey()) >= 0) {
                        Pair<RuntimeException, RuntimeException> createConflictException = createConflictException("LSM conflicts detected! Give up committing. Conflict files are:\n" + manifestEntry4.identifier().toString(this.pathFactory) + "\n" + manifestEntry5.identifier().toString(this.pathFactory), str, list, list2, null, 50);
                        LOG.warn("", (Throwable) createConflictException.getLeft());
                        throw ((RuntimeException) createConflictException.getRight());
                    }
                }
            }
        } catch (Throwable th) {
            Pair<RuntimeException, RuntimeException> createConflictException2 = createConflictException("File deletion conflicts detected! Give up committing.", str, list, list2, th, 50);
            LOG.warn("", (Throwable) createConflictException2.getLeft());
            throw ((RuntimeException) createConflictException2.getRight());
        }
    }

    private Pair<RuntimeException, RuntimeException> createConflictException(String str, String str2, List<ManifestEntry> list, List<ManifestEntry> list2, Throwable th, int i) {
        String join = String.join("\n", "Don't panic!", "Conflicts during commits are normal and this failure is intended to resolve the conflicts.", "Conflicts are mainly caused by the following scenarios:", "1. Your job is suffering from back-pressuring.", "   There are too many snapshots waiting to be committed and an exception occurred during the commit procedure (most probably due to checkpoint timeout).", "   See https://paimon.apache.org/docs/master/maintenance/write-performance/ for how to improve writing performance.", "2. Multiple jobs are writing into the same partition at the same time, or you use STATEMENT SET to execute multiple INSERT statements into the same Paimon table.", "   You'll probably see different base commit user and current commit user below.", "   You can use https://paimon.apache.org/docs/master/maintenance/dedicated-compaction#dedicated-compaction-jobto support multiple writing.", "3. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.", "   The job will fail continuously in this scenario to protect metadata from corruption.", "   You can either recover from the latest savepoint, or you can revert the table to the snapshot corresponding to the old savepoint.");
        String str3 = "Base commit user is: " + str2 + "; Current commit user is: " + this.commitUser;
        RuntimeException runtimeException = new RuntimeException(str + "\n\n" + join + "\n\n" + str3 + "\n\n" + ("Base entries are:\n" + ((String) list.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("\n")))) + "\n\n" + ("Changes are:\n" + ((String) list2.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("\n")))), th);
        if (list.size() <= i && list2.size() <= i) {
            return Pair.of(runtimeException, runtimeException);
        }
        return Pair.of(runtimeException, new RuntimeException(str + "\n\n" + join + "\n\n" + str3 + "\n\n" + ("Base entries are:\n" + ((String) list.subList(0, Math.min(list.size(), i)).stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("\n")))) + "\n\n" + ("Changes are:\n" + ((String) list2.subList(0, Math.min(list2.size(), i)).stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("\n")))) + "\n\nThe entry list above are not fully displayed, please refer to taskmanager.log for more information.", th));
    }

    private void cleanUpTmpManifests(String str, String str2, String str3, String str4, List<ManifestFileMeta> list, List<ManifestFileMeta> list2, List<ManifestFileMeta> list3) {
        if (str != null) {
            this.manifestList.delete(str);
        }
        if (str2 != null) {
            this.manifestList.delete(str2);
        }
        if (str3 != null) {
            this.manifestList.delete(str3);
        }
        if (str4 != null) {
            this.indexManifestFile.delete(str4);
        }
        HashSet hashSet = new HashSet(list);
        for (ManifestFileMeta manifestFileMeta : list2) {
            if (!hashSet.contains(manifestFileMeta)) {
                this.manifestList.delete(manifestFileMeta.fileName());
            }
        }
        Iterator<ManifestFileMeta> it = list3.iterator();
        while (it.hasNext()) {
            this.manifestList.delete(it.next().fileName());
        }
    }
}
