package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.class */
public class DirectoryScanner implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DirectoryScanner.class);

    /** Initial capacity used to pre-size the statistics map. */
    private static final int DEFAULT_MAP_SIZE = 32768;

    /** Batch size used by {@link #reconcile()} to decide when to pause between entries. */
    private final int reconcileBlocksBatchSize;

    /** Length, in milliseconds, of each pause taken by {@link #reconcile()}. */
    private final long reconcileBlocksBatchInterval;

    /** Dataset whose finalized blocks are compared against the scanned volumes. */
    private final FsDatasetSpi<?> dataset;

    /** Pool that executes one {@link ReportCompiler} per scanned volume. */
    private final ExecutorService reportCompileThreadPool;

    /** Single-threaded scheduler that periodically invokes {@link #run()}. */
    private final ScheduledExecutorService masterThread;

    /** Time between two scheduled scans, in milliseconds. */
    private final long scanPeriodMsecs;

    /**
     * Running-time budget per second, in milliseconds, enforced by
     * {@link ReportCompiler#throttle()}; throttling is skipped when the value is not positive.
     */
    private final long throttleLimitMsPerSec;

    /** Set by {@link #start()} and cleared by {@link #shutdown()}; checked at the top of each run. */
    private final AtomicBoolean shouldRun = new AtomicBoolean();

    /** When true, {@link #diffs} and {@link #stats} are kept after reconcile and shutdown. */
    private boolean retainDiffs = false;

    /** Total time, in milliseconds, the report compilers have accounted as running. */
    @VisibleForTesting
    final AtomicLong timeRunningMs = new AtomicLong(0);

    /** Total time, in milliseconds, the report compilers have accounted as waiting. */
    @VisibleForTesting
    final AtomicLong timeWaitingMs = new AtomicLong(0);

    /** Differences found by the last scan; accessed under {@code synchronized (diffs)}. */
    @VisibleForTesting
    final BlockPoolReport diffs = new BlockPoolReport();

    /** Statistics of the last scan, keyed by block pool id. */
    @VisibleForTesting
    final Map<String, Stats> stats = new HashMap<>(DEFAULT_MAP_SIZE);

    /**
     * Scan results grouped by block pool id.
     *
     * <p>Keeps the set of known block pool ids next to a multimap from block pool id to the
     * {@link FsVolumeSpi.ScanInfo} entries collected for that pool.
     */
    @VisibleForTesting
    public static class BlockPoolReport {
        private static final long serialVersionUID = 1;

        /** Ids of every block pool registered with this report. */
        private final Set<String> blockPools;

        /** Scan entries per block pool id. */
        private final ListMultimap<String, FsVolumeSpi.ScanInfo> map;

        /** Creates an empty report with small default capacities. */
        BlockPoolReport() {
            blockPools = new HashSet<>(2);
            map = ArrayListMultimap.create(2, 32768);
        }

        /**
         * Creates a report that already knows the given block pool ids.
         *
         * @param collection block pool ids to register up front
         */
        BlockPoolReport(Collection<String> collection) {
            blockPools = new HashSet<>(collection);
            map = ArrayListMultimap.create(collection.size(), 32768);
        }

        /**
         * Registers a block pool and appends scan entries to it.
         *
         * @param str block pool id
         * @param collection scan entries to append for that block pool
         */
        public void addAll(String str, Collection<FsVolumeSpi.ScanInfo> collection) {
            blockPools.add(str);
            map.putAll(str, collection);
        }

        /** Sorts, in place, the entry list of every block pool using the entries' natural order. */
        public void sortBlocks() {
            for (String blockPoolId : map.keySet()) {
                List<FsVolumeSpi.ScanInfo> entries = map.get(blockPoolId);
                Collections.sort(entries);
            }
        }

        /**
         * @return a read-only view of the registered block pool ids
         */
        public Set<String> getBlockPoolIds() {
            return Collections.unmodifiableSet(blockPools);
        }

        /**
         * @param str block pool id
         * @return the list the backing multimap holds for that block pool
         */
        public List<FsVolumeSpi.ScanInfo> getScanInfo(String str) {
            return map.get(str);
        }

        /**
         * @return a read-only view of all (block pool id, scan entry) pairs
         */
        public Collection<Map.Entry<String, FsVolumeSpi.ScanInfo>> getEntries() {
            return Collections.unmodifiableCollection(map.entries());
        }

        /** Removes all scan entries and all registered block pool ids. */
        public void clear() {
            map.clear();
            blockPools.clear();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("BlockPoolReport [blockPools=");
            text.append(blockPools);
            text.append(", map=");
            text.append(map);
            text.append("]");
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/DirectoryScanner$ReportCompiler.class */
    public class ReportCompiler implements Callable<ScanInfoVolumeReport> {
        private final FsVolumeSpi volume;
        private final StopWatch throttleTimer = new StopWatch();
        private final StopWatch perfTimer = new StopWatch();

        public ReportCompiler(FsVolumeSpi fsVolumeSpi) {
            this.volume = fsVolumeSpi;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public ScanInfoVolumeReport call() throws IOException {
            String[] blockPoolList = this.volume.getBlockPoolList();
            ScanInfoVolumeReport scanInfoVolumeReport = new ScanInfoVolumeReport(this.volume, Arrays.asList(blockPoolList));
            this.perfTimer.start();
            this.throttleTimer.start();
            for (String str : blockPoolList) {
                ArrayList arrayList = new ArrayList(32768);
                this.perfTimer.reset().start();
                this.throttleTimer.reset().start();
                try {
                    this.volume.compileReport(str, arrayList, this);
                    scanInfoVolumeReport.addAll(str, arrayList);
                } catch (InterruptedException e) {
                    scanInfoVolumeReport = null;
                }
            }
            DirectoryScanner.LOG.trace("Scanner volume report: {}", scanInfoVolumeReport);
            return scanInfoVolumeReport;
        }

        public void throttle() throws InterruptedException {
            long j;
            accumulateTimeRunning();
            if (DirectoryScanner.this.throttleLimitMsPerSec > 0) {
                long now = this.throttleTimer.now(TimeUnit.MILLISECONDS);
                if (now >= DirectoryScanner.this.throttleLimitMsPerSec) {
                    if (now >= 1000) {
                        DirectoryScanner.LOG.warn("Unable to throttle within the second. Blocking for 1s.");
                        j = 1000;
                    } else {
                        j = (1000 - DirectoryScanner.this.throttleLimitMsPerSec) + (now - DirectoryScanner.this.throttleLimitMsPerSec);
                    }
                    Thread.sleep(j);
                    this.throttleTimer.reset().start();
                }
                accumulateTimeWaiting();
            }
        }

        private void accumulateTimeRunning() {
            DirectoryScanner.this.timeRunningMs.getAndAdd(this.perfTimer.now(TimeUnit.MILLISECONDS));
            this.perfTimer.reset().start();
        }

        private void accumulateTimeWaiting() {
            DirectoryScanner.this.timeWaitingMs.getAndAdd(this.perfTimer.now(TimeUnit.MILLISECONDS));
            this.perfTimer.reset().start();
        }
    }

    /**
     * Scan result of one volume: the volume itself plus a {@link BlockPoolReport} holding the
     * entries found on it, grouped by block pool id.
     */
    @VisibleForTesting
    public static class ScanInfoVolumeReport {
        private static final long serialVersionUID = 1;

        /** Volume the entries were collected from. */
        private final FsVolumeSpi volume;

        /** Entries of this volume, grouped by block pool id. */
        private final BlockPoolReport blockPoolReport;

        /**
         * Creates an empty report for a volume.
         *
         * @param fsVolumeSpi volume being reported on
         */
        ScanInfoVolumeReport(FsVolumeSpi fsVolumeSpi) {
            volume = fsVolumeSpi;
            blockPoolReport = new BlockPoolReport();
        }

        /**
         * Creates an empty report for a volume with its block pool ids registered up front.
         *
         * @param fsVolumeSpi volume being reported on
         * @param collection block pool ids to register
         */
        ScanInfoVolumeReport(FsVolumeSpi fsVolumeSpi, Collection<String> collection) {
            volume = fsVolumeSpi;
            blockPoolReport = new BlockPoolReport(collection);
        }

        /**
         * Appends scan entries for a block pool.
         *
         * @param str block pool id
         * @param collection scan entries to append
         */
        public void addAll(String str, Collection<FsVolumeSpi.ScanInfo> collection) {
            blockPoolReport.addAll(str, collection);
        }

        /**
         * @return the block pool ids known to this report
         */
        public Set<String> getBlockPoolIds() {
            Set<String> ids = blockPoolReport.getBlockPoolIds();
            return ids;
        }

        /**
         * @param str block pool id
         * @return the scan entries stored for that block pool
         */
        public List<FsVolumeSpi.ScanInfo> getScanInfo(String str) {
            List<FsVolumeSpi.ScanInfo> entries = blockPoolReport.getScanInfo(str);
            return entries;
        }

        /**
         * @return the volume this report belongs to
         */
        public FsVolumeSpi getVolume() {
            return volume;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ScanInfoVolumeReport [volume=");
            text.append(volume);
            text.append(", blockPoolReport=");
            text.append(blockPoolReport);
            text.append("]");
            return text.toString();
        }
    }

    /** Counters describing the outcome of scanning one block pool. */
    @VisibleForTesting
    public static class Stats {
        /** Id of the block pool these counters belong to. */
        final String bpid;

        // All counters start at zero.
        long totalBlocks;
        long missingMetaFile;
        long missingBlockFile;
        long missingMemoryBlocks;
        long mismatchBlocks;
        long duplicateBlocks;

        /**
         * @param str id of the block pool the counters describe
         */
        public Stats(String str) {
            bpid = str;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("BlockPool ").append(bpid);
            text.append(" Total blocks: ").append(totalBlocks);
            text.append(", missing metadata files: ").append(missingMetaFile);
            text.append(", missing block files: ").append(missingBlockFile);
            text.append(", missing blocks in memory: ").append(missingMemoryBlocks);
            text.append(", mismatched blocks: ").append(mismatchBlocks);
            text.append(", duplicated blocks: ").append(duplicateBlocks);
            return text.toString();
        }
    }

    /**
     * Chooses whether the recorded differences and statistics survive reconcile and shutdown.
     *
     * @param z {@code true} to keep them, {@code false} to have them cleared
     */
    @VisibleForTesting
    public void setRetainDiffs(boolean z) {
        retainDiffs = z;
    }

    /**
     * Creates a scanner for a dataset and reads its settings from the configuration: scan
     * interval, throttle limit, number of report compiler threads, and the batch size and
     * batch interval used while reconciling. Out-of-range throttle, batch size and batch
     * interval values are replaced by their defaults with a warning.
     *
     * @param fsDatasetSpi dataset to scan and reconcile
     * @param configuration source of the scanner settings
     */
    public DirectoryScanner(FsDatasetSpi<?> fsDatasetSpi, Configuration configuration) {
        dataset = fsDatasetSpi;

        // The interval is read in seconds (truncated to int) and stored in milliseconds.
        final int scanIntervalSecs = (int) configuration.getTimeDuration(
            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 21600L, TimeUnit.SECONDS);
        scanPeriodMsecs = TimeUnit.SECONDS.toMillis(scanIntervalSecs);

        // A limit of a full second or more cannot be honoured; fall back to -1.
        int throttleMsPerSec = configuration.getInt(
            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, -1);
        final long oneSecondMs = TimeUnit.SECONDS.toMillis(1L);
        if (throttleMsPerSec >= oneSecondMs) {
            LOG.warn("{} set to value above 1000 ms/sec. Assuming default value of {}", DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, -1);
            throttleMsPerSec = -1;
        }
        throttleLimitMsPerSec = throttleMsPerSec;

        final int compilerThreads =
            configuration.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
        reportCompileThreadPool =
            Executors.newFixedThreadPool(compilerThreads, new Daemon.DaemonFactory());
        final ThreadFactory masterThreadFactory = new Daemon.DaemonFactory();
        masterThread = new ScheduledThreadPoolExecutor(1, masterThreadFactory);

        final int configuredBatchSize =
            configuration.getInt(DFSConfigKeys.DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE, 1000);
        if (configuredBatchSize > 0) {
            reconcileBlocksBatchSize = configuredBatchSize;
        } else {
            LOG.warn("Invalid value configured for dfs.datanode.reconcile.blocks.batch.size, should be greater than 0, Using default.");
            reconcileBlocksBatchSize = 1000;
        }

        final long configuredBatchIntervalMs = configuration.getTimeDuration(
            DFSConfigKeys.DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL, 2000L, TimeUnit.MILLISECONDS);
        if (configuredBatchIntervalMs > 0) {
            reconcileBlocksBatchInterval = configuredBatchIntervalMs;
        } else {
            LOG.warn("Invalid value configured for dfs.datanode.reconcile.blocks.batch.interval, should be greater than 0, Using default.");
            reconcileBlocksBatchInterval = 2000;
        }
    }

    /**
     * Enables the scanner and schedules it at a fixed rate of one scan period, with the first
     * run delayed by a random amount below one scan period.
     */
    @VisibleForTesting
    public void start() {
        shouldRun.set(true);
        final long initialDelayMs = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
        LOG.info("Periodic Directory Tree Verification scan starting in {}ms with interval of {}ms and throttle limit of {}ms/s",
            initialDelayMs, scanPeriodMsecs, throttleLimitMsPerSec);
        masterThread.scheduleAtFixedRate(this, initialDelayMs, scanPeriodMsecs, TimeUnit.MILLISECONDS);
    }

    /**
     * @return {@code true} once {@link #start()} has been called and until
     *     {@link #shutdown()} is called
     */
    @VisibleForTesting
    boolean getRunStatus() {
        final boolean running = shouldRun.get();
        return running;
    }

    /** Empties the recorded differences (under their lock) and then the statistics. */
    private void clear() {
        final BlockPoolReport recordedDiffs = diffs;
        synchronized (recordedDiffs) {
            recordedDiffs.clear();
        }
        stats.clear();
    }

    /**
     * Performs one scan-and-reconcile cycle and records its finish time on the dataset.
     *
     * <p>Does nothing but log when the scanner has been deactivated. Exceptions are logged and
     * swallowed so the next cycle still runs; errors are logged and rethrown.
     */
    @Override
    public void run() {
        if (shouldRun.get()) {
            try {
                reconcile();
                dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
            } catch (Exception failure) {
                LOG.error("Exception during DirectoryScanner execution - will continue next cycle", failure);
            } catch (Error fatal) {
                LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", fatal);
                throw fatal;
            }
        } else {
            LOG.warn("This cycle terminating immediately because 'shouldRun' has been deactivated");
        }
    }

    /**
     * Stops the scanner: deactivates it, shuts down the scheduler and the report compiler
     * pool, waits up to one minute for each to terminate, and finally clears the recorded
     * differences and statistics unless they are to be retained.
     */
    public void shutdown() {
        LOG.info("Shutdown has been called");
        final boolean wasRunning = shouldRun.getAndSet(false);
        if (!wasRunning) {
            LOG.warn("Shutdown has been called, but periodic scanner not started");
        }

        final boolean hasMasterThread = masterThread != null;
        final boolean hasCompilerPool = reportCompileThreadPool != null;

        // Request both shutdowns first, then wait for each in turn.
        if (hasMasterThread) {
            masterThread.shutdown();
        }
        if (hasCompilerPool) {
            reportCompileThreadPool.shutdownNow();
        }
        if (hasMasterThread) {
            awaitTerminationOrLog(masterThread,
                "interrupted while waiting for masterThread to terminate");
        }
        if (hasCompilerPool) {
            awaitTerminationOrLog(reportCompileThreadPool,
                "interrupted while waiting for reportCompileThreadPool to terminate");
        }

        if (!retainDiffs) {
            clear();
        }
    }

    /** Waits up to one minute for the executor to terminate, logging if interrupted. */
    private static void awaitTerminationOrLog(ExecutorService executor, String interruptedMessage) {
        try {
            executor.awaitTermination(1L, TimeUnit.MINUTES);
        } catch (InterruptedException interruption) {
            LOG.error(interruptedMessage, interruption);
        }
    }

    /**
     * Scans the volumes and hands every recorded difference to the dataset.
     *
     * <p>Differences are processed while holding the lock on the diff report. The loop pauses
     * for the configured batch interval whenever the number of entries already processed is a
     * multiple of the batch size, which includes the pause right after the first entry.
     * Afterwards the differences and statistics are cleared unless they are to be retained.
     *
     * @throws IOException as raised by the calls made during reconciliation
     */
    @VisibleForTesting
    public void reconcile() throws IOException {
        LOG.debug("reconcile start DirectoryScanning");
        scan();
        DataNodeFaultInjector.get().waitUntilStorageRemoved();

        synchronized (diffs) {
            int alreadyProcessed = 0;
            for (Map.Entry<String, FsVolumeSpi.ScanInfo> diff : diffs.getEntries()) {
                dataset.checkAndUpdate(diff.getKey(), diff.getValue());
                final boolean pauseNow = alreadyProcessed % reconcileBlocksBatchSize == 0;
                alreadyProcessed++;
                if (pauseNow) {
                    try {
                        Thread.sleep(reconcileBlocksBatchInterval);
                    } catch (InterruptedException ignored) {
                        // Interruption only cuts this pause short; processing carries on.
                    }
                }
            }
        }

        if (!retainDiffs) {
            clear();
        }
    }

    /**
     * Compiles the volume reports, merges them per block pool and compares them with the
     * dataset's finalized blocks, recording every discrepancy in {@code diffs} and the
     * per-block-pool counters in {@code stats}.
     *
     * <p>Both the scanned entries and the finalized replicas are sorted and then walked in
     * lock step by block id with two indices.
     */
    private void scan() {
        BlockPoolReport blockPoolReport = new BlockPoolReport();
        // Drop the differences and statistics left over from a previous scan.
        clear();
        // Merge the per-volume reports into one report keyed by block pool id.
        for (ScanInfoVolumeReport scanInfoVolumeReport : getVolumeReports()) {
            for (String str : scanInfoVolumeReport.getBlockPoolIds()) {
                blockPoolReport.addAll(str, scanInfoVolumeReport.getScanInfo(str));
            }
        }
        // Sorting happens before the diff lock is taken further below.
        blockPoolReport.sortBlocks();
        // Fault-injection hook; presumably a no-op outside tests (confirm in DataNodeFaultInjector).
        DataNodeFaultInjector.get().delayDiffRecord();
        for (String str2 : blockPoolReport.getBlockPoolIds()) {
            List<FsVolumeSpi.ScanInfo> scanInfo = blockPoolReport.getScanInfo(str2);
            Stats stats = new Stats(str2);
            this.stats.put(str2, stats);
            // Differences found for this block pool; published to 'diffs' at the end.
            ArrayList arrayList = new ArrayList();
            stats.totalBlocks = scanInfo.size();
            List<ReplicaInfo> finalizedBlocks = this.dataset.getFinalizedBlocks(str2);
            Collections.sort(finalizedBlocks);
            // i indexes the scanned entries, i2 indexes the finalized replicas.
            int i = 0;
            int i2 = 0;
            while (i2 < finalizedBlocks.size() && i < scanInfo.size()) {
                ReplicaInfo replicaInfo = finalizedBlocks.get(i2);
                FsVolumeSpi.ScanInfo scanInfo2 = scanInfo.get(i);
                if (scanInfo2.getBlockId() < replicaInfo.getBlockId()) {
                    // Scanned entry has no finalized replica; ignored while the block is being deleted.
                    if (!this.dataset.isDeletingBlock(str2, scanInfo2.getBlockId())) {
                        stats.missingMemoryBlocks++;
                        addDifference(arrayList, stats, scanInfo2);
                    }
                    i++;
                } else if (scanInfo2.getBlockId() > replicaInfo.getBlockId()) {
                    // Finalized replica has no scanned entry: record it with no files.
                    addDifference(arrayList, stats, replicaInfo.getBlockId(), scanInfo2.getVolume());
                    i2++;
                } else {
                    // Same block id on both sides: compare the details.
                    if (!(FileUtil.isRegularFile(scanInfo2.getBlockFile(), false) && FileUtil.isRegularFile(scanInfo2.getMetaFile(), false))) {
                        // Block file or meta file failed the regular-file check.
                        stats.mismatchBlocks++;
                        addDifference(arrayList, stats, scanInfo2);
                    } else if (scanInfo2.getBlockFile() == null) {
                        // Scanned entry carries no block file.
                        addDifference(arrayList, stats, scanInfo2);
                    } else if (scanInfo2.getGenStamp() != replicaInfo.getGenerationStamp() || scanInfo2.getBlockLength() != replicaInfo.getNumBytes()) {
                        // Generation stamp or length differs from the finalized replica.
                        stats.mismatchBlocks++;
                        addDifference(arrayList, stats, scanInfo2);
                    } else if (replicaInfo.compareWith(scanInfo2) != 0) {
                        // Replica and scanned entry still differ according to compareWith.
                        stats.duplicateBlocks++;
                        addDifference(arrayList, stats, scanInfo2);
                    }
                    i++;
                    // Several scanned entries may share one block id; only move to the next
                    // replica once the next scanned entry has a different id (or none is left).
                    if (i >= scanInfo.size()) {
                        i2++;
                    } else if (scanInfo.get(i).getBlockId() != scanInfo2.getBlockId()) {
                        i2++;
                    }
                }
            }
            // Remaining finalized replicas have no scanned entry.
            while (i2 < finalizedBlocks.size()) {
                int i3 = i2;
                i2++;
                ReplicaInfo replicaInfo2 = finalizedBlocks.get(i3);
                addDifference(arrayList, stats, replicaInfo2.getBlockId(), replicaInfo2.getVolume());
            }
            // Remaining scanned entries have no finalized replica.
            while (i < scanInfo.size()) {
                if (!this.dataset.isDeletingBlock(str2, scanInfo.get(i).getBlockId())) {
                    stats.missingMemoryBlocks++;
                    addDifference(arrayList, stats, scanInfo.get(i));
                }
                i++;
            }
            // Publish this block pool's differences under the diff lock.
            synchronized (this.diffs) {
                this.diffs.addAll(str2, arrayList);
            }
            LOG.info("Scan Results: {}", stats);
        }
    }

    /**
     * Records a scanned entry as a difference, counting a missing meta file and a missing
     * block file when the entry has none.
     *
     * @param collection differences collected so far; the entry is appended to it
     * @param stats counters of the block pool being scanned
     * @param scanInfo scanned entry to record
     */
    private void addDifference(Collection<FsVolumeSpi.ScanInfo> collection, Stats stats, FsVolumeSpi.ScanInfo scanInfo) {
        if (scanInfo.getMetaFile() == null) {
            stats.missingMetaFile++;
        }
        if (scanInfo.getBlockFile() == null) {
            stats.missingBlockFile++;
        }
        collection.add(scanInfo);
    }

    /**
     * Records a block that has no scanned entry: counts both its block file and its meta file
     * as missing and appends a scan entry that carries only the block id and the volume.
     *
     * @param collection differences collected so far; the new entry is appended to it
     * @param stats counters of the block pool being scanned
     * @param j id of the block
     * @param fsVolumeSpi volume to attach to the new entry
     */
    private void addDifference(Collection<FsVolumeSpi.ScanInfo> collection, Stats stats, long j, FsVolumeSpi fsVolumeSpi) {
        stats.missingBlockFile += 1;
        stats.missingMetaFile += 1;
        FsVolumeSpi.ScanInfo fileless = new FsVolumeSpi.ScanInfo(j, null, null, null, fsVolumeSpi);
        collection.add(fileless);
    }

    /**
     * Compiles one report per volume of the dataset using the report compiler pool.
     *
     * <p>Volumes whose storage type is {@link StorageType#PROVIDED} are skipped. A compiler
     * that fails is logged and skipped. A compiler that returns {@code null} (it was
     * interrupted) causes everything collected so far to be discarded and the run to be
     * abandoned. The volume references are always closed before returning.
     *
     * @return the compiled reports; empty when the run was abandoned or nothing was compiled
     */
    @VisibleForTesting
    public Collection<ScanInfoVolumeReport> getVolumeReports() {
        List<ScanInfoVolumeReport> volumeReports = new ArrayList<>();
        List<Future<ScanInfoVolumeReport>> compilersInProgress = new ArrayList<>();

        try (FsDatasetSpi.FsVolumeReferences volumes = this.dataset.getFsVolumeReferences()) {
            // Start one compiler per eligible volume so they run concurrently.
            for (FsVolumeSpi volume : volumes) {
                if (volume.getStorageType() != StorageType.PROVIDED) {
                    compilersInProgress.add(this.reportCompileThreadPool.submit(new ReportCompiler(volume)));
                }
            }

            for (Future<ScanInfoVolumeReport> compiler : compilersInProgress) {
                try {
                    // get() must stay inside this try so a failed compiler is logged and
                    // skipped instead of aborting the whole collection.
                    ScanInfoVolumeReport report = compiler.get();
                    if (!CollectionUtils.addIgnoreNull(volumeReports, report)) {
                        // A null report means the compiler was interrupted: give up on this run.
                        volumeReports.clear();
                        break;
                    }
                } catch (Exception e) {
                    LOG.warn("Error compiling report. Continuing.", e);
                }
            }
        } catch (IOException e) {
            LOG.error("Unexpected IOException by closing FsVolumeReference", e);
        }
        return volumeReports;
    }
}
