package org.apache.iceberg.mr.mapreduce;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Scan;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.ThreadPools;

/* loaded from: input_file:org/apache/iceberg/mr/mapreduce/IcebergInputFormat.class */
/**
 * MapReduce {@link InputFormat} that plans an Iceberg table scan and exposes the resulting scan
 * task groups as {@link IcebergSplit}s.
 *
 * <p>The scan is driven entirely by the job {@link Configuration}; see {@link InputFormatConfig}
 * for the keys that are read here.
 *
 * @param <T> the in-memory record type produced by the record readers
 */
public class IcebergInputFormat<T> extends InputFormat<Void, T> {
    /** Sentinel used for "not set" on the numeric configuration keys read by this class. */
    private static final long UNSET = -1L;

    /** Scan option key for the per-file open cost used while combining files into splits. */
    private static final String SPLIT_OPEN_FILE_COST_KEY = "read.split.open-file-cost";

    /** Open-file cost (128 MiB) applied under cache affinity when no explicit split size is configured. */
    private static final long DEFAULT_OPEN_FILE_COST_BYTES = 134217728L;

    /**
     * Registers this class as the input format of {@code job} and returns a builder operating on
     * the job's configuration.
     *
     * @param job the job to configure
     * @return a config builder backed by {@code job.getConfiguration()}
     */
    public static InputFormatConfig.ConfigBuilder configure(Job job) {
        job.setInputFormatClass(IcebergInputFormat.class);
        return new InputFormatConfig.ConfigBuilder(job.getConfiguration());
    }

    /**
     * Builds a regular table scan, optionally pinned to a ref, a snapshot and/or a timestamp.
     *
     * <p>{@link InputFormatConfig#SNAPSHOT_ID} is first read as a number; when it does not parse as
     * a {@code long} its raw value is looked up in {@code table.refs()} instead.
     *
     * @throws RuntimeException if the non-numeric snapshot value does not match any table ref
     */
    private static TableScan createTableScan(Table table, Configuration configuration) {
        TableScan scan = table.newScan();

        long snapshotId;
        try {
            snapshotId = configuration.getLong(InputFormatConfig.SNAPSHOT_ID, UNSET);
        } catch (NumberFormatException e) {
            // The configured value is not numeric: fall back to resolving it as a ref name.
            String version = configuration.get(InputFormatConfig.SNAPSHOT_ID);
            SnapshotRef ref = table.refs().get(version);
            if (ref == null) {
                throw new RuntimeException("Cannot find matching snapshot ID or reference name for version " + version, e);
            }
            snapshotId = ref.snapshotId();
        }

        String outputRef = configuration.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
        if (StringUtils.isNotEmpty(outputRef)) {
            scan = scan.useRef(HiveUtils.getTableSnapshotRef(outputRef));
        }
        if (snapshotId != UNSET) {
            scan = scan.useSnapshot(snapshotId);
        }

        long asOfTimestamp = configuration.getLong(InputFormatConfig.AS_OF_TIMESTAMP, UNSET);
        if (asOfTimestamp != UNSET) {
            scan = scan.asOfTime(asOfTimestamp);
        }
        return scan;
    }

