package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progressable;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionService;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil;
import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.class */
public class HiveIcebergOutputCommitter extends OutputCommitter {
    /** File-name suffix of the per-task files that list the files a task wants committed. */
    private static final String FOR_COMMIT_EXTENSION = ".forCommit";
    private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
    /** Shared instance returned by {@link #getInstance()}. */
    private static final HiveIcebergOutputCommitter OUTPUT_COMMITTER = new HiveIcebergOutputCommitter();
    /** Optional executor handed to snapshot updates for manifest scanning; {@code null} until set. */
    private ExecutorService workerPool;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter$OutputTable.class */
    /**
     * Identifies one output table of a job: the catalog name, the table name and the table object.
     *
     * <p>Equality and hashing are based on the table name only, so the same table reached through
     * several job contexts collapses into a single multimap key.
     */
    public static class OutputTable {
        private final String catalogName;
        private final String tableName;
        private final Table table;

        private OutputTable(String catalogName, String tableName, Table table) {
            this.catalogName = catalogName;
            this.tableName = tableName;
            this.table = table;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            OutputTable other = (OutputTable) obj;
            return Objects.equals(this.tableName, other.tableName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.tableName);
        }

        /**
         * Returns a readable identifier; instances are passed directly to log statements, which
         * would otherwise print only the default identity hash.
         */
        @Override
        public String toString() {
            return "OutputTable{catalogName=" + this.catalogName + ", tableName=" + this.tableName + "}";
        }
    }

    /**
     * Returns the shared committer instance held by this class.
     *
     * @return the single statically created {@code HiveIcebergOutputCommitter}
     */
    public static HiveIcebergOutputCommitter getInstance() {
        final HiveIcebergOutputCommitter shared = OUTPUT_COMMITTER;
        return shared;
    }

    /**
     * Job setup hook; this committer does nothing here.
     *
     * @param jobContext the job being set up (unused)
     */
    public void setupJob(JobContext jobContext) {
        // Intentionally empty.
    }

    /**
     * Task setup hook; this committer does nothing here.
     *
     * @param taskAttemptContext the task attempt being set up (unused)
     */
    public void setupTask(TaskAttemptContext taskAttemptContext) {
        // Intentionally empty.
    }

