package org.apache.hadoop.fs.azurebfs.commit;

import java.io.File;
import java.io.FileNotFoundException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.examples.terasort.TeraGen;
import org.apache.hadoop.examples.terasort.TeraSort;
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
import org.apache.hadoop.examples.terasort.TeraValidate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.ITestWasbRemoteCallHelper;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.junit.Assume;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.class */
public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
    public static final int EXPECTED_PARTITION_COUNT = 10;
    public static final int PARTITION_SAMPLE_SIZE = 1000;
    public static final int ROW_COUNT = 1000;
    private Path terasortPath;
    private Path sortInput;
    private Path sortOutput;
    private Path sortValidate;
    private static final Logger LOG = LoggerFactory.getLogger(ITestAbfsTerasort.class);
    private static final Path TERASORT_PATH = new Path("/ITestAbfsTerasort");
    private static Optional<DurationInfo> terasortDuration = Optional.empty();
    private static final Map<String, DurationInfo> COMPLETED_STAGES = new HashMap();
    protected static final IOStatisticsSnapshot JOB_IOSTATS = IOStatisticsSupport.snapshotIOStatistics();
    private static final Map<String, ManifestSuccessData> SUCCESS_FILES = new HashMap();

    @Override // org.apache.hadoop.fs.azurebfs.commit.AbstractAbfsClusterITest
    public void setup() throws Exception {
        super.setup();
        prepareToTerasort();
    }

    @Override // org.apache.hadoop.fs.azurebfs.commit.AbstractAbfsClusterITest
    protected void applyCustomConfigOptions(JobConf jobConf) {
        jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), getSampleSizeForEachPartition());
        jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), getExpectedPartitionCount());
        jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
    }

    private int getExpectedPartitionCount() {
        return 10;
    }

    private int getSampleSizeForEachPartition() {
        return 1000;
    }

    protected int getRowCount() {
        return 1000;
    }

    private void prepareToTerasort() {
        this.terasortPath = getFileSystem().makeQualified(TERASORT_PATH);
        this.sortInput = new Path(this.terasortPath, "sortin");
        this.sortOutput = new Path(this.terasortPath, "sortout");
        this.sortValidate = new Path(this.terasortPath, "validate");
    }

    private static void completedStage(String str, DurationInfo durationInfo) {
        COMPLETED_STAGES.put(str, durationInfo);
    }

    private static void requireStage(String str) {
        Assume.assumeTrue("Required stage was not completed: " + str, COMPLETED_STAGES.get(str) != null);
    }

    private ManifestSuccessData executeStage(String str, JobConf jobConf, Path path, Tool tool, String[] strArr, int i) throws Exception {
        DurationInfo durationInfo = new DurationInfo(LOG, str, new Object[0]);
        try {
            int run = ToolRunner.run(jobConf, tool, strArr);
            durationInfo.close();
            dumpOutputTree(path);
            assertEquals(str + "(" + StringUtils.join(", ", strArr) + ") failed", 0L, run);
            ManifestSuccessData validateSuccessFile = ManifestCommitterTestSupport.validateSuccessFile(getFileSystem(), path, i, ITestWasbRemoteCallHelper.EMPTY_STRING);
            IOStatisticsSnapshot iOStatistics = validateSuccessFile.getIOStatistics();
            JOB_IOSTATS.aggregate(iOStatistics);
            SUCCESS_FILES.put(str, validateSuccessFile);
            completedStage(str, durationInfo);
            ManifestCommitterTestSupport.assertNoFailureStatistics(iOStatistics, new String[]{str, "task_stage_save_task_manifest", "op_rename"});
            return validateSuccessFile;
        } catch (Throwable th) {
            durationInfo.close();
            throw th;
        }
    }

    @Test
    public void test_100_terasort_setup() throws Throwable {
        describe("Setting up for a terasort");
        getFileSystem().delete(this.terasortPath, true);
        terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort", new Object[0]));
    }

    @Test
    public void test_110_teragen() throws Throwable {
        describe("Teragen to %s", new Object[]{this.sortInput});
        getFileSystem().delete(this.sortInput, true);
        JobConf newJobConf = newJobConf();
        patchConfigurationForCommitter(newJobConf);
        executeStage("teragen", newJobConf, this.sortInput, new TeraGen(), new String[]{Integer.toString(getRowCount()), this.sortInput.toString()}, 1);
    }

    @Test
    public void test_120_terasort() throws Throwable {
        describe("Terasort from %s to %s", new Object[]{this.sortInput, this.sortOutput});
        requireStage("teragen");
        getFileSystem().delete(this.sortOutput, true);
        ManifestCommitterTestSupport.loadSuccessFile(getFileSystem(), this.sortInput);
        JobConf newJobConf = newJobConf();
        patchConfigurationForCommitter(newJobConf);
        executeStage("terasort", newJobConf, this.sortOutput, new TeraSort(), new String[]{this.sortInput.toString(), this.sortOutput.toString()}, 1);
    }

    @Test
    public void test_130_teravalidate() throws Throwable {
        describe("TeraValidate from %s to %s", new Object[]{this.sortOutput, this.sortValidate});
        requireStage("terasort");
        getFileSystem().delete(this.sortValidate, true);
        ManifestCommitterTestSupport.loadSuccessFile(getFileSystem(), this.sortOutput);
        JobConf newJobConf = newJobConf();
        patchConfigurationForCommitter(newJobConf);
        executeStage("teravalidate", newJobConf, this.sortValidate, new TeraValidate(), new String[]{this.sortOutput.toString(), this.sortValidate.toString()}, 1);
    }

    @Test
    public void test_140_teracomplete() throws Throwable {
        terasortDuration.ifPresent(durationInfo -> {
            durationInfo.close();
            completedStage("overall", durationInfo);
        });
        IOStatisticsLogging.logIOStatisticsAtLevel(LOG, "info", JOB_IOSTATS);
        StringBuilder sb = new StringBuilder();
        sb.append("\"Operation\"\t\"Duration\"\n");
        Consumer consumer = str -> {
            String str = (DurationInfo) COMPLETED_STAGES.get(str);
            Object[] objArr = new Object[2];
            objArr[0] = str;
            objArr[1] = str == null ? ITestWasbRemoteCallHelper.EMPTY_STRING : str;
            sb.append(String.format("\"%s\"\t\"%s\"\n", objArr));
        };
        consumer.accept("teragen");
        consumer.accept("terasort");
        consumer.accept("teravalidate");
        consumer.accept("overall");
        String sb2 = sb.toString();
        File createTempFile = File.createTempFile("results", ".csv");
        FileUtils.write(createTempFile, sb2, StandardCharsets.UTF_8);
        LOG.info("Results are in {}\n{}", createTempFile, sb2);
        LOG.info("Report directory {}", getReportDir());
    }

    @Test
    public void test_150_teracleanup() throws Throwable {
        terasortDuration = Optional.empty();
    }

    @Test
    public void test_200_directory_deletion() throws Throwable {
        getFileSystem().delete(this.terasortPath, true);
    }

    protected void dumpOutputTree(Path path) throws Exception {
        LOG.info("Files under output directory {}", path);
        try {
            RemoteIterators.foreach(getFileSystem().listFiles(path, true), locatedFileStatus -> {
                LOG.info("{}", locatedFileStatus);
            });
        } catch (FileNotFoundException e) {
            LOG.info("Output directory {} not found", path);
        }
    }
}