    /**
     * Builds an incremental append scan that starts (exclusively) from the snapshot id stored
     * under {@link InputFormatConfig#SNAPSHOT_ID_INTERVAL_FROM}.
     */
    private static IncrementalAppendScan createIncrementalAppendScan(Table table, Configuration configuration) {
        long fromSnapshotId = configuration.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, UNSET);
        return table.newIncrementalAppendScan().fromSnapshotExclusive(fromSnapshotId);
    }

    /**
     * Applies the configuration-driven refinements to {@code scan}: case sensitivity, split size,
     * open-file cost (only under cache affinity), projection or column selection, and the
     * serialized filter expression.
     *
     * @param configuration job configuration; it is cast to {@link JobConf} to look up the map work,
     *     so a configuration of any other type fails with a {@link ClassCastException}
     * @param scan the scan to refine
     * @return the refined scan
     */
    private static <T extends Scan<T, FileScanTask, CombinedScanTask>> Scan<T, FileScanTask, CombinedScanTask> applyConfig(Configuration configuration, Scan<T, FileScanTask, CombinedScanTask> scan) {
        Scan<T, FileScanTask, CombinedScanTask> configured =
                scan.caseSensitive(configuration.getBoolean(InputFormatConfig.CASE_SENSITIVE, true));

        long splitSize = configuration.getLong(InputFormatConfig.SPLIT_SIZE, 0L);
        if (splitSize > 0) {
            configured = configured.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
        }

        MapWork mapWork = LlapHiveUtils.findMapWork((JobConf) configuration);
        if (mapWork != null && mapWork.getCacheAffinity()) {
            // Under cache affinity the open-file cost is raised to the split size (or 128 MiB when
            // no split size is configured).
            // NOTE(review): presumably this keeps small files from being combined into one split - confirm.
            long openFileCost = splitSize > 0 ? splitSize : DEFAULT_OPEN_FILE_COST_BYTES;
            configured = configured.option(SPLIT_OPEN_FILE_COST_KEY, String.valueOf(openFileCost));
        }

        // An explicit read schema takes precedence over a plain list of selected columns.
        Schema readSchema = InputFormatConfig.readSchema(configuration);
        if (readSchema != null) {
            configured = configured.project(readSchema);
        } else {
            String[] selectedColumns = InputFormatConfig.selectedColumns(configuration);
            if (selectedColumns != null) {
                configured = configured.select(selectedColumns);
            }
        }

        Expression filter = SerializationUtil.deserializeFromBase64(configuration.get(InputFormatConfig.FILTER_EXPRESSION));
        if (filter != null) {
            configured = configured.filter(filter).ignoreResiduals();
        }
        return configured;
    }

    /**
     * Plans the input splits for the table referenced by the job configuration.
     *
     * <p>The table is taken from {@link HiveIcebergStorageHandler#table}; when that yields
     * {@code null} it is loaded through {@link Catalogs#loadTable} and its name and serialized form
     * are written back into the configuration. Planning runs on a dedicated worker pool that is
     * shut down before this method returns, whether planning succeeds or fails.
     *
     * @param jobContext the job context supplying the configuration
     * @return one {@link IcebergSplit} per planned scan task group
     */
    public List<InputSplit> getSplits(JobContext jobContext) {
        Configuration configuration = jobContext.getConfiguration();

        Table table = HiveIcebergStorageHandler.table(configuration, configuration.get(InputFormatConfig.TABLE_IDENTIFIER));
        if (table == null) {
            table = Catalogs.loadTable(configuration);
            configuration.set(InputFormatConfig.TABLE_IDENTIFIER, table.name());
            configuration.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name(), SerializationUtil.serializeToBase64(table));
        }

        int poolSize = configuration.getInt(SystemConfigs.WORKER_THREAD_POOL_SIZE.propertyKey(), ThreadPools.WORKER_THREAD_POOL_SIZE);
        ExecutorService workerPool = ThreadPools.newWorkerPool("iceberg-plan-worker-pool", poolSize);
        try {
            return planInputSplits(table, configuration, workerPool);
        } finally {
            workerPool.shutdown();
        }
    }

    /**
     * Creates the (incremental or regular) scan, plans it on {@code executorService} and converts
     * every planned task group into an {@link IcebergSplit}.
     *
     * <p>Each task group is optionally checked for residual filters (HIVE and PIG data models only,
     * unless residual filtering is skipped) and for data files outside the table location.
     */
    private List<InputSplit> planInputSplits(Table table, Configuration configuration, ExecutorService executorService) {
        List<InputSplit> splits = Lists.newArrayList();

        boolean applyResidual = !configuration.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
        InputFormatConfig.InMemoryDataModel model =
                configuration.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, InputFormatConfig.InMemoryDataModel.GENERIC);
        boolean mustCheckResiduals = applyResidual
                && (model == InputFormatConfig.InMemoryDataModel.HIVE || model == InputFormatConfig.InMemoryDataModel.PIG);

        // A configured "from" snapshot switches planning to an incremental append scan.
        Scan<?, FileScanTask, CombinedScanTask> scan;
        if (configuration.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, UNSET) != UNSET) {
            scan = applyConfig(configuration, createIncrementalAppendScan(table, configuration)).planWith(executorService);
        } else {
            scan = applyConfig(configuration, createTableScan(table, configuration)).planWith(executorService);
        }

        boolean restrictToTableLocation = configuration.getBoolean(
                HiveConf.ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.varname,
                HiveConf.ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.defaultBoolVal);
        Path tableLocation = new Path(configuration.get(InputFormatConfig.TABLE_LOCATION));

        generateInputSplits(scan, table, configuration.getStrings(InputFormatConfig.GROUPING_PARTITION_COLUMNS), taskGroup -> {
            if (mustCheckResiduals) {
                checkResiduals(taskGroup);
            }
            if (restrictToTableLocation) {
                validateFileLocations(taskGroup, tableLocation);
            }
            splits.add(new IcebergSplit(configuration, taskGroup));
        });

        if (scan instanceof DataTableScan) {
            HiveIcebergStorageHandler.checkAndSkipIoConfigSerialization(configuration, table);
        }
        return splits;
    }

    /**
     * Verifies that every data file of {@code scanTaskGroup} lies within the subtree rooted at
     * {@code path}.
     *
     * @throws AuthorizationException if any file is located outside of {@code path}
     */
    private static void validateFileLocations(ScanTaskGroup<FileScanTask> scanTaskGroup, Path path) {
        for (FileScanTask task : scanTaskGroup.tasks()) {
            Path dataFile = new Path(task.file().path().toString());
            if (!FileUtils.isPathWithinSubtree(dataFile, path)) {
                throw new AuthorizationException("The table contains paths which are outside the table location");
            }
        }
    }

    /**
     * Plans {@code scan} and feeds every resulting task group to {@code consumer}.
     *
     * <p>Without grouping columns the scan's own combined tasks are used. Otherwise the planned
     * files are materialized and regrouped by the grouping key derived from the given columns.
     * The planning iterable is always closed, including when {@code consumer} throws.
     *
     * @param scan the fully configured scan
     * @param table the scanned table, used to derive the grouping key type
     * @param groupingColumns names of the grouping partition columns, or {@code null} for none
     * @param consumer callback invoked once per task group
     * @throws UncheckedIOException if closing the planning iterable fails
     */
    private static void generateInputSplits(Scan<?, FileScanTask, CombinedScanTask> scan, Table table, String[] groupingColumns, Consumer<ScanTaskGroup<FileScanTask>> consumer) {
        if (groupingColumns == null) {
            try (CloseableIterable<CombinedScanTask> taskGroups = scan.planTasks()) {
                taskGroups.forEach(consumer);
            } catch (IOException e) {
                throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
            }
            return;
        }

        Types.StructType groupingKeyType =
                Partitioning.groupingKeyType(table.schema().select(groupingColumns), table.specs().values());
        try (CloseableIterable<FileScanTask> files = scan.planFiles()) {
            TableScanUtil.planTaskGroups(
                            Lists.newArrayList(files),
                            scan.targetSplitSize(),
                            scan.splitLookback(),
                            scan.splitOpenFileCost(),
                            groupingKeyType)
                    .forEach(consumer);
        } catch (IOException e) {
            throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
        }
    }

    /**
     * Rejects task groups that still carry a residual filter.
     *
     * @throws UnsupportedOperationException if any task has a residual other than "always true"
     */
    private static void checkResiduals(ScanTaskGroup<FileScanTask> scanTaskGroup) {
        for (FileScanTask task : scanTaskGroup.tasks()) {
            Expression residual = task.residual();
            if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
                throw new UnsupportedOperationException(String.format("Filter expression %s is not completely satisfied. Additional rows can be returned not satisfied by the filter expression", residual));
            }
        }
    }

    /**
     * Creates the record reader for {@code inputSplit}: an {@link IcebergMergeRecordReader} for an
     * {@link IcebergMergeSplit}, an {@link IcebergRecordReader} for anything else.
     *
     * @param inputSplit the split to read
     * @param taskAttemptContext not used here
     * @return a new, uninitialized record reader
     */
    public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
        // NOTE(review): the readers are instantiated as raw types; switch to the diamond operator
        // once their generic signatures are confirmed.
        return inputSplit instanceof IcebergMergeSplit ? new IcebergMergeRecordReader() : new IcebergRecordReader();
    }
}