    /**
     * Decides whether the given task attempt has to run {@code commitTask}.
     *
     * @param taskAttemptContext the task attempt to inspect
     * @return {@code true} for reduce tasks, or for any task when the job has zero reduce tasks
     */
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
        TaskType taskType = taskAttemptContext.getTaskAttemptID().getTaskID().getTaskType();
        if (TaskType.REDUCE.equals(taskType)) {
            return true;
        }
        // Not a reducer: commit only when the job is map-only.
        return taskAttemptContext.getJobConf().getNumReduceTasks() == 0;
    }

    /**
     * Writes one {@code .forCommit} file per output table describing the files produced by the
     * writers registered for this task attempt, then drops the writers from the registry.
     *
     * <p>Tables with no serialized table object in the configuration are skipped; tables with no
     * registered writer get an empty commit file.
     *
     * @param taskAttemptContext the task attempt being committed
     * @throws IOException if writing a commit file fails
     */
    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(taskAttemptContext);
        TaskAttemptID attemptId = context.getTaskAttemptID();
        JobConf jobConf = context.getJobConf();
        Set<Path> mergedPaths = getCombinedLocations(jobConf);
        Set<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());

        Map<String, List<HiveIcebergWriter>> registered = WriterRegistry.writers(attemptId);
        final Map<String, List<HiveIcebergWriter>> writersByTable;
        if (registered != null) {
            writersByTable = registered;
        } else {
            LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptId);
            writersByTable = ImmutableMap.of();
        }

        ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
        try {
            Tasks.foreach(outputs).retry(3).stopOnFailure().throwFailureWhenFinished().executeWith(tableExecutor).run(output -> {
                Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
                if (table == null) {
                    LOG.info("CommitTask found no serialized table in config for table: {}.", output);
                    return;
                }
                String commitFileLocation = generateFileForCommitLocation(
                    table.location(), jobConf, attemptId.getJobID(), attemptId.getTaskID().getId());

                List<HiveIcebergWriter> tableWriters = writersByTable.get(output);
                if (tableWriters == null) {
                    LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptId);
                    createFileForCommit(FilesForCommit.empty(), commitFileLocation, table.io());
                    return;
                }

                // Merge the results of every writer of this table into a single commit file.
                List<DataFile> dataFiles = Lists.newArrayList();
                List<DeleteFile> deleteFiles = Lists.newArrayList();
                List<DataFile> replacedDataFiles = Lists.newArrayList();
                Set<CharSequence> referencedDataFiles = Sets.newHashSet();
                for (HiveIcebergWriter writer : tableWriters) {
                    FilesForCommit written = writer.files();
                    dataFiles.addAll(written.dataFiles());
                    deleteFiles.addAll(written.deleteFiles());
                    replacedDataFiles.addAll(written.replacedDataFiles());
                    referencedDataFiles.addAll(written.referencedDataFiles());
                }
                FilesForCommit merged =
                    new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, mergedPaths);
                createFileForCommit(merged, commitFileLocation, table.io());
            }, IOException.class);
        } finally {
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }

        // Only reached on success: release the writers of this attempt.
        WriterRegistry.removeWriters(attemptId);
    }

    /**
     * Removes the writers registered for this task attempt and closes each of them with
     * {@code close(true)}.
     *
     * @param taskAttemptContext the task attempt being aborted
     * @throws IOException if closing a writer fails
     */
    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(taskAttemptContext);
        Map<String, List<HiveIcebergWriter>> writersByTable = WriterRegistry.removeWriters(context.getTaskAttemptID());
        if (writersByTable == null) {
            return;
        }
        for (List<HiveIcebergWriter> tableWriters : writersByTable.values()) {
            for (HiveIcebergWriter writer : tableWriters) {
                writer.close(true);
            }
        }
    }

    /**
     * Commits a single job by delegating to {@link #commitJobs(List, Context.Operation)} with
     * {@code Context.Operation.OTHER}.
     *
     * @param jobContext the job to commit
     * @throws IOException if the commit fails
     */
    public void commitJob(JobContext jobContext) throws IOException {
        List<JobContext> singleJob = Collections.singletonList(jobContext);
        commitJobs(singleJob, Context.Operation.OTHER);
    }

    /**
     * Sets the executor that is passed to snapshot updates before they are committed.
     *
     * @param executorService the executor to use; {@code null} leaves snapshot updates untouched
     */
    public void setWorkerPool(ExecutorService executorService) {
        this.workerPool = executorService;
    }

    /**
     * Commits the given jobs: every output table is committed once with the results of all jobs
     * writing to it, merge-task input files are cleaned, and finally the temporary job locations
     * are removed.
     *
     * @param list the job contexts to commit; the configuration of the first one drives the executors
     * @param operation the operation type forwarded to the per-table commit
     * @throws IOException if cleaning up fails
     */
    public void commitJobs(List<JobContext> list, Context.Operation operation) throws IOException {
        List<JobContext> jobContexts =
            list.stream().map(TezUtil::enrichContextWithVertexId).collect(Collectors.toList());
        Multimap<OutputTable, JobContext> outputs = collectOutputs(jobContexts);
        JobConf jobConf = jobContexts.get(0).getJobConf();
        long startTime = System.currentTimeMillis();
        String jobIds = jobContexts.stream()
            .map(jobContext -> jobContext.getJobID().toString())
            .collect(Collectors.joining(","));
        LOG.info("Committing job(s) {} has started", jobIds);

        ExecutorService fileExecutor = fileExecutor(jobConf);
        ExecutorService tableExecutor = tableExecutor(jobConf, outputs.keySet().size());
        // Filled from the table tasks, which may run concurrently.
        ConcurrentLinkedQueue<String> jobLocations = new ConcurrentLinkedQueue<>();
        try {
            Tasks.foreach(outputs.keySet()).throwFailureWhenFinished().stopOnFailure().executeWith(tableExecutor).run(output -> {
                Collection<JobContext> tableJobs = outputs.get(output);
                Table table = output.table;
                for (JobContext tableJob : tableJobs) {
                    jobLocations.add(generateJobLocation(table.location(), jobConf, tableJob.getJobID()));
                }
                commitTable(table.io(), fileExecutor, output, tableJobs, operation);
            });
            cleanMergeTaskInputFiles(jobContexts, fileExecutor);
        } finally {
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }

        LOG.info("Commit took {} ms for job(s) {}", System.currentTimeMillis() - startTime, jobIds);
        for (JobContext jobContext : jobContexts) {
            cleanup(jobContext, jobLocations);
        }
    }

    /**
     * Groups the job contexts by the output tables they write to.
     *
     * <p>The table object is taken from the session resources when one is registered under the
     * table name, otherwise from the job configuration. Tables that cannot be resolved are logged
     * and left out.
     *
     * @param list the job contexts to inspect
     * @return a multimap from output table to the job contexts writing to it
     */
    private Multimap<OutputTable, JobContext> collectOutputs(List<JobContext> list) {
        Multimap<OutputTable, JobContext> outputs = Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
        for (JobContext jobContext : list) {
            for (String output : HiveIcebergStorageHandler.outputTables(jobContext.getJobConf())) {
                Table table = SessionStateUtil.getResource(jobContext.getJobConf(), output)
                    .filter(resource -> resource instanceof Table)
                    .map(resource -> (Table) resource)
                    .orElseGet(() -> HiveIcebergStorageHandler.table(jobContext.getJobConf(), output));
                if (table == null) {
                    LOG.info("Found no table object in QueryState or conf for: {}. Skipping job commit.", output);
                    continue;
                }
                String catalogName = HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output);
                outputs.put(new OutputTable(catalogName, output, table), jobContext);
            }
        }
        return outputs;
    }

    /**
     * Aborts a single job by delegating to {@link #abortJobs(List)}.
     *
     * @param jobContext the job to abort
     * @param i the job status code supplied by the framework (unused)
     * @throws IOException if the abort fails
     */
    public void abortJob(JobContext jobContext, int i) throws IOException {
        List<JobContext> singleJob = Collections.singletonList(jobContext);
        abortJobs(singleJob);
    }

    /**
     * Aborts the given jobs: deletes every file listed in the jobs' commit files and then removes
     * the temporary job locations. Failures while cleaning a table or deleting a file are logged
     * and suppressed.
     *
     * @param list the job contexts to abort; the configuration of the first one drives the executors
     * @throws IOException if listing the commit files or the final cleanup fails
     */
    public void abortJobs(List<JobContext> list) throws IOException {
        List<JobContext> jobContexts =
            list.stream().map(TezUtil::enrichContextWithVertexId).collect(Collectors.toList());
        Multimap<OutputTable, JobContext> outputs = collectOutputs(jobContexts);
        JobConf jobConf = jobContexts.get(0).getJobConf();
        String jobIds = jobContexts.stream()
            .map(jobContext -> jobContext.getJobID().toString())
            .collect(Collectors.joining(","));
        LOG.info("Job(s) {} are aborted. Data file cleaning started", jobIds);

        ExecutorService fileExecutor = fileExecutor(jobConf);
        ExecutorService tableExecutor = tableExecutor(jobConf, outputs.keySet().size());
        // Filled from the table tasks, which may run concurrently.
        ConcurrentLinkedQueue<String> jobLocations = new ConcurrentLinkedQueue<>();
        try {
            Tasks.foreach(outputs.keySet()).suppressFailureWhenFinished().executeWith(tableExecutor)
                .onFailure((output, failure) -> LOG.warn("Failed cleanup table {} on abort job", output, failure))
                .run(output -> {
                    for (JobContext tableJob : outputs.get(output)) {
                        LOG.info("Cleaning job for jobID: {}, table: {}", tableJob.getJobID(), output);
                        Table table = output.table;
                        String jobLocation = generateJobLocation(table.location(), jobConf, tableJob.getJobID());
                        jobLocations.add(jobLocation);
                        // The number of commit files found determines how many task results are read.
                        int taskCount = listForCommits(jobConf, jobLocation).size();
                        FilesForCommit results =
                            collectResults(taskCount, fileExecutor, table.location(), tableJob, table.io(), false);
                        Tasks.foreach(results.allFiles()).retry(3).suppressFailureWhenFinished().executeWith(fileExecutor)
                            .onFailure((file, failure) ->
                                LOG.warn("Failed to remove data file {} on abort job", file.path(), failure))
                            .run(file -> table.io().deleteFile(file.path().toString()));
                    }
                }, IOException.class);
        } finally {
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }

        LOG.info("Job(s) {} are aborted. Data file cleaning finished", jobIds);
        for (JobContext jobContext : jobContexts) {
            cleanup(jobContext, jobLocations);
        }
    }

    /**
     * Lists the job location and returns the non-directory entries whose name ends with the
     * {@code .forCommit} extension.
     *
     * @param jobConf configuration used to resolve the file system
     * @param str the job location to list
     * @return the commit files found directly under the job location
     * @throws IOException if the location cannot be listed
     */
    private Set<FileStatus> listForCommits(JobConf jobConf, String str) throws IOException {
        Path jobDir = new Path(str);
        LOG.debug("Listing job location to get commitTask manifest files for abort: {}", str);
        FileSystem fs = jobDir.getFileSystem(jobConf);
        FileStatus[] statuses = fs.listStatus(jobDir);
        LOG.debug("Listing the job location: {} yielded these files: {}", str, Arrays.toString(statuses));

        Set<FileStatus> commitFiles = new HashSet<>();
        for (FileStatus status : statuses) {
            if (status.isDirectory()) {
                continue;
            }
            if (status.getPath().getName().endsWith(FOR_COMMIT_EXTENSION)) {
                commitFiles.add(status);
            }
        }
        return commitFiles;
    }

    /**
     * Collects the task results of every job writing to the given table and commits them as a
     * single Iceberg operation.
     *
     * <p>Files listed as merged-and-deleted by any job are dropped from the data and delete file
     * lists before committing. For operations other than {@code IOW} a write commit is made (or
     * nothing, when there are no files); for {@code IOW} the rewrite policy of the job
     * configuration selects between an overwrite and a compaction commit.
     *
     * @param fileIO the file IO used to read the task commit files
     * @param executorService executor used to read the task commit files in parallel
     * @param outputTable the table to commit to
     * @param collection the job contexts that wrote to the table
     * @param operation the operation type of the query
     */
    private void commitTable(FileIO fileIO, ExecutorService executorService, OutputTable outputTable, Collection<JobContext> collection, Context.Operation operation) {
        String tableName = outputTable.tableName;
        Properties catalogProperties = new Properties();
        catalogProperties.put(Catalogs.NAME, tableName);
        catalogProperties.put(Catalogs.LOCATION, outputTable.table.location());
        if (outputTable.catalogName != null) {
            catalogProperties.put(InputFormatConfig.CATALOG_NAME, outputTable.catalogName);
        }

        List<DataFile> dataFiles = Lists.newArrayList();
        List<DeleteFile> deleteFiles = Lists.newArrayList();
        List<DataFile> replacedDataFiles = Lists.newArrayList();
        Set<CharSequence> referencedDataFiles = Sets.newHashSet();
        Set<Path> mergedAndDeletedFiles = Sets.newHashSet();

        Table table = null;
        String branchName = null;
        Long snapshotId = null;
        Expression filterExpr = Expressions.alwaysTrue();

        for (JobContext jobContext : collection) {
            JobConf jobConf = jobContext.getJobConf();
            // Load the table once; Optional.orElse(loadTable(..)) would evaluate the load eagerly
            // on every iteration even after the table is available.
            if (table == null) {
                table = Catalogs.loadTable(jobConf, catalogProperties);
            }
            branchName = jobConf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
            snapshotId = getSnapshotId(outputTable.table, branchName);

            Expression jobFilterExpr = (Expression) SessionStateUtil.getResource(jobConf, InputFormatConfig.QUERY_FILTERS)
                .orElse(Expressions.alwaysTrue());
            if (!filterExpr.equals(jobFilterExpr)) {
                filterExpr = Expressions.and(filterExpr, jobFilterExpr);
            }
            LOG.debug("Filter Expression :{}", filterExpr);
            LOG.info("Committing job has started for table: {}, using location: {}", table,
                generateJobLocation(outputTable.table.location(), jobConf, jobContext.getJobID()));

            int taskCount = SessionStateUtil.getCommitInfo(jobConf, tableName)
                .map(commitInfoByJob -> commitInfoByJob.get(jobContext.getJobID().toString()))
                .map(SessionStateUtil.CommitInfo::getTaskNum)
                .orElseGet(() -> {
                    LOG.info("Number of tasks not available in session state for jobID: {}, table: {}. Falling back to jobConf numReduceTasks/numMapTasks", jobContext.getJobID(), tableName);
                    return jobConf.getNumReduceTasks() > 0 ? jobConf.getNumReduceTasks() : jobConf.getNumMapTasks();
                });

            FilesForCommit jobResults =
                collectResults(taskCount, executorService, outputTable.table.location(), jobContext, fileIO, true);
            dataFiles.addAll(jobResults.dataFiles());
            deleteFiles.addAll(jobResults.deleteFiles());
            replacedDataFiles.addAll(jobResults.replacedDataFiles());
            referencedDataFiles.addAll(jobResults.referencedDataFiles());
            mergedAndDeletedFiles.addAll(jobResults.mergedAndDeletedFiles());
        }

        // Files that were merged away must not be committed.
        dataFiles.removeIf(dataFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(dataFile.path()))));
        deleteFiles.removeIf(deleteFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(deleteFile.path()))));

        FilesForCommit filesForCommit =
            new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, Collections.emptySet());
        long startTime = System.currentTimeMillis();

        if (Context.Operation.IOW != operation) {
            if (filesForCommit.isEmpty()) {
                LOG.info("Not creating a new commit for table: {}, jobIDs: {}, since there were no new files to add", table,
                    collection.stream().map(JobContext::getJobID).map(String::valueOf).collect(Collectors.joining(",")));
            } else {
                commitWrite(table, branchName, snapshotId, startTime, filesForCommit, operation, filterExpr);
            }
            return;
        }

        String rewritePolicyName = collection.stream().findAny()
            .map(jobContext -> jobContext.getJobConf().get(HiveConf.ConfVars.REWRITE_POLICY.varname))
            .orElse(Context.RewritePolicy.DEFAULT.name());
        if (Context.RewritePolicy.fromString(rewritePolicyName) == Context.RewritePolicy.DEFAULT) {
            commitOverwrite(table, branchName, snapshotId, startTime, filesForCommit);
            return;
        }

        String partitionPath = collection.stream().findAny()
            .map(jobContext -> jobContext.getJobConf().get(IcebergCompactionService.PARTITION_PATH))
            .orElse(null);
        long fileSizeThreshold = collection.stream().findAny()
            .map(jobContext -> jobContext.getJobConf().get("compactor.threshold.file.size.threshold"))
            .map(Long::parseLong)
            .orElse(-1L);
        commitCompaction(table, snapshotId, startTime, filesForCommit, partitionPath, fileSizeThreshold);
    }

    /**
     * Looks up the snapshot of the table for the given snapshot reference.
     *
     * @param table the table to inspect
     * @param str the snapshot reference name passed to {@code IcebergTableUtil.getTableSnapshot}
     * @return the snapshot id, or {@code null} when no snapshot is found
     */
    private Long getSnapshotId(Table table, String str) {
        Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, str);
        return snapshot == null ? null : Long.valueOf(snapshot.snapshotId());
    }

    /**
     * Commits the collected files as an overwrite, a row delta or a plain append.
     *
     * <ul>
     *   <li>replaced data files present: overwrite commit, after which the method returns
     *       without the timing log;</li>
     *   <li>delete files present or the operation is {@code MERGE}: row-delta commit;</li>
     *   <li>otherwise: append commit.</li>
     * </ul>
     *
     * @param table the table to commit to
     * @param str the snapshot reference to commit to; ignored when empty
     * @param l the snapshot id to validate from; validation start is not set when {@code null}
     * @param j the start time in milliseconds, used for the timing log
     * @param filesForCommit the files to commit
     * @param operation the operation type of the query
     * @param expression the conflict detection filter
     */
    private void commitWrite(Table table, String str, Long l, long j, FilesForCommit filesForCommit, Context.Operation operation, Expression expression) {
        if (!filesForCommit.replacedDataFiles().isEmpty()) {
            OverwriteFiles overwrite = table.newOverwrite();
            filesForCommit.replacedDataFiles().forEach(overwrite::deleteFile);
            filesForCommit.dataFiles().forEach(overwrite::addFile);
            if (StringUtils.isNotEmpty(str)) {
                overwrite.toBranch(HiveUtils.getTableSnapshotRef(str));
            }
            if (l != null) {
                overwrite.validateFromSnapshot(l.longValue());
            }
            overwrite.conflictDetectionFilter(expression);
            overwrite.validateNoConflictingData();
            overwrite.validateNoConflictingDeletes();
            commit(overwrite);
            return;
        }

        boolean hasDeletes = !filesForCommit.deleteFiles().isEmpty();
        if (hasDeletes || Context.Operation.MERGE == operation) {
            RowDelta rowDelta = table.newRowDelta();
            filesForCommit.dataFiles().forEach(rowDelta::addRows);
            filesForCommit.deleteFiles().forEach(rowDelta::addDeletes);
            if (StringUtils.isNotEmpty(str)) {
                rowDelta.toBranch(HiveUtils.getTableSnapshotRef(str));
            }
            if (l != null) {
                rowDelta.validateFromSnapshot(l.longValue());
            }
            rowDelta.conflictDetectionFilter(expression);
            if (!filesForCommit.dataFiles().isEmpty()) {
                rowDelta.validateDeletedFiles();
                rowDelta.validateNoConflictingDeleteFiles();
            }
            rowDelta.validateDataFilesExist(filesForCommit.referencedDataFiles());
            rowDelta.validateNoConflictingDataFiles();
            commit(rowDelta);
        } else {
            AppendFiles append = table.newAppend();
            filesForCommit.dataFiles().forEach(append::appendFile);
            if (StringUtils.isNotEmpty(str)) {
                append.toBranch(HiveUtils.getTableSnapshotRef(str));
            }
            commit(append);
        }

        long elapsedMillis = System.currentTimeMillis() - j;
        LOG.info("Write commit took {} ms for table: {} with {} data and {} delete file(s)",
            elapsedMillis, table, filesForCommit.dataFiles().size(), filesForCommit.deleteFiles().size());
        LOG.debug("Added files {}", filesForCommit);
    }

    /**
     * Commits the given snapshot update, first handing it the worker pool for manifest scanning
     * when one has been set.
     *
     * @param snapshotUpdate the pending update to commit
     */
    private void commit(SnapshotUpdate<?> snapshotUpdate) {
        boolean hasWorkerPool = this.workerPool != null;
        if (hasWorkerPool) {
            snapshotUpdate.scanManifestsWith(this.workerPool);
        }
        snapshotUpdate.commit();
    }

    /**
     * Commits a compaction as a rewrite: the existing data files selected by
     * {@code IcebergCompactionUtil.getDataFiles} (and, when the size threshold is {@code -1},
     * the delete files from {@code IcebergCompactionUtil.getDeleteFiles}) are removed and the
     * newly written data files are added.
     *
     * <p>The rewrite goes through {@link #commit(SnapshotUpdate)} like every other commit path of
     * this class, so the worker pool, when set, is also applied to compaction commits.
     *
     * @param table the table to commit to
     * @param l the snapshot id to validate from; validation start is not set when {@code null}
     * @param j the start time in milliseconds, used for the timing log
     * @param filesForCommit the files written by the compaction
     * @param str the partition path being compacted; may be {@code null}
     * @param j2 the file size threshold; {@code -1} also replaces the delete files
     */
    private void commitCompaction(Table table, Long l, long j, FilesForCommit filesForCommit, String str, long j2) {
        List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, str, j2);
        List<DeleteFile> existingDeleteFiles =
            j2 == -1 ? IcebergCompactionUtil.getDeleteFiles(table, str) : Collections.emptyList();

        RewriteFiles rewriteFiles = table.newRewrite();
        existingDataFiles.forEach(rewriteFiles::deleteFile);
        existingDeleteFiles.forEach(rewriteFiles::deleteFile);
        filesForCommit.dataFiles().forEach(rewriteFiles::addFile);
        if (l != null) {
            rewriteFiles.validateFromSnapshot(l.longValue());
        }
        commit(rewriteFiles);

        LOG.info("Compaction commit took {} ms for table: {} partition: {} with {} file(s)",
            System.currentTimeMillis() - j, table, StringUtils.defaultString(str, "N/A"), filesForCommit.dataFiles().size());
    }

    /**
     * Commits an insert-overwrite. With new data files the touched partitions are replaced; with
     * no data files an unpartitioned table is emptied, and a partitioned table is left unchanged.
     *
     * @param table the table to commit to
     * @param str the snapshot reference to commit to; ignored when empty
     * @param l the snapshot id to validate from; validation start is not set when {@code null}
     * @param j the start time in milliseconds, used for the timing log
     * @param filesForCommit the files to commit; must not contain delete files
     * @throws IllegalArgumentException if {@code filesForCommit} contains delete files
     */
    private void commitOverwrite(Table table, String str, Long l, long j, FilesForCommit filesForCommit) {
        Preconditions.checkArgument(filesForCommit.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");

        boolean hasNewData = !filesForCommit.dataFiles().isEmpty();
        if (hasNewData) {
            ReplacePartitions replacePartitions = table.newReplacePartitions();
            filesForCommit.dataFiles().forEach(replacePartitions::addFile);
            if (StringUtils.isNotEmpty(str)) {
                replacePartitions.toBranch(HiveUtils.getTableSnapshotRef(str));
            }
            if (l != null) {
                replacePartitions.validateFromSnapshot(l.longValue());
            }
            replacePartitions.validateNoConflictingDeletes();
            replacePartitions.validateNoConflictingData();
            commit(replacePartitions);
            LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)",
                System.currentTimeMillis() - j, table, filesForCommit.dataFiles().size());
        } else if (table.spec().isUnpartitioned()) {
            // Empty overwrite of an unpartitioned table: remove all existing rows.
            DeleteFiles deleteAll = table.newDelete();
            deleteAll.deleteFromRowFilter(Expressions.alwaysTrue());
            if (StringUtils.isNotEmpty(str)) {
                deleteAll.toBranch(HiveUtils.getTableSnapshotRef(str));
            }
            commit(deleteAll);
            LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. Commit took {} ms for table: {}",
                System.currentTimeMillis() - j, table);
        }

        LOG.debug("Overwrote partitions with files {}", filesForCommit);
    }

    /**
     * Recursively deletes the given locations, retrying each up to three times. Failures are
     * logged at debug level and suppressed.
     *
     * @param jobContext the job whose configuration resolves the file systems
     * @param collection the locations to delete
     * @throws IOException declared for the delete task
     */
    private void cleanup(JobContext jobContext, Collection<String> collection) throws IOException {
        JobConf jobConf = jobContext.getJobConf();
        LOG.info("Cleaning for job {} started", jobContext.getJobID());

        Tasks.foreach(collection)
            .retry(3)
            .suppressFailureWhenFinished()
            .onFailure((location, failure) -> LOG.debug("Failed to remove directory {} on job cleanup", location, failure))
            .run(location -> {
                LOG.info("Cleaning location: {}", location);
                Path toDelete = new Path(location);
                Util.getFs(toDelete, jobConf).delete(toDelete, true);
            }, IOException.class);

        LOG.info("Cleaning for job {} finished", jobContext.getJobID());
    }

    /**
     * Creates the fixed-size daemon thread pool used for per-file work.
     *
     * @param configuration supplies the pool size via
     *     {@code InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE}, defaulting to 10
     * @return a new executor; the caller is responsible for shutting it down
     */
    private static ExecutorService fileExecutor(Configuration configuration) {
        int poolSize = configuration.getInt(InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE, 10);
        ThreadFactoryBuilder threads = new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(5)
            .setNameFormat("iceberg-commit-file-pool-%d");
        return Executors.newFixedThreadPool(poolSize, threads.build());
    }

    /**
     * Creates the daemon thread pool used for per-table work, sized to the smaller of the table
     * count and the configured maximum.
     *
     * @param configuration supplies the maximum via
     *     {@code InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE}, defaulting to 10
     * @param i the number of tables to process
     * @return a new executor, or {@code null} when the resulting pool size would be at most one
     */
    private static ExecutorService tableExecutor(Configuration configuration, int i) {
        int configuredMax = configuration.getInt(InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE, 10);
        int poolSize = Math.min(i, configuredMax);
        if (poolSize <= 1) {
            // No pool needed for a single table; callers handle the null executor.
            return null;
        }
        ThreadFactoryBuilder threads = new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(5)
            .setNameFormat("iceberg-commit-table-pool-%d");
        return Executors.newFixedThreadPool(poolSize, threads.build());
    }

    /**
     * Reads the commit files of tasks {@code 0..i-1} of a job and merges their contents.
     *
     * @param i the number of task commit files to read
     * @param executorService executor used to read the files in parallel
     * @param str the table location the job location is derived from
     * @param jobContext the job whose task results are collected
     * @param fileIO the file IO used to read the commit files
     * @param z whether a read failure is thrown once all tasks have finished
     * @return the union of the files listed in the task commit files
     */
    private static FilesForCommit collectResults(int i, ExecutorService executorService, String str, JobContext jobContext, FileIO fileIO, boolean z) {
        JobConf jobConf = jobContext.getJobConf();
        // Concurrent collections: the reader tasks may append from several threads.
        Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
        Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
        Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
        Collection<CharSequence> referencedDataFiles = new ConcurrentLinkedQueue<>();
        Collection<Path> mergedAndDeletedFiles = new ConcurrentLinkedQueue<>();

        Tasks.range(i).throwFailureWhenFinished(z).executeWith(executorService).retry(3).run(taskId -> {
            String commitFileLocation = generateFileForCommitLocation(str, jobConf, jobContext.getJobID(), taskId.intValue());
            FilesForCommit taskResults = readFileForCommit(commitFileLocation, fileIO);
            LOG.debug("Found Iceberg commitTask manifest file: {}\n{}", commitFileLocation, taskResults);
            dataFiles.addAll(taskResults.dataFiles());
            deleteFiles.addAll(taskResults.deleteFiles());
            replacedDataFiles.addAll(taskResults.replacedDataFiles());
            referencedDataFiles.addAll(taskResults.referencedDataFiles());
            mergedAndDeletedFiles.addAll(taskResults.mergedAndDeletedFiles());
        });

        return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, mergedAndDeletedFiles);
    }

    /**
     * Builds the temporary location of a job under the table location:
     * {@code <location>/temp/<queryId>-<jobId>}.
     *
     * @param str the table location
     * @param configuration supplies the query id under {@code HiveConf.ConfVars.HIVE_QUERY_ID}
     * @param jobID the job id
     * @return the job location
     */
    @VisibleForTesting
    static String generateJobLocation(String str, Configuration configuration, JobID jobID) {
        String queryId = configuration.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname);
        String jobDirName = queryId + "-" + jobID;
        return str + "/temp/" + jobDirName;
    }

    /**
     * Builds the location of a task's commit file:
     * {@code <jobLocation>/task-<taskId>.forCommit}.
     *
     * @param str the table location
     * @param configuration the job configuration used to derive the job location
     * @param jobID the job id
     * @param i the task id
     * @return the commit file location of the task
     */
    private static String generateFileForCommitLocation(String str, Configuration configuration, JobID jobID, int i) {
        String jobLocation = generateJobLocation(str, configuration, jobID);
        return jobLocation + "/task-" + i + FOR_COMMIT_EXTENSION;
    }

    /**
     * Java-serializes the given files-for-commit object to the given location, creating or
     * overwriting the file.
     *
     * <p>The stream is managed with try-with-resources so it is closed exactly once and a failure
     * while closing is attached as a suppressed exception instead of masking the original one.
     *
     * @param filesForCommit the object to write
     * @param str the location of the commit file
     * @param fileIO the file IO used to create the file
     * @throws IOException if the file cannot be created or written
     */
    private static void createFileForCommit(FilesForCommit filesForCommit, String str, FileIO fileIO) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(fileIO.newOutputFile(str).createOrOverwrite())) {
            out.writeObject(filesForCommit);
        }
        LOG.debug("Created Iceberg commitTask manifest file: {}\n{}", str, filesForCommit);
    }

    /**
     * Reads a {@link FilesForCommit} object back from {@code str} using Java object serialization.
     *
     * <p>NOTE(review): this uses native Java deserialization; it must only be pointed at files
     * written by this committer (see {@code createFileForCommit}), never at untrusted input.
     *
     * @param str location of the file to read
     * @param fileIO file IO used to open the input file
     * @return the deserialized object
     * @throws NotFoundException if the file cannot be opened, read or deserialized; the underlying
     *     {@link IOException} or {@link ClassNotFoundException} is preserved as the cause
     */
    private static FilesForCommit readFileForCommit(String str, FileIO fileIO) {
        try (ObjectInputStream in = new ObjectInputStream(fileIO.newInputFile(str).newStream())) {
            return (FilesForCommit) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            // Keep the root cause so the real reason (missing file, corrupt stream, class mismatch) is diagnosable.
            throw new NotFoundException(e, "Can not read or parse commitTask manifest file: %s", str);
        }
    }

    /**
     * Collects the {@link FileStatus} of every data file and delete file recorded for the given jobs.
     *
     * <p>The job contexts are grouped per output table via {@code collectOutputs}; for each table
     * and job the commit files under the job location are read with {@code collectResults}, and
     * every referenced data/delete file is resolved to a {@link FileStatus} on the file system of
     * the table location. Failures for a table are logged by the failure handler and suppressed
     * by the task runner configuration ({@code suppressFailureWhenFinished}).
     *
     * @param list job contexts to inspect; the first element supplies the {@link JobConf}
     * @return statuses of all data files followed by all delete files, grouped internally by parent directory
     * @throws IOException declared for the task runner; propagated if it is rethrown
     */
    public List<FileStatus> getOutputFiles(List<JobContext> list) throws IOException {
        Multimap<OutputTable, JobContext> outputs = collectOutputs(list);
        JobConf jobConf = list.get(0).getJobConf();
        ExecutorService fileExecutor = fileExecutor(jobConf);
        ExecutorService tableExecutor = tableExecutor(jobConf, outputs.keySet().size());
        // NOTE(review): the per-directory value lists are plain lists created by Lists.newArrayList();
        // confirm that two table tasks never add to the same parent directory concurrently.
        ConcurrentMap<Path, List<FileStatus>> dataFilesByDir = Maps.newConcurrentMap();
        ConcurrentMap<Path, List<FileStatus>> deleteFilesByDir = Maps.newConcurrentMap();
        try {
            Tasks.foreach(outputs.keySet())
                    .suppressFailureWhenFinished()
                    .executeWith(tableExecutor)
                    .onFailure((failedTable, exc) ->
                            LOG.warn("Failed to retrieve merge input file for the table {}", failedTable, exc))
                    .run(outputTable -> {
                        for (JobContext jobContext : outputs.get(outputTable)) {
                            LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), outputTable);
                            Table table = outputTable.table;
                            FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
                            String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
                            // The number of commit files found tells collectResults how many task files to read.
                            int commitFileCount = listForCommits(jobConf, jobLocation).size();
                            FilesForCommit results = collectResults(
                                    commitFileCount, fileExecutor, table.location(), jobContext, table.io(), false);
                            for (DataFile dataFile : results.dataFiles()) {
                                Path path = new Path(dataFile.path().toString());
                                dataFilesByDir.computeIfAbsent(path.getParent(), dir -> Lists.newArrayList())
                                        .add(fileSystem.getFileStatus(path));
                            }
                            for (DeleteFile deleteFile : results.deleteFiles()) {
                                Path path = new Path(deleteFile.path().toString());
                                deleteFilesByDir.computeIfAbsent(path.getParent(), dir -> Lists.newArrayList())
                                        .add(fileSystem.getFileStatus(path));
                            }
                        }
                    }, IOException.class);
            return Stream.of(dataFilesByDir, deleteFilesByDir)
                    .flatMap(byDir -> byDir.values().stream().flatMap(List::stream))
                    .collect(Collectors.toList());
        } finally {
            // Single cleanup path for both success and failure.
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }
    }

    /**
     * Collects every data file and delete file recorded in the commit files of the given jobs.
     *
     * <p>The job contexts are grouped per output table via {@code collectOutputs}; for each table
     * and job the commit files under the job location are read with {@code collectResults} and the
     * resulting data and delete files are gathered into one list. Failures for a table are logged
     * by the failure handler and suppressed by the task runner configuration
     * ({@code suppressFailureWhenFinished}).
     *
     * @param list job contexts to inspect; the first element supplies the {@link JobConf}
     * @return a new list holding all collected data and delete files
     * @throws IOException declared for the task runner; propagated if it is rethrown
     */
    public List<ContentFile> getOutputContentFiles(List<JobContext> list) throws IOException {
        Multimap<OutputTable, JobContext> outputs = collectOutputs(list);
        JobConf jobConf = list.get(0).getJobConf();
        ExecutorService fileExecutor = fileExecutor(jobConf);
        ExecutorService tableExecutor = tableExecutor(jobConf, outputs.keySet().size());
        // Concurrent queue because table tasks may append from several executor threads.
        ConcurrentLinkedQueue<ContentFile> contentFiles = new ConcurrentLinkedQueue<>();
        try {
            Tasks.foreach(outputs.keySet())
                    .suppressFailureWhenFinished()
                    .executeWith(tableExecutor)
                    .onFailure((failedTable, exc) ->
                            LOG.warn("Failed to retrieve merge input file for the table {}", failedTable, exc))
                    .run(outputTable -> {
                        for (JobContext jobContext : outputs.get(outputTable)) {
                            LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), outputTable);
                            Table table = outputTable.table;
                            String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
                            // The number of commit files found tells collectResults how many task files to read.
                            int commitFileCount = listForCommits(jobConf, jobLocation).size();
                            FilesForCommit results = collectResults(
                                    commitFileCount, fileExecutor, table.location(), jobContext, table.io(), false);
                            contentFiles.addAll(results.dataFiles());
                            contentFiles.addAll(results.deleteFiles());
                        }
                    }, IOException.class);
            return Lists.newArrayList(contentFiles);
        } finally {
            // Single cleanup path for both success and failure.
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }
    }

    /**
     * Deletes the input paths of merge tasks for the given job contexts.
     *
     * <p>Only jobs whose input format class passes the
     * {@code isAssignableFrom(CombineHiveInputFormat.class)} check are considered. For those, the
     * input paths of the job's {@link MapWork} (when both are non-null) are deleted recursively if
     * they exist. Each deletion is run on {@code executorService} with up to 3 retries.
     *
     * @param list job contexts to scan; the first element's {@link JobConf} resolves the file system
     * @param executorService executor the deletions are run with
     * @throws IOException if a deletion ultimately fails
     */
    private void cleanMergeTaskInputFiles(List<JobContext> list, ExecutorService executorService) throws IOException {
        Stream<Path> mergeInputPaths = list.stream()
                .map(JobContext::getJobConf)
                .filter(conf -> conf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
                .map(Utilities::getMapWork)
                .filter(Objects::nonNull)
                .map(MapWork::getInputPaths)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream);

        Tasks.foreach(mergeInputPaths)
                .retry(3)
                .executeWith(executorService)
                .run(inputPath -> {
                    // Resolved lazily per path so an empty input never touches list.get(0).
                    FileSystem fs = inputPath.getFileSystem(list.get(0).getJobConf());
                    if (fs.exists(inputPath)) {
                        fs.delete(inputPath, true);
                    }
                }, IOException.class);
    }

    /**
     * Builds one {@link JobContext} per commit info stored in the query state for a table.
     *
     * <p>A {@link JobConf} copy of {@code configuration} is created, and for every commit info its
     * properties are applied to that conf, {@code InputFormatConfig.OUTPUT_TABLES} is set to
     * {@code str} and, when {@code str2} is non-null,
     * {@code InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF} is set to {@code str2}.
     *
     * <p>NOTE: all returned contexts are created from the same {@link JobConf} instance, so
     * properties applied for a later commit info are visible through that shared conf.
     *
     * <p>NOTE: the decompiler reported that this method was originally package-private.
     *
     * @param configuration base configuration copied into the job conf
     * @param str table name used to look up commit info and set as the output table
     * @param str2 snapshot ref to set on the conf, or {@code null} to leave it unset
     * @return the generated job contexts, or an empty list when no commit info is found
     */
    public static List<JobContext> generateJobContext(Configuration configuration, String str, String str2) {
        JobConf jobConf = new JobConf(configuration);
        Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap = SessionStateUtil.getCommitInfo(jobConf, str);
        if (!commitInfoMap.isPresent()) {
            LOG.debug("Unable to find commit information in query state for table: {}", str);
            return Collections.emptyList();
        }
        List<JobContext> jobContexts = Lists.newLinkedList();
        for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
            org.apache.hadoop.mapred.JobID jobId = org.apache.hadoop.mapred.JobID.forName(commitInfo.getJobIdStr());
            commitInfo.getProps().forEach(jobConf::set);
            jobConf.set(InputFormatConfig.OUTPUT_TABLES, str);
            if (str2 != null) {
                jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, str2);
            }
            jobContexts.add(new JobContextImpl(jobConf, jobId, null));
        }
        return jobContexts;
    }

    /**
     * Returns the input paths of the job's {@link MapWork} when the job's input format class
     * passes the {@code isAssignableFrom(CombineHiveInputFormat.class)} check.
     *
     * @param jobConf job configuration to inspect
     * @return a new mutable set with the map work's input paths, or an empty set when the input
     *     format check fails, no map work is found, or it has no input paths
     */
    private Set<Path> getCombinedLocations(JobConf jobConf) {
        Set<Path> locations = Sets.newHashSet();
        if (!jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
            return locations;
        }
        MapWork mapWork = Utilities.getMapWork(jobConf);
        if (mapWork == null || mapWork.getInputPaths() == null) {
            return locations;
        }
        locations.addAll(mapWork.getInputPaths());
        return locations;
    }
}
