package org.apache.spark.network.shuffle;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.server.BlockPushNonFatalFailure;
import org.apache.spark.network.shuffle.ErrorHandler;
import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.DBIterator;
import org.apache.spark.network.shuffledb.StoreVersion;
import org.apache.spark.network.util.DBProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.annotations.VisibleForTesting;
import org.sparkproject.guava.base.Preconditions;
import org.sparkproject.guava.cache.CacheBuilder;
import org.sparkproject.guava.cache.CacheLoader;
import org.sparkproject.guava.cache.LoadingCache;
import org.sparkproject.guava.primitives.Ints;
import org.sparkproject.guava.primitives.Longs;

/* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver.class */
public class RemoteBlockPushResolver implements MergedShuffleFileManager {
    // Prefix of every merged shuffle file name; consumed by AppShuffleInfo.generateFileName.
    public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
    // NOTE(review): the usages of the following keys/delimiters are outside this part of the
    // file; the names suggest they are used to parse merge-directory metadata — confirm.
    public static final String SHUFFLE_META_DELIMITER = ":";
    public static final String MERGE_DIR_KEY = "mergeDir";
    public static final String ATTEMPT_ID_KEY = "attemptId";
    // Sentinel attempt id (-1). NOTE(review): presumably means "no attempt id supplied" — confirm.
    private static final int UNDEFINED_ATTEMPT_ID = -1;
    // Sentinel (-1). NOTE(review): presumably a shuffleMergeId wildcard for removal requests — confirm.
    public static final int DELETE_ALL_MERGED_SHUFFLE = -1;
    // NOTE(review): key-building constants for the DB below; their usages are not visible here.
    private static final String DB_KEY_DELIMITER = ";";
    private static final String APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX = "AppAttemptShuffleFinalized";
    private static final String APP_ATTEMPT_PATH_KEY_PREFIX = "AppAttemptPathInfo";

    // Per-application merge state. NOTE(review): presumably keyed by application id — confirm.
    @VisibleForTesting
    final ConcurrentMap<String, AppShuffleInfo> appsShuffleInfo = new ConcurrentHashMap();
    // Single-threaded executor whose thread factory is named
    // "spark-shuffle-merged-shuffle-directory-cleaner".
    private final ExecutorService mergedShuffleCleaner = Executors.newSingleThreadExecutor(NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
    private final TransportConf conf;
    // NOTE(review): presumably the wait applied when shutting down mergedShuffleCleaner — confirm.
    private final long cleanerShutdownTimeout;
    // Minimum number of data bytes appended since the last chunk offset before
    // PushBlockStreamCallback.onComplete records a new chunk boundary.
    private final int minChunkSize;
    // Threshold passed to AppShufflePartitionInfo.shouldAbort by PushBlockStreamCallback.abortIfNecessary.
    private final int ioExceptionsThresholdDuringMerge;
    // NOTE(review): cache of index information keyed by a String (presumably the index file path) — confirm.
    private final LoadingCache<String, ShuffleIndexInformation> indexCache;
    // Meters/counters updated by PushBlockStreamCallback while blocks are merged.
    private final PushMergeMetrics pushMergeMetrics;

    // NOTE(review): presumably the file backing `db`; initialization is not visible here.
    @VisibleForTesting
    final File recoveryFile;

    // NOTE(review): presumably the store used to persist merge state for recovery — confirm.
    @VisibleForTesting
    final DB db;
    private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);
    private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler();
    // Read-only SUCCESS return code; PushBlockStreamCallback.getCompletionResponse hands out
    // a duplicate() of this buffer for each response.
    private static final ByteBuffer SUCCESS_RESPONSE = new BlockPushReturnCode(BlockPushNonFatalFailure.ReturnCode.SUCCESS.id(), "").toByteBuffer().asReadOnlyBuffer();
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppAttemptId.class */
    /**
     * Immutable value object pairing an application id with one of its attempt ids.
     * Two instances are equal when both the application id and the attempt id match.
     * The class is constructible from JSON through the annotated constructor.
     */
    public static class AppAttemptId {
        public final String appId;
        public final int attemptId;

        /**
         * @param str the application id (may be null; equality is null-safe)
         * @param i   the attempt id
         */
        @JsonCreator
        public AppAttemptId(@JsonProperty("appId") String str, @JsonProperty("attemptId") int i) {
            this.appId = str;
            this.attemptId = i;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // Exact class match only: subclasses never compare equal to this type.
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            AppAttemptId other = (AppAttemptId) obj;
            return Objects.equals(appId, other.appId) && attemptId == other.attemptId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(appId, attemptId);
        }

        @Override
        public String toString() {
            return String.format("Application %s_%s", appId, attemptId);
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppAttemptShuffleMergeId.class */
    /**
     * Immutable key identifying one shuffle merge of one application attempt:
     * (appId, attemptId, shuffleId, shuffleMergeId). Equality and hashing cover all four fields.
     */
    public static class AppAttemptShuffleMergeId {
        public final String appId;
        public final int attemptId;
        public final int shuffleId;
        public final int shuffleMergeId;

        /**
         * @param str the application id; must not be null
         * @param i   the attempt id
         * @param i2  the shuffle id
         * @param i3  the shuffle merge id
         * @throws IllegalArgumentException if {@code str} is null
         */
        @JsonCreator
        public AppAttemptShuffleMergeId(@JsonProperty("appId") String str, @JsonProperty("attemptId") int i, @JsonProperty("shuffleId") int i2, @JsonProperty("shuffleMergeId") int i3) {
            Preconditions.checkArgument(null != str, "app id is null");
            this.appId = str;
            this.attemptId = i;
            this.shuffleId = i2;
            this.shuffleMergeId = i3;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            AppAttemptShuffleMergeId other = (AppAttemptShuffleMergeId) obj;
            boolean sameNumbers = attemptId == other.attemptId
                && shuffleId == other.shuffleId
                && shuffleMergeId == other.shuffleMergeId;
            return sameNumbers && Objects.equals(appId, other.appId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(appId, attemptId, shuffleId, shuffleMergeId);
        }

        @Override
        public String toString() {
            return String.format(
                "Application %s_%s shuffleId %s shuffleMergeId %s",
                appId, attemptId, shuffleId, shuffleMergeId);
        }
    }

    /**
     * The local directories (and the number of sub-directories per local directory) under
     * which an application's merged shuffle files are resolved. JSON-serializable: the
     * directory list is written as an array.
     */
    @VisibleForTesting
    public static class AppPathsInfo {

        @JsonProperty("activeLocalDirs")
        @JsonFormat(shape = JsonFormat.Shape.ARRAY)
        private final String[] activeLocalDirs;

        @JsonProperty("subDirsPerLocalDir")
        private final int subDirsPerLocalDir;

        /**
         * Builds the info from already-resolved values (used for JSON deserialization).
         *
         * @param strArr the active local directories, stored as given (no copy is made)
         * @param i      the number of sub-directories per local directory
         */
        @JsonCreator
        public AppPathsInfo(@JsonProperty("activeLocalDirs") @JsonFormat(shape = JsonFormat.Shape.ARRAY) String[] strArr, @JsonProperty("subDirsPerLocalDir") int i) {
            this.activeLocalDirs = strArr;
            this.subDirsPerLocalDir = i;
        }

        /**
         * Derives the merge directories from a set of local directories: each entry of
         * {@code strArr} is replaced by its sibling directory named {@code str2}
         * (i.e. {@code parent(dir)/str2}).
         *
         * @param str    the application id, used only for the log message
         * @param strArr the local directories to derive from
         * @param str2   the merge directory name to resolve against each parent
         * @param i      the number of sub-directories per local directory
         */
        private AppPathsInfo(String str, String[] strArr, String str2, int i) {
            String[] mergeDirs = new String[strArr.length];
            for (int idx = 0; idx < strArr.length; idx++) {
                mergeDirs[idx] = Paths.get(strArr[idx]).getParent().resolve(str2).toFile().getPath();
            }
            this.activeLocalDirs = mergeDirs;
            this.subDirsPerLocalDir = i;
            if (RemoteBlockPushResolver.logger.isInfoEnabled()) {
                RemoteBlockPushResolver.logger.info(
                    "Updated active local dirs {} and sub dirs {} for application {}",
                    Arrays.toString(mergeDirs), i, str);
            }
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            AppPathsInfo other = (AppPathsInfo) obj;
            return Arrays.equals(activeLocalDirs, other.activeLocalDirs)
                && subDirsPerLocalDir == other.subDirsPerLocalDir;
        }

        @Override
        public int hashCode() {
            int subDirsHash = Objects.hash(subDirsPerLocalDir);
            return 41 * subDirsHash + Arrays.hashCode(activeLocalDirs);
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppShuffleInfo.class */
    /**
     * Merge state of a single application attempt: its id, attempt id, the directories its
     * merged files live in, and a map from shuffle id to that shuffle's merge partitions.
     */
    public static class AppShuffleInfo {

        @VisibleForTesting
        final String appId;

        @VisibleForTesting
        final int attemptId;
        private final AppPathsInfo appPathsInfo;
        private final ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles = new ConcurrentHashMap<>();

        AppShuffleInfo(String str, int i, AppPathsInfo appPathsInfo) {
            this.appPathsInfo = appPathsInfo;
            this.attemptId = i;
            this.appId = str;
        }

        @VisibleForTesting
        public AppPathsInfo getAppPathsInfo() {
            return appPathsInfo;
        }

        @VisibleForTesting
        public ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> getShuffles() {
            return shuffles;
        }

        /**
         * Resolves a merged-file name to a full path under this application's local
         * directories via {@code ExecutorDiskUtils.getFilePath}.
         *
         * @param str the file name to resolve
         * @return the resolved path
         */
        @VisibleForTesting
        String getFilePath(String str) {
            AppPathsInfo paths = this.appPathsInfo;
            String resolved = ExecutorDiskUtils.getFilePath(paths.activeLocalDirs, paths.subDirsPerLocalDir, str);
            RemoteBlockPushResolver.logger.debug("Get merged file {}", resolved);
            return resolved;
        }

        /** Builds the extension-less base name: prefix_appId_i_i2_i3. */
        private String generateFileName(String str, int i, int i2, int i3) {
            String prefix = RemoteBlockPushResolver.MERGED_SHUFFLE_FILE_NAME_PREFIX;
            return String.format("%s_%s_%d_%d_%d", prefix, str, i, i2, i3);
        }

        /** Resolves the path of the merged file with the given extension (including the dot). */
        private String mergedFilePath(int i, int i2, int i3, String extension) {
            return getFilePath(generateFileName(this.appId, i, i2, i3) + extension);
        }

        /** @return the {@code .data} file for the three ids that make up the file name */
        public File getMergedShuffleDataFile(int i, int i2, int i3) {
            return new File(mergedFilePath(i, i2, i3, ".data"));
        }

        /** @return the path of the {@code .index} file for the three ids that make up the file name */
        public String getMergedShuffleIndexFilePath(int i, int i2, int i3) {
            return mergedFilePath(i, i2, i3, ".index");
        }

        /** @return the {@code .meta} file for the three ids that make up the file name */
        public File getMergedShuffleMetaFile(int i, int i2, int i3) {
            return new File(mergedFilePath(i, i2, i3, ".meta"));
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppShuffleMergePartitionsInfo.class */
    /**
     * Partitions being merged for one shuffle merge id. A finalized merge is represented by
     * pointing the partition map at a shared empty-map marker, which {@link #isFinalized()}
     * detects by reference identity.
     */
    public static class AppShuffleMergePartitionsInfo {
        private static final Map<Integer, AppShufflePartitionInfo> SHUFFLE_FINALIZED_MARKER = Collections.emptyMap();
        private final int shuffleMergeId;
        private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
        private final AtomicReference<int[]> reduceIds = new AtomicReference<>(new int[0]);

        /**
         * @param i the shuffle merge id
         * @param z {@code true} to create the info already finalized (marker map),
         *          {@code false} to create it with a fresh, writable concurrent map
         */
        public AppShuffleMergePartitionsInfo(int i, boolean z) {
            this.shuffleMergeId = i;
            if (z) {
                this.shuffleMergePartitions = SHUFFLE_FINALIZED_MARKER;
            } else {
                this.shuffleMergePartitions = new ConcurrentHashMap<>();
            }
        }

        @VisibleForTesting
        public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
            return shuffleMergePartitions;
        }

        /** @return whether the partition map is the finalized marker instance */
        public boolean isFinalized() {
            return SHUFFLE_FINALIZED_MARKER == shuffleMergePartitions;
        }

        public void setReduceIds(int[] iArr) {
            reduceIds.set(iArr);
        }

        public int[] getReduceIds() {
            return reduceIds.get();
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppShufflePartitionInfo.class */
    /**
     * Write-side state of one merged shuffle partition (one reduce id of one shuffle merge).
     * It owns three files: the data file (block bytes), the index file (a sequence of 8-byte
     * chunk offsets) and the meta file (one serialized bitmap of map indexes per chunk).
     * The class tracks how far each file has been validly written so that partially written
     * content can be truncated away in {@link #finalizePartition()}.
     */
    public static class AppShufflePartitionInfo {
        private final AppAttemptShuffleMergeId appAttemptShuffleMergeId;
        private final int reduceId;
        private final File dataFile;
        public final FileChannel dataChannel;
        private final MergeShuffleFile indexFile;
        private final MergeShuffleFile metaFile;
        // Offset in the data file up to which blocks have been completely merged.
        private long dataFilePos;
        // Map indexes of every block merged into this partition so far.
        private RoaringBitmap mapTracker;
        // Last chunk offset successfully written to the index file.
        private long lastChunkOffset;
        // Map indexes merged since the last chunk boundary was recorded.
        private RoaringBitmap chunkTracker;
        // Set when the previous index/meta update threw; the next update re-positions the
        // channels to the last recorded positions before writing.
        private boolean indexMetaUpdateFailed;
        private int lastMergedMapIndex = -1;
        private int numIOExceptions = 0;
        // Map index of the block currently being written, or -1 when none is.
        private int currentMapIndex = -1;

        /**
         * Opens the data file for writing and writes the initial chunk offset 0 to the index.
         *
         * @throws IOException if the data file cannot be opened or the initial index write fails
         */
        AppShufflePartitionInfo(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int i, File file, MergeShuffleFile mergeShuffleFile, MergeShuffleFile mergeShuffleFile2) throws IOException {
            this.appAttemptShuffleMergeId = appAttemptShuffleMergeId;
            this.reduceId = i;
            this.dataChannel = new FileOutputStream(file).getChannel();
            this.dataFile = file;
            this.indexFile = mergeShuffleFile;
            this.metaFile = mergeShuffleFile2;
            // Map index -1 makes writeChunkTracker return immediately, so only the index file is
            // written here and the not-yet-created chunkTracker is never touched.
            updateChunkInfo(0L, -1);
            this.dataFilePos = 0L;
            this.mapTracker = new RoaringBitmap();
            this.chunkTracker = new RoaringBitmap();
        }

        public long getDataFilePos() {
            return this.dataFilePos;
        }

        public void setDataFilePos(long j) {
            RemoteBlockPushResolver.logger.trace("{} current pos {} update pos {}", new Object[]{this, Long.valueOf(this.dataFilePos), Long.valueOf(j)});
            this.dataFilePos = j;
        }

        int getCurrentMapIndex() {
            return this.currentMapIndex;
        }

        void setCurrentMapIndex(int i) {
            RemoteBlockPushResolver.logger.trace("{} mapIndex {} current mapIndex {}", new Object[]{this, Integer.valueOf(this.currentMapIndex), Integer.valueOf(i)});
            this.currentMapIndex = i;
        }

        long getLastChunkOffset() {
            return this.lastChunkOffset;
        }

        /** Records map index {@code i} as merged in both trackers and as the last merged index. */
        void blockMerged(int i) {
            RemoteBlockPushResolver.logger.debug("{} updated merging mapIndex {}", this, Integer.valueOf(i));
            this.mapTracker.add(i);
            this.chunkTracker.add(i);
            this.lastMergedMapIndex = i;
        }

        void resetChunkTracker() {
            this.chunkTracker.clear();
        }

        /**
         * Records a chunk boundary: appends offset {@code j} to the index file and, unless
         * {@code i} is -1, the current chunk bitmap (with {@code i} added) to the meta file.
         * The recorded index position and {@code lastChunkOffset} advance only after both
         * writes succeeded.
         *
         * @param j the data-file offset at which the chunk ends
         * @param i the map index of the block completing the chunk, or -1 to skip the meta write
         * @throws IOException if either write fails; the failure flag is set before rethrowing
         */
        void updateChunkInfo(long j, int i) throws IOException {
            try {
                RemoteBlockPushResolver.logger.trace("{} index current {} updated {}", new Object[]{this, Long.valueOf(this.lastChunkOffset), Long.valueOf(j)});
                if (this.indexMetaUpdateFailed) {
                    // Rewind to the last recorded position so a failed partial write is overwritten.
                    this.indexFile.getChannel().position(this.indexFile.getPos());
                }
                this.indexFile.getDos().writeLong(j);
                writeChunkTracker(i);
                // A long offset occupies 8 bytes in the index file.
                this.indexFile.updatePos(8L);
                this.lastChunkOffset = j;
                this.indexMetaUpdateFailed = false;
            } catch (IOException e) {
                RemoteBlockPushResolver.logger.warn("{} reduceId {} update to index/meta failed", this.appAttemptShuffleMergeId, Integer.valueOf(this.reduceId));
                this.indexMetaUpdateFailed = true;
                throw e;
            }
        }

        /**
         * Adds {@code i} to the chunk bitmap and appends the serialized bitmap to the meta
         * file; does nothing when {@code i} is -1.
         */
        private void writeChunkTracker(int i) throws IOException {
            if (i == -1) {
                return;
            }
            this.chunkTracker.add(i);
            RemoteBlockPushResolver.logger.trace("{} mapIndex {} write chunk to meta file", this, Integer.valueOf(i));
            if (this.indexMetaUpdateFailed) {
                // Same rewind as for the index file: resume from the last recorded position.
                this.metaFile.getChannel().position(this.metaFile.getPos());
            }
            this.chunkTracker.serialize(this.metaFile.getDos());
            // Advance the recorded position by the number of bytes the serialization wrote.
            this.metaFile.updatePos(this.metaFile.getChannel().position() - this.metaFile.getPos());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void incrementIOExceptions() {
            this.numIOExceptions++;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return whether the number of IO exceptions seen is strictly greater than {@code i} */
        public boolean shouldAbort(int i) {
            return this.numIOExceptions > i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Tries to record a final chunk for data merged since the last chunk boundary, then
         * truncates the data, index and meta files to their last recorded positions.
         *
         * @throws IOException if truncating any of the three files fails
         */
        public void finalizePartition() throws IOException {
            if (this.dataFilePos != this.lastChunkOffset) {
                try {
                    updateChunkInfo(this.dataFilePos, this.lastMergedMapIndex);
                } catch (IOException e) {
                    // Deliberately ignored: on failure lastChunkOffset and the index position are
                    // not advanced, so the truncation below discards the unrecorded tail.
                }
            }
            RemoteBlockPushResolver.logger.trace("{} reduceId {} truncating files data {} index {} meta {}", new Object[]{this.appAttemptShuffleMergeId, Integer.valueOf(this.reduceId), Long.valueOf(this.lastChunkOffset), Long.valueOf(this.indexFile.getPos()), Long.valueOf(this.metaFile.getPos())});
            this.dataChannel.truncate(this.lastChunkOffset);
            this.indexFile.getChannel().truncate(this.indexFile.getPos());
            this.metaFile.getChannel().truncate(this.metaFile.getPos());
        }

        /**
         * Closes the data, meta and index files, optionally deleting each one. Every file is
         * handled in its own try block, so a failure on one does not prevent the others from
         * being processed; failures are logged as warnings and not propagated.
         *
         * @param z whether to delete the files after closing them
         */
        void closeAllFilesAndDeleteIfNeeded(boolean z) {
            try {
                if (this.dataChannel.isOpen()) {
                    this.dataChannel.close();
                }
                if (z) {
                    this.dataFile.delete();
                }
            } catch (IOException e) {
                RemoteBlockPushResolver.logger.warn("Error closing data channel for {} reduceId {}", this.appAttemptShuffleMergeId, Integer.valueOf(this.reduceId));
            }
            try {
                this.metaFile.close();
                if (z) {
                    this.metaFile.delete();
                }
            } catch (IOException e2) {
                RemoteBlockPushResolver.logger.warn("Error closing meta file for {} reduceId {}", this.appAttemptShuffleMergeId, Integer.valueOf(this.reduceId));
            }
            try {
                this.indexFile.close();
                if (z) {
                    this.indexFile.delete();
                }
            } catch (IOException e3) {
                RemoteBlockPushResolver.logger.warn("Error closing index file for {} reduceId {}", this.appAttemptShuffleMergeId, Integer.valueOf(this.reduceId));
            }
        }

        public String toString() {
            return String.format("Application %s_%s shuffleId %s shuffleMergeId %s reduceId %s", this.appAttemptShuffleMergeId.appId, Integer.valueOf(this.appAttemptShuffleMergeId.attemptId), Integer.valueOf(this.appAttemptShuffleMergeId.shuffleId), Integer.valueOf(this.appAttemptShuffleMergeId.shuffleMergeId), Integer.valueOf(this.reduceId));
        }

        // Safety net: closes (without deleting) the files if the object is collected while open.
        protected void finalize() throws Throwable {
            closeAllFilesAndDeleteIfNeeded(false);
        }

        @VisibleForTesting
        MergeShuffleFile getIndexFile() {
            return this.indexFile;
        }

        @VisibleForTesting
        MergeShuffleFile getMetaFile() {
            return this.metaFile;
        }

        @VisibleForTesting
        FileChannel getDataChannel() {
            return this.dataChannel;
        }

        @VisibleForTesting
        public RoaringBitmap getMapTracker() {
            return this.mapTracker;
        }

        @VisibleForTesting
        int getNumIOExceptions() {
            return this.numIOExceptions;
        }
    }

    /**
     * A file opened for writing, exposed both as a {@link FileChannel} and as a
     * {@link DataOutputStream} over the same underlying stream, together with a
     * caller-maintained position counter.
     */
    @VisibleForTesting
    public static class MergeShuffleFile {
        private final FileChannel channel;
        private final DataOutputStream dos;
        private long pos;
        private File file;

        /**
         * Opens {@code file} for writing.
         *
         * @throws IOException if the file cannot be opened
         */
        @VisibleForTesting
        MergeShuffleFile(File file) throws IOException {
            FileOutputStream out = new FileOutputStream(file);
            this.file = file;
            this.dos = new DataOutputStream(out);
            this.channel = out.getChannel();
        }

        /** Advances the tracked position by {@code j} bytes. */
        /* JADX INFO: Access modifiers changed from: private */
        public void updatePos(long j) {
            this.pos = this.pos + j;
        }

        /** Closes the output stream unless the channel is already closed. */
        void close() throws IOException {
            if (!this.channel.isOpen()) {
                return;
            }
            this.dos.close();
        }

        /** Deletes the backing file at most once; the file reference is cleared in all cases. */
        void delete() throws IOException {
            File target = this.file;
            // Clear the reference first so it ends up null even if the deletion throws.
            this.file = null;
            if (target != null) {
                target.delete();
            }
        }

        @VisibleForTesting
        public DataOutputStream getDos() {
            return dos;
        }

        @VisibleForTesting
        FileChannel getChannel() {
            return channel;
        }

        @VisibleForTesting
        long getPos() {
            return pos;
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$PushBlockStreamCallback.class */
    static class PushBlockStreamCallback implements StreamCallbackWithID {
        // Owning resolver; source of the metrics, minChunkSize and the IO-exception threshold.
        private final RemoteBlockPushResolver mergeManager;
        private final AppShuffleInfo appShuffleInfo;
        private final String streamId;
        // Map index of the block carried by this stream.
        private final int mapIndex;
        // Target partition; also the monitor that onData/onComplete synchronize on.
        private final AppShufflePartitionInfo partitionInfo;
        // Bytes this stream has written to the data file so far (relative to dataFilePos).
        private int length;
        // Set once this stream has started writing to the data file.
        private boolean isWriting;
        // Copies of received buffers held back while another stream is writing; null when none.
        private List<ByteBuffer> deferredBufs;
        // Bytes received via onData and not yet reported as ignored.
        private long receivedBytes;

        /**
         * Creates the callback for one pushed block.
         *
         * @param remoteBlockPushResolver the owning resolver; must not be null
         * @param appShuffleInfo          the application's merge state; must not be null
         * @param str                     the stream id
         * @param appShufflePartitionInfo the target partition; must not be null
         * @param i                       the map index of the pushed block
         * @throws IllegalArgumentException if a required argument is null
         * @throws IllegalStateException    if the partition already exceeded the IO-exception threshold
         */
        private PushBlockStreamCallback(RemoteBlockPushResolver remoteBlockPushResolver, AppShuffleInfo appShuffleInfo, String str, AppShufflePartitionInfo appShufflePartitionInfo, int i) {
            // Validate the required collaborators up front, in declaration order.
            Preconditions.checkArgument(remoteBlockPushResolver != null);
            Preconditions.checkArgument(appShuffleInfo != null);
            Preconditions.checkArgument(appShufflePartitionInfo != null);

            this.mergeManager = remoteBlockPushResolver;
            this.appShuffleInfo = appShuffleInfo;
            this.partitionInfo = appShufflePartitionInfo;
            this.streamId = str;
            this.mapIndex = i;

            this.length = 0;
            this.receivedBytes = 0L;
            this.isWriting = false;

            // Refuse to start a stream for a partition that has already failed too often.
            abortIfNecessary();
        }

        /** @return the stream id this callback was created with */
        public String getID() {
            return this.streamId;
        }

        /**
         * @return a duplicate of the shared read-only SUCCESS response, so each caller gets
         *         independent position/limit over the same content
         */
        public ByteBuffer getCompletionResponse() {
            return RemoteBlockPushResolver.SUCCESS_RESPONSE.duplicate();
        }

        /**
         * Writes the whole buffer to the partition's data file, starting right after the bytes
         * this stream has already written, and meters every write.
         *
         * @param byteBuffer the bytes to append; fully consumed on return
         * @throws IOException if a channel write fails
         */
        private void writeBuf(ByteBuffer byteBuffer) throws IOException {
            AppShufflePartitionInfo target = this.partitionInfo;
            while (byteBuffer.hasRemaining()) {
                long committedPos = target.getDataFilePos();
                long writeAt = committedPos + this.length;
                RemoteBlockPushResolver.logger.debug("{} current pos {} updated pos {}", target, committedPos, writeAt);
                // A positional write may be partial; loop until the buffer is drained.
                int written = target.dataChannel.write(byteBuffer, writeAt);
                this.length += written;
                this.mergeManager.pushMergeMetrics.blockBytesWritten.mark(written);
            }
        }

        /**
         * @return whether this stream may write now: either no block is currently being
         *         written to the partition, or the block being written is this stream's own
         */
        private boolean allowedToWrite() {
            int activeMapIndex = this.partitionInfo.getCurrentMapIndex();
            if (activeMapIndex < 0) {
                return true;
            }
            return activeMapIndex == this.mapIndex;
        }

        /**
         * @return whether this block is a duplicate: its map index was already merged, or the
         *         same map index is currently being written while this stream has written nothing
         */
        private boolean isDuplicateBlock() {
            if (this.partitionInfo.mapTracker.contains(this.mapIndex)) {
                return true;
            }
            boolean nothingWrittenYet = this.length == 0;
            return nothingWrittenYet && this.partitionInfo.getCurrentMapIndex() == this.mapIndex;
        }

        /**
         * Writes every deferred buffer to the data file in arrival order, un-marks one deferred
         * block per buffer, subtracts the buffers' total size from the deferred-bytes counter,
         * and drops the deferred list.
         *
         * <p>Callers must ensure {@code deferredBufs} is non-null.
         *
         * @throws IOException if writing any buffer fails; the deferred list is left in place
         */
        private void writeDeferredBufs() throws IOException {
            long totalSize = 0;
            for (ByteBuffer deferredBuf : this.deferredBufs) {
                // Account for the size before writing: the buffer is consumed by writeBuf.
                totalSize += deferredBuf.limit();
                writeBuf(deferredBuf);
                this.mergeManager.pushMergeMetrics.deferredBlocks.mark(-1L);
            }
            this.mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
            this.deferredBufs = null;
        }

        /**
         * Discards any deferred buffers without writing them, rolling back the deferred-block
         * meter and deferred-bytes counter for each one. Always leaves {@code deferredBufs} null.
         */
        private void freeDeferredBufs() {
            List<ByteBuffer> pending = this.deferredBufs;
            boolean hasPending = pending != null && !pending.isEmpty();
            if (hasPending) {
                long releasedBytes = 0;
                for (ByteBuffer buf : pending) {
                    releasedBytes += buf.limit();
                    this.mergeManager.pushMergeMetrics.deferredBlocks.mark(-1L);
                }
                this.mergeManager.pushMergeMetrics.deferredBlockBytes.dec(releasedBytes);
            }
            this.deferredBufs = null;
        }

        /**
         * Fails the stream when the partition has seen more IO exceptions than the configured
         * threshold; deferred buffers are released first.
         *
         * @throws IllegalStateException if the threshold has been exceeded
         */
        private void abortIfNecessary() {
            int threshold = this.mergeManager.ioExceptionsThresholdDuringMerge;
            if (!this.partitionInfo.shouldAbort(threshold)) {
                return;
            }
            freeDeferredBufs();
            String message = String.format("%s when merging %s", ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX, this.streamId);
            throw new IllegalStateException(message);
        }

        /**
         * Reports all bytes received so far as ignored and resets the received-bytes counter,
         * so the same bytes are never reported twice. No-op when nothing has been received.
         */
        private void updateIgnoredBlockBytes() {
            if (this.receivedBytes <= 0) {
                return;
            }
            long ignored = this.receivedBytes;
            this.mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(ignored);
            this.receivedBytes = 0L;
        }

        /**
         * Counts one more IO exception against the partition, then aborts the stream if the
         * count now exceeds the configured threshold.
         *
         * @throws IllegalStateException (from abortIfNecessary) if the threshold has been exceeded
         */
        private void incrementIOExceptionsAndAbortIfNecessary() {
            this.partitionInfo.incrementIOExceptions();
            abortIfNecessary();
        }

        /**
         * @param appShuffleMergePartitionsInfo the shuffle's current merge info, or null if absent
         * @param i                             the shuffle merge id this stream belongs to
         * @return whether the info is missing or carries a shuffle merge id greater than {@code i}
         */
        private boolean isStale(AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo, int i) {
            if (appShuffleMergePartitionsInfo == null) {
                return true;
            }
            return appShuffleMergePartitionsInfo.shuffleMergeId > i;
        }

        /**
         * @param appShuffleMergePartitionsInfo the shuffle's current merge info, or null if absent
         * @param i                             the reduce id of the target partition
         * @return whether the info is missing, already finalized, or no longer holds a
         *         partition for reduce id {@code i}
         */
        private boolean isTooLate(AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo, int i) {
            if (appShuffleMergePartitionsInfo == null) {
                return true;
            }
            if (appShuffleMergePartitionsInfo.isFinalized()) {
                return true;
            }
            return !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(i);
        }

        /**
         * Handles one received buffer of the pushed block. Under the partition's monitor it
         * either drops the data (stale, too late, or duplicate block), defers a copy of it
         * (another block is currently being written), or writes it to the data file after
         * flushing any previously deferred buffers.
         *
         * @param str        the stream id supplied by the caller (not used by this method)
         * @param byteBuffer the received bytes
         * @throws IOException           if writing to the data file fails
         * @throws IllegalStateException if the partition exceeded the IO-exception threshold
         */
        public void onData(String str, ByteBuffer byteBuffer) throws IOException {
            // Counted before any decision so ignored bytes can be reported later.
            this.receivedBytes += byteBuffer.remaining();
            synchronized (this.partitionInfo) {
                AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo = (AppShuffleMergePartitionsInfo) this.appShuffleInfo.shuffles.get(Integer.valueOf(this.partitionInfo.appAttemptShuffleMergeId.shuffleId));
                boolean isStale = isStale(appShuffleMergePartitionsInfo, this.partitionInfo.appAttemptShuffleMergeId.shuffleMergeId);
                boolean isTooLate = isTooLate(appShuffleMergePartitionsInfo, this.partitionInfo.reduceId);
                if (isStale || isTooLate) {
                    // Drop silently here; onComplete reports the failure to the client.
                    freeDeferredBufs();
                    // When both conditions hold, the push is metered as late rather than stale.
                    if (isTooLate) {
                        this.mergeManager.pushMergeMetrics.lateBlockPushes.mark();
                    } else {
                        this.mergeManager.pushMergeMetrics.staleBlockPushes.mark();
                    }
                    return;
                }
                if (!allowedToWrite()) {
                    RemoteBlockPushResolver.logger.trace("{} onData deferred", this.partitionInfo);
                    if (this.deferredBufs == null) {
                        this.deferredBufs = new ArrayList();
                    }
                    // Copy the bytes into a buffer owned by this callback and keep it for later.
                    int remaining = byteBuffer.remaining();
                    ByteBuffer allocate = ByteBuffer.allocate(remaining);
                    allocate.put(byteBuffer);
                    allocate.flip();
                    this.deferredBufs.add(allocate);
                    this.mergeManager.pushMergeMetrics.deferredBlockBytes.inc(remaining);
                    this.mergeManager.pushMergeMetrics.deferredBlocks.mark();
                } else {
                    if (isDuplicateBlock()) {
                        freeDeferredBufs();
                        return;
                    }
                    abortIfNecessary();
                    RemoteBlockPushResolver.logger.trace("{} onData writable", this.partitionInfo);
                    // Claim the partition for this block if no block currently owns it.
                    if (this.partitionInfo.getCurrentMapIndex() < 0) {
                        this.partitionInfo.setCurrentMapIndex(this.mapIndex);
                    }
                    this.isWriting = true;
                    try {
                        // Deferred data arrived earlier, so it must be written first.
                        if (this.deferredBufs != null && !this.deferredBufs.isEmpty()) {
                            writeDeferredBufs();
                        }
                        writeBuf(byteBuffer);
                    } catch (IOException e) {
                        // May throw IllegalStateException instead if the threshold is now exceeded.
                        incrementIOExceptionsAndAbortIfNecessary();
                        throw e;
                    }
                }
            }
        }

        /**
         * Completes the pushed block. Under the partition's monitor it rejects the block when
         * it is too late, stale, or collides with another block being written; silently ignores
         * duplicates; otherwise writes any still-deferred data, commits the new data-file
         * position, records a chunk boundary when enough bytes have accumulated since the last
         * one, and marks the block as merged.
         *
         * @param str the stream id, echoed in failure responses
         * @throws BlockPushNonFatalFailure with TOO_LATE_BLOCK_PUSH, STALE_BLOCK_PUSH or
         *         BLOCK_APPEND_COLLISION_DETECTED when the block is rejected
         * @throws IOException           if writing deferred data fails
         * @throws IllegalStateException if the partition exceeded the IO-exception threshold
         */
        public void onComplete(String str) throws IOException {
            synchronized (this.partitionInfo) {
                RemoteBlockPushResolver.logger.trace("{} onComplete invoked", this.partitionInfo);
                AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo = (AppShuffleMergePartitionsInfo) this.appShuffleInfo.shuffles.get(Integer.valueOf(this.partitionInfo.appAttemptShuffleMergeId.shuffleId));
                if (isTooLate(appShuffleMergePartitionsInfo, this.partitionInfo.reduceId)) {
                    freeDeferredBufs();
                    this.mergeManager.pushMergeMetrics.lateBlockPushes.mark();
                    throw new BlockPushNonFatalFailure(new BlockPushReturnCode(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(), str).toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(str, BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH));
                }
                if (isStale(appShuffleMergePartitionsInfo, this.partitionInfo.appAttemptShuffleMergeId.shuffleMergeId)) {
                    freeDeferredBufs();
                    this.mergeManager.pushMergeMetrics.staleBlockPushes.mark();
                    throw new BlockPushNonFatalFailure(new BlockPushReturnCode(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(), str).toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(str, BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH));
                }
                if (!allowedToWrite()) {
                    // Another block still owns the partition: everything this stream received
                    // was deferred and is now discarded.
                    freeDeferredBufs();
                    this.mergeManager.pushMergeMetrics.blockAppendCollisions.mark();
                    throw new BlockPushNonFatalFailure(new BlockPushReturnCode(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), str).toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(str, BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED));
                }
                if (isDuplicateBlock()) {
                    // Duplicates complete normally (no exception); their bytes are metered as ignored.
                    freeDeferredBufs();
                    updateIgnoredBlockBytes();
                    return;
                }
                // No block owns the partition: any deferred data of this stream can be written now.
                if (this.partitionInfo.getCurrentMapIndex() < 0) {
                    try {
                        if (this.deferredBufs != null && !this.deferredBufs.isEmpty()) {
                            abortIfNecessary();
                            this.isWriting = true;
                            writeDeferredBufs();
                        }
                    } catch (IOException e) {
                        incrementIOExceptionsAndAbortIfNecessary();
                        throw e;
                    }
                }
                long dataFilePos = this.partitionInfo.getDataFilePos() + this.length;
                // Whether a chunk boundary was successfully recorded for this block.
                boolean z = false;
                if (dataFilePos - this.partitionInfo.getLastChunkOffset() >= this.mergeManager.minChunkSize) {
                    try {
                        this.partitionInfo.updateChunkInfo(dataFilePos, this.mapIndex);
                        z = true;
                    } catch (IOException e2) {
                        // The failure is counted but not rethrown (unless the threshold is now
                        // exceeded): the block data is still committed below.
                        incrementIOExceptionsAndAbortIfNecessary();
                    }
                }
                this.partitionInfo.setDataFilePos(dataFilePos);
                // Release the partition so another block may start writing.
                this.partitionInfo.setCurrentMapIndex(-1);
                this.partitionInfo.blockMerged(this.mapIndex);
                if (z) {
                    // The chunk was recorded, so the next chunk starts with an empty tracker.
                    this.partitionInfo.resetChunkTracker();
                }
                this.isWriting = false;
            }
        }

        /**
         * Reacts to a failed block stream: logs the failure, records the ignored bytes and,
         * when this callback had started writing, resets the partition's current map index.
         *
         * @param str the stream id reported in the log message
         * @param th  the failure that ended the stream
         * @throws IOException declared by the callback contract
         */
        public void onFailure(String str, Throwable th) throws IOException {
            boolean logAsError = RemoteBlockPushResolver.ERROR_HANDLER.shouldLogError(th);
            if (!logAsError) {
                RemoteBlockPushResolver.logger.debug("Encountered issue when merging {}", str, th);
            } else {
                RemoteBlockPushResolver.logger.error("Encountered issue when merging {}", str, th);
            }
            updateIgnoredBlockBytes();
            if (this.isWriting) {
                synchronized (this.partitionInfo) {
                    Integer shuffleKey = Integer.valueOf(this.partitionInfo.appAttemptShuffleMergeId.shuffleId);
                    AppShuffleMergePartitionsInfo mergeInfo = this.appShuffleInfo.shuffles.get(shuffleKey);
                    // Only a push that is neither too late nor stale gets to reset the map index.
                    boolean rejected = isTooLate(mergeInfo, this.partitionInfo.reduceId)
                        || isStale(mergeInfo, this.partitionInfo.appAttemptShuffleMergeId.shuffleMergeId);
                    if (!rejected) {
                        RemoteBlockPushResolver.logger.debug("{} encountered failure", this.partitionInfo);
                        this.partitionInfo.setCurrentMapIndex(-1);
                    }
                }
            }
            this.isWriting = false;
        }

        /**
         * Returns the partition info held by this callback; exposed for tests.
         */
        @VisibleForTesting
        AppShufflePartitionInfo getPartitionInfo() {
            return partitionInfo;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$PushMergeMetrics.class */
    public static class PushMergeMetrics implements MetricSet {
        static final String BLOCK_APPEND_COLLISIONS_METRIC = "blockAppendCollisions";
        static final String LATE_BLOCK_PUSHES_METRIC = "lateBlockPushes";
        static final String BLOCK_BYTES_WRITTEN_METRIC = "blockBytesWritten";
        static final String DEFERRED_BLOCK_BYTES_METRIC = "deferredBlockBytes";
        static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
        static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";
        static final String IGNORED_BLOCK_BYTES_METRIC = "ignoredBlockBytes";
        private final Map<String, Metric> allMetrics;
        private final Meter blockAppendCollisions;
        private final Meter lateBlockPushes;
        private final Meter blockBytesWritten;
        private final Counter deferredBlockBytes;
        private final Meter deferredBlocks;
        private final Meter staleBlockPushes;
        private final Meter ignoredBlockBytes;

        private PushMergeMetrics() {
            this.allMetrics = new HashMap();
            this.blockAppendCollisions = new Meter();
            this.allMetrics.put(BLOCK_APPEND_COLLISIONS_METRIC, this.blockAppendCollisions);
            this.lateBlockPushes = new Meter();
            this.allMetrics.put(LATE_BLOCK_PUSHES_METRIC, this.lateBlockPushes);
            this.blockBytesWritten = new Meter();
            this.allMetrics.put(BLOCK_BYTES_WRITTEN_METRIC, this.blockBytesWritten);
            this.deferredBlockBytes = new Counter();
            this.allMetrics.put(DEFERRED_BLOCK_BYTES_METRIC, this.deferredBlockBytes);
            this.deferredBlocks = new Meter();
            this.allMetrics.put(DEFERRED_BLOCKS_METRIC, this.deferredBlocks);
            this.staleBlockPushes = new Meter();
            this.allMetrics.put(STALE_BLOCK_PUSHES_METRIC, this.staleBlockPushes);
            this.ignoredBlockBytes = new Meter();
            this.allMetrics.put(IGNORED_BLOCK_BYTES_METRIC, this.ignoredBlockBytes);
        }

        public Map<String, Metric> getMetrics() {
            return this.allMetrics;
        }
    }

    /**
     * Creates the resolver: copies the merge settings from the configuration, builds the
     * weight-bounded index cache, opens the recovery DB and, if a DB was opened, calls
     * {@code reloadAndCleanUpAppShuffleInfo} on it before creating the metrics.
     *
     * @param transportConf configuration the merge settings and DB backend name are read from
     * @param file          recovery file handed to {@code DBProvider.initDB}
     * @throws IOException declared by the constructor; the visible calls do not show which step raises it
     */
    public RemoteBlockPushResolver(TransportConf transportConf, File file) throws IOException {
        this.conf = transportConf;
        this.cleanerShutdownTimeout = transportConf.mergedShuffleCleanerShutdownTimeout();
        this.minChunkSize = transportConf.minChunkSizeInMergedShuffleFile();
        this.ioExceptionsThresholdDuringMerge = transportConf.ioExceptionsThresholdDuringMerge();

        // Loads an index file's information on a cache miss, keyed by the index file path.
        CacheLoader<String, ShuffleIndexInformation> indexLoader =
            new CacheLoader<String, ShuffleIndexInformation>() {
                @Override
                public ShuffleIndexInformation load(String indexFilePath) throws IOException {
                    return new ShuffleIndexInformation(indexFilePath);
                }
            };
        this.indexCache = CacheBuilder.newBuilder()
            .maximumWeight(transportConf.mergedIndexCacheSize())
            .weigher((String indexFilePath, ShuffleIndexInformation indexInfo) ->
                indexInfo.getRetainedMemorySize())
            .build(indexLoader);

        this.recoveryFile = file;
        String backendName = transportConf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
        DBBackend backend = DBBackend.byName(backendName);
        this.db = DBProvider.initDB(backend, this.recoveryFile, CURRENT_VERSION, mapper);
        if (this.db != null) {
            logger.info("Use {} as the implementation of {}", backend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
            reloadAndCleanUpAppShuffleInfo(this.db);
        }
        this.pushMergeMetrics = new PushMergeMetrics();
    }

    /**
     * Builds a block-push error handler whose {@code shouldLogError} returns {@code false}
     * for {@link BlockPushNonFatalFailure} instances and {@code true} for anything else.
     *
     * @return a new handler instance
     */
    @VisibleForTesting
    protected static ErrorHandler.BlockPushErrorHandler createErrorHandler() {
        return new ErrorHandler.BlockPushErrorHandler() {
            @Override
            public boolean shouldLogError(Throwable th) {
                boolean nonFatalPushFailure = th instanceof BlockPushNonFatalFailure;
                return !nonFatalPushFailure;
            }
        };
    }

    /**
     * Looks up the shuffle info registered for an application.
     *
     * @param str the application id
     * @return the registered info, never {@code null}
     * @throws IllegalArgumentException if nothing is registered under {@code str}
     */
    @VisibleForTesting
    protected AppShuffleInfo validateAndGetAppShuffleInfo(String str) {
        AppShuffleInfo registered = this.appsShuffleInfo.get(str);
        if (registered == null) {
            throw new IllegalArgumentException("application " + str + " is not registered or NM was restarted.");
        }
        return registered;
    }

    /**
     * Resolves the partition info a pushed block should be merged into, creating the shuffle
     * merge metadata and the partition entry when they do not exist yet.
     *
     * @param appShuffleInfo shuffle info of the pushing application
     * @param i              shuffle id
     * @param i2             shuffle merge id of the push
     * @param i3             reduce id
     * @param str            block id placed in the failure response and message
     * @return the existing or newly created partition info
     * @throws BlockPushNonFatalFailure with STALE_BLOCK_PUSH when a higher shuffle merge id is
     *         already recorded, or TOO_LATE_BLOCK_PUSH when the recorded merge with the same id
     *         is finalized
     * @throws RuntimeException if creating the partition info raises an {@link IOException}
     */
    @VisibleForTesting
    AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(AppShuffleInfo appShuffleInfo, int i, int i2, int i3, String str) throws BlockPushNonFatalFailure {
        AppShuffleMergePartitionsInfo mergeInfo = appShuffleInfo.shuffles.compute(Integer.valueOf(i), (shuffleKey, existing) -> {
            if (existing == null) {
                logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a new shuffle merge metadata", new Object[]{appShuffleInfo.appId, Integer.valueOf(appShuffleInfo.attemptId), Integer.valueOf(i), Integer.valueOf(i2)});
                return new AppShuffleMergePartitionsInfo(i2, false);
            }
            int latestMergeId = existing.shuffleMergeId;
            if (latestMergeId > i2) {
                // A newer merge id is already recorded, so this push is rejected as stale.
                ByteBuffer staleResponse = new BlockPushReturnCode(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(), str).toByteBuffer();
                throw new BlockPushNonFatalFailure(staleResponse, BlockPushNonFatalFailure.getErrorMsg(str, BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH));
            }
            if (latestMergeId == i2) {
                if (!existing.isFinalized()) {
                    return existing;
                }
                ByteBuffer lateResponse = new BlockPushReturnCode(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(), str).toByteBuffer();
                throw new BlockPushNonFatalFailure(lateResponse, BlockPushNonFatalFailure.getErrorMsg(str, BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH));
            }
            // The push carries a higher merge id: schedule cleanup of the recorded one and start fresh.
            AppAttemptShuffleMergeId outdatedId = new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, i, latestMergeId);
            logger.info("{}: creating a new shuffle merge metadata since received shuffleMergeId {} is higher than latest shuffleMergeId {}", new Object[]{outdatedId, Integer.valueOf(i2), Integer.valueOf(latestMergeId)});
            submitCleanupTask(() -> closeAndDeleteOutdatedPartitions(outdatedId, existing.shuffleMergePartitions));
            return new AppShuffleMergePartitionsInfo(i2, false);
        });
        return mergeInfo.shuffleMergePartitions.computeIfAbsent(Integer.valueOf(i3), reduceKey -> {
            File dataFile = appShuffleInfo.getMergedShuffleDataFile(i, i2, i3);
            File indexFile = new File(appShuffleInfo.getMergedShuffleIndexFilePath(i, i2, i3));
            File metaFile = appShuffleInfo.getMergedShuffleMetaFile(i, i2, i3);
            try {
                return newAppShufflePartitionInfo(appShuffleInfo, i, i2, i3, dataFile, indexFile, metaFile);
            } catch (IOException e) {
                logger.error("{} attempt {} shuffle {} shuffleMerge {}: cannot create merged shuffle partition with data file {}, index file {}, and meta file {}", new Object[]{appShuffleInfo.appId, Integer.valueOf(appShuffleInfo.attemptId), Integer.valueOf(i), Integer.valueOf(i2), dataFile.getAbsolutePath(), indexFile.getAbsolutePath(), metaFile.getAbsolutePath()});
                String reason = String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s shuffleMergeId %s reduceId %s", appShuffleInfo.appId, Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3));
                throw new RuntimeException(reason, e);
            }
        });
    }

    /**
     * Creates the partition info for one reduce partition of a shuffle merge.
     *
     * @param appShuffleInfo supplies the application id and attempt id
     * @param i              shuffle id
     * @param i2             shuffle merge id
     * @param i3             reduce id
     * @param file           merged data file
     * @param file2          index file, wrapped in a {@code MergeShuffleFile}
     * @param file3          meta file, wrapped in a {@code MergeShuffleFile}
     * @return the new partition info
     * @throws IOException propagated from constructing the wrapped files or the partition info
     */
    @VisibleForTesting
    AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleInfo appShuffleInfo, int i, int i2, int i3, File file, File file2, File file3) throws IOException {
        AppAttemptShuffleMergeId mergeId = new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, i, i2);
        MergeShuffleFile indexFile = new MergeShuffleFile(file2);
        MergeShuffleFile metaFile = new MergeShuffleFile(file3);
        return new AppShufflePartitionInfo(mergeId, i3, file, indexFile, metaFile);
    }

    /**
     * Returns the chunk count and meta-file buffer of a merged shuffle partition.
     *
     * @param str application id
     * @param i   shuffle id
     * @param i2  shuffle merge id
     * @param i3  reduce id
     * @return the merged block meta built from the index file length and the meta file
     * @throws RuntimeException if a higher shuffle merge id is recorded for the shuffle, or the
     *         index or meta file does not exist
     */
    @Override
    public MergedBlockMeta getMergedBlockMeta(String str, int i, int i2, int i3) {
        AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(str);
        AppShuffleMergePartitionsInfo recorded = appShuffleInfo.shuffles.get(Integer.valueOf(i));
        boolean staleFetch = recorded != null && recorded.shuffleMergeId > i2;
        if (staleFetch) {
            throw new RuntimeException(String.format("MergedBlockMeta fetch for shuffle %s with shuffleMergeId %s reduceId %s is %s", Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), ErrorHandler.BlockFetchErrorHandler.STALE_SHUFFLE_BLOCK_FETCH));
        }
        File indexFile = new File(appShuffleInfo.getMergedShuffleIndexFilePath(i, i2, i3));
        if (!indexFile.exists()) {
            throw new RuntimeException(String.format("Merged shuffle index file %s not found", indexFile.getPath()));
        }
        // NOTE(review): presumably 8-byte offsets with one leading entry; confirm against the index writer.
        int indexSize = (int) indexFile.length();
        int numChunks = (indexSize / 8) - 1;
        File metaFile = appShuffleInfo.getMergedShuffleMetaFile(i, i2, i3);
        if (!metaFile.exists()) {
            throw new RuntimeException(String.format("Merged shuffle meta file %s not found", metaFile.getPath()));
        }
        FileSegmentManagedBuffer metaBuffer = new FileSegmentManagedBuffer(this.conf, metaFile, 0L, metaFile.length());
        logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} num chunks {}", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(numChunks)});
        return new MergedBlockMeta(numChunks, metaBuffer);
    }

    /**
     * Returns the buffer for one chunk of a merged shuffle partition's data file.
     *
     * @param str application id
     * @param i   shuffle id
     * @param i2  shuffle merge id
     * @param i3  reduce id
     * @param i4  index of the record looked up in the cached index information
     * @return a file-segment buffer over the data file at the record's offset and length
     * @throws RuntimeException if a higher shuffle merge id is recorded for the shuffle, the
     *         data file does not exist, or the index cache fails to load the index file
     */
    @Override
    public ManagedBuffer getMergedBlockData(String str, int i, int i2, int i3, int i4) {
        AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(str);
        AppShuffleMergePartitionsInfo recorded = appShuffleInfo.shuffles.get(Integer.valueOf(i));
        boolean staleFetch = recorded != null && recorded.shuffleMergeId > i2;
        if (staleFetch) {
            throw new RuntimeException(String.format("MergedBlockData fetch for shuffle %s with shuffleMergeId %s reduceId %s is %s", Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), ErrorHandler.BlockFetchErrorHandler.STALE_SHUFFLE_BLOCK_FETCH));
        }
        File dataFile = appShuffleInfo.getMergedShuffleDataFile(i, i2, i3);
        if (!dataFile.exists()) {
            throw new RuntimeException(String.format("Merged shuffle data file %s not found", dataFile.getPath()));
        }
        String indexFilePath = appShuffleInfo.getMergedShuffleIndexFilePath(i, i2, i3);
        try {
            ShuffleIndexInformation indexInfo = this.indexCache.get(indexFilePath);
            ShuffleIndexRecord record = indexInfo.getIndex(i4);
            return new FileSegmentManagedBuffer(this.conf, dataFile, record.getOffset(), record.getLength());
        } catch (ExecutionException e) {
            throw new RuntimeException(String.format("Failed to open merged shuffle index file %s", indexFilePath), e);
        }
    }

    /**
     * Returns the active local directories recorded for an application.
     *
     * @param str application id
     * @return the {@code activeLocalDirs} array of the application's path info
     * @throws IllegalArgumentException if the application is not registered
     */
    @Override
    public String[] getMergedBlockDirs(String str) {
        AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(str);
        return appShuffleInfo.appPathsInfo.activeLocalDirs;
    }

    /**
     * Removes the DB path info of every attempt id below the application's current attempt id.
     * Does nothing when the attempt id is -1.
     *
     * @param appShuffleInfo supplies the application id and its current attempt id
     */
    private void removeOldApplicationAttemptsFromDb(AppShuffleInfo appShuffleInfo) {
        if (appShuffleInfo.attemptId == -1) {
            return;
        }
        int olderAttempt = 0;
        while (olderAttempt < appShuffleInfo.attemptId) {
            removeAppAttemptPathInfoFromDB(appShuffleInfo.appId, olderAttempt);
            olderAttempt++;
        }
    }

    /**
     * Unregisters an application: removes its entry from the registry together with its
     * attempt path info in the DB, then submits a cleanup task for its partitions.
     *
     * @param str application id
     * @param z   forwarded to {@code closeAndDeletePartitionsIfNeeded} as its local-dirs cleanup flag
     */
    @Override
    public void applicationRemoved(String str, boolean z) {
        logger.info("Application {} removed, cleanupLocalDirs = {}", str, Boolean.valueOf(z));
        // Carries the removed entry out of the compute() lambda.
        AtomicReference<AppShuffleInfo> removedInfo = new AtomicReference<>();
        this.appsShuffleInfo.compute(str, (appId, registered) -> {
            if (registered != null) {
                removeAppAttemptPathInfoFromDB(registered.appId, registered.attemptId);
                removeOldApplicationAttemptsFromDb(registered);
                removedInfo.set(registered);
            }
            // Returning null drops the mapping in either case.
            return null;
        });
        AppShuffleInfo toCleanUp = removedInfo.get();
        if (toCleanUp == null) {
            return;
        }
        submitCleanupTask(() -> closeAndDeletePartitionsIfNeeded(toCleanUp, z));
    }

    /**
     * Removes merged shuffle data for a shuffle and records the removed shuffle merge id as
     * finalized. A requested shuffle merge id of -1 targets whatever merge id is currently
     * recorded for the shuffle.
     *
     * @param removeShuffleMerge the removal request
     * @throws IllegalArgumentException if the application is not registered or the request's
     *         attempt id differs from the registered one
     * @throws RuntimeException if the request names a merge id lower than the recorded one
     */
    @Override
    public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {
        AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(removeShuffleMerge.appId);
        if (appShuffleInfo.attemptId != removeShuffleMerge.appAttemptId) {
            throw new IllegalArgumentException(String.format("The attempt id %s in this RemoveShuffleMerge message does not match with the current attempt id %s stored in shuffle service for application %s", Integer.valueOf(removeShuffleMerge.appAttemptId), Integer.valueOf(appShuffleInfo.attemptId), removeShuffleMerge.appId));
        }
        appShuffleInfo.shuffles.compute(Integer.valueOf(removeShuffleMerge.shuffleId), (shuffleKey, current) -> {
            boolean removeWhateverIsCurrent = removeShuffleMerge.shuffleMergeId == -1;
            if (current == null) {
                if (removeWhateverIsCurrent) {
                    return null;
                }
                // Nothing recorded yet: persist the requested merge id and mark it finalized.
                writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(removeShuffleMerge.appId, removeShuffleMerge.appAttemptId, removeShuffleMerge.shuffleId, removeShuffleMerge.shuffleMergeId));
                return new AppShuffleMergePartitionsInfo(removeShuffleMerge.shuffleMergeId, true);
            }
            boolean targetsCurrent = removeWhateverIsCurrent || removeShuffleMerge.shuffleMergeId == current.shuffleMergeId;
            int mergeIdToDelete = removeWhateverIsCurrent ? current.shuffleMergeId : removeShuffleMerge.shuffleMergeId;
            if (!(targetsCurrent || mergeIdToDelete > current.shuffleMergeId)) {
                throw new RuntimeException(String.format("Asked to remove old shuffle merged data for application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ", removeShuffleMerge.appId, Integer.valueOf(removeShuffleMerge.shuffleId), Integer.valueOf(mergeIdToDelete), Integer.valueOf(current.shuffleMergeId)));
            }
            AppAttemptShuffleMergeId currentId = new AppAttemptShuffleMergeId(removeShuffleMerge.appId, removeShuffleMerge.appAttemptId, removeShuffleMerge.shuffleId, current.shuffleMergeId);
            if (!current.isFinalized()) {
                submitCleanupTask(() -> closeAndDeleteOutdatedPartitions(currentId, current.shuffleMergePartitions));
            } else {
                submitCleanupTask(() -> deleteMergedFiles(currentId, appShuffleInfo, current.getReduceIds(), false));
            }
            writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(removeShuffleMerge.appId, removeShuffleMerge.appAttemptId, removeShuffleMerge.shuffleId, mergeIdToDelete));
            return new AppShuffleMergePartitionsInfo(mergeIdToDelete, true);
        });
    }

    /**
     * Closes the files of every partition of every shuffle of an application, optionally
     * deletes the application's executor directories, and removes its shuffle info from the DB.
     *
     * @param appShuffleInfo the application whose partitions are closed
     * @param z              when {@code true}, {@code deleteExecutorDirs} is called as well
     */
    @VisibleForTesting
    void closeAndDeletePartitionsIfNeeded(AppShuffleInfo appShuffleInfo, boolean z) {
        // The inner lambda's parameters need names distinct from the outer lambda's: a lambda
        // parameter may not redeclare a variable of an enclosing lambda.
        appShuffleInfo.shuffles.forEach((shuffleId, mergePartitionsInfo) -> {
            mergePartitionsInfo.shuffleMergePartitions.forEach((reduceId, partitionInfo) -> {
                synchronized (partitionInfo) {
                    partitionInfo.closeAllFilesAndDeleteIfNeeded(false);
                }
            });
        });
        if (z) {
            deleteExecutorDirs(appShuffleInfo);
        }
        removeAppShuffleInfoFromDB(appShuffleInfo);
    }

    /**
     * Deletes the path-info entry of one application attempt from the DB. A missing DB is a
     * no-op, and a failed delete is logged rather than propagated.
     *
     * @param str application id
     * @param i   attempt id
     */
    @VisibleForTesting
    void removeAppAttemptPathInfoFromDB(String str, int i) {
        AppAttemptId attempt = new AppAttemptId(str, i);
        if (this.db == null) {
            return;
        }
        try {
            this.db.delete(getDbAppAttemptPathsKey(attempt));
        } catch (Exception e) {
            logger.error("Failed to remove the application attempt {} local path in DB", attempt, e);
        }
    }

    /**
     * Removes the DB entry of every shuffle merge recorded for an application. Does nothing
     * when no DB is open.
     *
     * @param appShuffleInfo the application whose shuffle merge entries are removed
     */
    @VisibleForTesting
    void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) {
        if (this.db == null) {
            return;
        }
        appShuffleInfo.shuffles.forEach((shuffleId, mergePartitionsInfo) -> {
            AppAttemptShuffleMergeId mergeId = new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId.intValue(), mergePartitionsInfo.shuffleMergeId);
            removeAppShufflePartitionInfoFromDB(mergeId);
        });
    }

    /**
     * Removes an outdated shuffle merge from the DB, then closes each of its partitions with
     * the delete flag set.
     *
     * @param appAttemptShuffleMergeId identifies the shuffle merge removed from the DB
     * @param map                      partitions of that merge, keyed by an Integer
     */
    @VisibleForTesting
    void closeAndDeleteOutdatedPartitions(AppAttemptShuffleMergeId appAttemptShuffleMergeId, Map<Integer, AppShufflePartitionInfo> map) {
        removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
        map.forEach((reduceId, partitionInfo) -> {
            synchronized (partitionInfo) {
                partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
            }
        });
    }

    /**
     * Deletes the data, index and meta files of the given reduce partitions of a shuffle merge
     * and logs how many files of each kind were actually deleted.
     *
     * @param appAttemptShuffleMergeId supplies the shuffle id and shuffle merge id of the files
     * @param appShuffleInfo           resolves the file locations
     * @param iArr                     reduce ids whose files are deleted
     * @param z                        when {@code true}, the merge's DB entry is removed first
     */
    void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, AppShuffleInfo appShuffleInfo, int[] iArr, boolean z) {
        if (z) {
            removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
        }
        int shuffleId = appAttemptShuffleMergeId.shuffleId;
        int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
        int deletedData = 0;
        int deletedIndex = 0;
        int deletedMeta = 0;
        for (int reduceId : iArr) {
            File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
            if (dataFile.delete()) {
                deletedData += 1;
            }
            File indexFile = new File(appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId));
            if (indexFile.delete()) {
                deletedIndex += 1;
            }
            File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
            if (metaFile.delete()) {
                deletedMeta += 1;
            }
        }
        logger.info("Delete {} data files, {} index files, {} meta files for {}", new Object[]{Integer.valueOf(deletedData), Integer.valueOf(deletedIndex), Integer.valueOf(deletedMeta), appAttemptShuffleMergeId});
    }

    /**
     * Deletes the DB entry of one shuffle merge. A missing DB is a no-op, and a failed delete
     * is logged rather than propagated.
     *
     * @param appAttemptShuffleMergeId identifies the entry to delete
     */
    void removeAppShufflePartitionInfoFromDB(AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
        if (this.db == null) {
            return;
        }
        try {
            this.db.delete(getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
        } catch (Exception e) {
            logger.error("Error deleting {} from application shuffle merged partition info in DB", appAttemptShuffleMergeId, e);
        }
    }

    /**
     * Recursively deletes each active local directory of an application that exists. A failure
     * on one directory is logged and the remaining directories are still processed.
     *
     * @param appShuffleInfo supplies the active local directories
     */
    @VisibleForTesting
    void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
        // All directory names are converted to paths before any deletion starts.
        Path[] localDirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs)
            .map(dir -> Paths.get(dir))
            .toArray(Path[]::new);
        for (Path localDir : localDirs) {
            try {
                if (!Files.exists(localDir)) {
                    continue;
                }
                JavaUtils.deleteRecursively(localDir.toFile());
                logger.debug("Successfully cleaned up directory: {}", localDir);
            } catch (Exception e) {
                logger.error("Failed to delete directory: {}", localDir, e);
            }
        }
    }

    /**
     * @return the push-merge metric set created in the constructor
     */
    @Override
    public MetricSet getMetrics() {
        return pushMergeMetrics;
    }

    /**
     * Returns the stream callback that will receive one pushed shuffle block.
     * <p>
     * A block whose map index is not yet in the partition's map tracker gets a
     * {@code PushBlockStreamCallback}. Otherwise the returned callback counts the received
     * bytes as ignored and, if resolving the partition raised a non-fatal failure, rethrows
     * that failure from {@code onComplete}.
     *
     * @param pushBlockStream the push request
     * @return the callback for the block's data stream
     * @throws IllegalArgumentException if the application is not registered
     * @throws BlockPushNonFatalFailure with TOO_OLD_ATTEMPT_PUSH if the request's attempt id
     *         differs from the registered one
     */
    @Override
    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream pushBlockStream) {
        AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(pushBlockStream.appId);
        final String blockId = "shufflePush_" + pushBlockStream.shuffleId + "_" + pushBlockStream.shuffleMergeId + "_" + pushBlockStream.mapIndex + "_" + pushBlockStream.reduceId;
        if (appShuffleInfo.attemptId != pushBlockStream.appAttemptId) {
            ByteBuffer oldAttemptResponse = new BlockPushReturnCode(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(), blockId).toByteBuffer();
            throw new BlockPushNonFatalFailure(oldAttemptResponse, BlockPushNonFatalFailure.getErrorMsg(blockId, BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH));
        }
        AppShufflePartitionInfo resolved;
        BlockPushNonFatalFailure rejection = null;
        try {
            resolved = getOrCreateAppShufflePartitionInfo(appShuffleInfo, pushBlockStream.shuffleId, pushBlockStream.shuffleMergeId, pushBlockStream.reduceId, blockId);
        } catch (BlockPushNonFatalFailure e) {
            resolved = null;
            rejection = e;
        }
        // Accept the push only if the partition resolved and this map index is not tracked yet.
        if (rejection == null && !resolved.mapTracker.contains(pushBlockStream.mapIndex)) {
            return new PushBlockStreamCallback(appShuffleInfo, blockId, resolved, pushBlockStream.mapIndex);
        }
        // NOTE(review): this marks lateBlockPushes for every push that lands here, including
        // already-merged map indexes and STALE_BLOCK_PUSH rejections; confirm that is intended.
        this.pushMergeMetrics.lateBlockPushes.mark();
        final BlockPushNonFatalFailure pendingFailure = rejection;
        return new StreamCallbackWithID() {
            @Override
            public String getID() {
                return blockId;
            }

            @Override
            public void onData(String streamId, ByteBuffer byteBuffer) {
                // The data is not written anywhere; only its size is recorded.
                RemoteBlockPushResolver.this.pushMergeMetrics.ignoredBlockBytes.mark(byteBuffer.remaining());
            }

            @Override
            public void onComplete(String streamId) {
                if (pendingFailure == null) {
                    return;
                }
                throw pendingFailure;
            }

            @Override
            public void onFailure(String streamId, Throwable th) {
            }

            @Override
            public ByteBuffer getCompletionResponse() {
                return RemoteBlockPushResolver.SUCCESS_RESPONSE.duplicate();
            }
        };
    }

    /**
     * Finalizes a shuffle merge: marks the shuffle merge id as finalized, finalizes every
     * partition recorded for it, and reports the partitions that merged at least one map
     * output.
     * <p>
     * If a lower shuffle merge id is currently recorded, its partitions are scheduled for
     * cleanup and an empty status is returned. A partition whose finalization raises an
     * {@link IOException} is logged and left out of the result.
     *
     * @param finalizeShuffleMerge the finalize request
     * @return the merge statuses (map trackers, reduce ids and last chunk offsets) of the
     *         finalized partitions; all arrays are empty when there was nothing to finalize
     * @throws IllegalArgumentException if the application is not registered or the request's
     *         attempt id differs from the registered one
     * @throws RuntimeException if the request is for a lower shuffle merge id than recorded,
     *         or the recorded merge is already finalized
     */
    @Override
    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge finalizeShuffleMerge) {
        MergeStatuses mergeStatuses;
        logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalize shuffle merge", new Object[]{finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.appAttemptId), Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(finalizeShuffleMerge.shuffleMergeId)});
        AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(finalizeShuffleMerge.appId);
        if (appShuffleInfo.attemptId != finalizeShuffleMerge.appAttemptId) {
            throw new IllegalArgumentException(String.format("The attempt id %s in this FinalizeShuffleMerge message does not match with the current attempt id %s stored in shuffle service for application %s", Integer.valueOf(finalizeShuffleMerge.appAttemptId), Integer.valueOf(appShuffleInfo.attemptId), finalizeShuffleMerge.appId));
        }
        AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(finalizeShuffleMerge.appId, finalizeShuffleMerge.appAttemptId, finalizeShuffleMerge.shuffleId, finalizeShuffleMerge.shuffleMergeId);
        // Carries the partitions to finalize out of the compute() lambda.
        AtomicReference<Map<Integer, AppShufflePartitionInfo>> partitionsRef = new AtomicReference<>();
        appShuffleInfo.shuffles.compute(Integer.valueOf(finalizeShuffleMerge.shuffleId), (shuffleKey, current) -> {
            if (null != current) {
                if (finalizeShuffleMerge.shuffleMergeId < current.shuffleMergeId || current.isFinalized()) {
                    throw new RuntimeException(String.format("Shuffle merge finalize request for shuffle %s with shuffleMergeId %s is %s", Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(finalizeShuffleMerge.shuffleMergeId), ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
                }
                if (finalizeShuffleMerge.shuffleMergeId > current.shuffleMergeId) {
                    // The recorded merge id is older than the one being finalized: clean it up.
                    AppAttemptShuffleMergeId outdatedId = new AppAttemptShuffleMergeId(finalizeShuffleMerge.appId, finalizeShuffleMerge.appAttemptId, finalizeShuffleMerge.shuffleId, current.shuffleMergeId);
                    submitCleanupTask(() -> {
                        closeAndDeleteOutdatedPartitions(outdatedId, current.shuffleMergePartitions);
                    });
                } else {
                    partitionsRef.set(current.shuffleMergePartitions);
                }
            }
            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
            return new AppShuffleMergePartitionsInfo(finalizeShuffleMerge.shuffleMergeId, true);
        });
        Map<Integer, AppShufflePartitionInfo> partitions = partitionsRef.get();
        if (null == partitions || partitions.isEmpty()) {
            mergeStatuses = new MergeStatuses(finalizeShuffleMerge.shuffleId, finalizeShuffleMerge.shuffleMergeId, new RoaringBitmap[0], new int[0], new long[0]);
        } else {
            List<RoaringBitmap> bitmaps = new ArrayList<>(partitions.size());
            List<Integer> reduceIds = new ArrayList<>(partitions.size());
            List<Long> sizes = new ArrayList<>(partitions.size());
            for (AppShufflePartitionInfo partition : partitions.values()) {
                synchronized (partition) {
                    try {
                        logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalizing shuffle partition {} ", new Object[]{finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.appAttemptId), Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(finalizeShuffleMerge.shuffleMergeId), Integer.valueOf(partition.reduceId)});
                        partition.finalizePartition();
                        // Only partitions that merged at least one map output are reported.
                        if (partition.mapTracker.getCardinality() > 0) {
                            bitmaps.add(partition.mapTracker);
                            reduceIds.add(Integer.valueOf(partition.reduceId));
                            sizes.add(Long.valueOf(partition.getLastChunkOffset()));
                            logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalization results added for partition {} data size {} index size {} meta size {}", new Object[]{finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.appAttemptId), Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(finalizeShuffleMerge.shuffleMergeId), Integer.valueOf(partition.reduceId), Long.valueOf(partition.getLastChunkOffset()), Long.valueOf(partition.indexFile.getPos()), Long.valueOf(partition.metaFile.getPos())});
                        }
                    } catch (IOException e) {
                        logger.warn("{} attempt {} shuffle {} shuffleMerge {}: exception while finalizing shuffle partition {}. Exception message: {}", new Object[]{finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.appAttemptId), Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(finalizeShuffleMerge.shuffleMergeId), Integer.valueOf(partition.reduceId), e.getMessage()});
                    } finally {
                        // Close in finally so the partition's files are released on every exit
                        // path, including an unchecked exception thrown while finalizing.
                        partition.closeAllFilesAndDeleteIfNeeded(false);
                    }
                }
            }
            mergeStatuses = new MergeStatuses(finalizeShuffleMerge.shuffleId, finalizeShuffleMerge.shuffleMergeId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes));
            appShuffleInfo.shuffles.get(Integer.valueOf(finalizeShuffleMerge.shuffleId)).setReduceIds(Ints.toArray(reduceIds));
        }
        logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed", new Object[]{finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.appAttemptId), Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(finalizeShuffleMerge.shuffleMergeId)});
        return mergeStatuses;
    }

    /**
     * Registers the merge directory information carried by an executor registration message.
     *
     * <p>The text following the first {@code SHUFFLE_META_DELIMITER} in
     * {@code executorShuffleInfo.shuffleManager} is parsed as a JSON map of strings. The map must
     * contain {@code MERGE_DIR_KEY}; {@code ATTEMPT_ID_KEY} is optional and defaults to -1.
     *
     * <ul>
     *   <li>With attempt id -1, the entry for the application is created only if none exists.</li>
     *   <li>Otherwise the entry is created, or replaced when the incoming attempt id is greater
     *       than the stored one. On replacement the previous attempt's path info is removed from
     *       the DB and a cleanup task for the previous attempt is submitted.</li>
     * </ul>
     *
     * @param str application id
     * @param executorShuffleInfo registration details of the executor
     * @throws IllegalArgumentException if the metadata has no merge directory entry
     */
    @Override
    public void registerExecutor(String str, ExecutorShuffleInfo executorShuffleInfo) {
        if (logger.isDebugEnabled()) {
            logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} num sub-dirs {} shuffleManager {}", str, Arrays.toString(executorShuffleInfo.localDirs), executorShuffleInfo.subDirsPerLocalDir, executorShuffleInfo.shuffleManager);
        }
        final String shuffleManagerMeta = executorShuffleInfo.shuffleManager;
        final int delimiterIndex = shuffleManagerMeta.indexOf(SHUFFLE_META_DELIMITER);
        if (delimiterIndex < 0) {
            logger.warn("ExecutorShuffleInfo does not have the expected merge directory information");
            return;
        }
        final String mergeDirJson = shuffleManagerMeta.substring(delimiterIndex + 1);
        try {
            Map<String, String> meta = new ObjectMapper().readValue(mergeDirJson, new TypeReference<Map<String, String>>() {
            });
            final String mergeDir = meta.get(MERGE_DIR_KEY);
            // The attempt id is parsed before the merge dir is validated, so a malformed attempt id
            // is reported ahead of a missing merge directory.
            final int attemptId = Integer.parseInt(meta.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(-1)));
            if (mergeDir == null) {
                throw new IllegalArgumentException(String.format("Failed to get the merge directory information from the shuffleManagerMeta %s in executor registration message", shuffleManagerMeta));
            }
            if (attemptId == -1) {
                // No attempt id in the message: only the first registration for the app is recorded.
                this.appsShuffleInfo.computeIfAbsent(str, ignoredAppId -> {
                    AppPathsInfo pathsInfo = new AppPathsInfo(str, executorShuffleInfo.localDirs, mergeDir, executorShuffleInfo.subDirsPerLocalDir);
                    writeAppPathsInfoToDb(str, -1, pathsInfo);
                    return new AppShuffleInfo(str, -1, pathsInfo);
                });
                return;
            }
            // Holds the entry that was replaced, if any, so it can be cleaned up outside compute().
            AtomicReference<AppShuffleInfo> replacedInfo = new AtomicReference<>();
            this.appsShuffleInfo.compute(str, (ignoredAppId, current) -> {
                if (current != null && attemptId <= current.attemptId) {
                    return current;
                }
                replacedInfo.set(current);
                AppPathsInfo pathsInfo = new AppPathsInfo(str, executorShuffleInfo.localDirs, mergeDir, executorShuffleInfo.subDirsPerLocalDir);
                if (current != null) {
                    removeAppAttemptPathInfoFromDB(str, current.attemptId);
                }
                writeAppPathsInfoToDb(str, attemptId, pathsInfo);
                return new AppShuffleInfo(str, attemptId, new AppPathsInfo(str, executorShuffleInfo.localDirs, mergeDir, executorShuffleInfo.subDirsPerLocalDir));
            });
            AppShuffleInfo previous = replacedInfo.get();
            if (previous != null) {
                logger.warn("Cleanup shuffle info and merged shuffle files for {}_{} as new application attempt registered", str, Integer.valueOf(previous.attemptId));
                submitCleanupTask(() -> closeAndDeletePartitionsIfNeeded(previous, true));
            }
        } catch (JsonProcessingException e) {
            logger.warn("Failed to get the merge directory information from ExecutorShuffleInfo: ", e);
        }
    }

    /**
     * Shuts down the merged shuffle cleaner (if it is not already shut down) and closes the DB
     * (if one is configured).
     *
     * <p>The cleaner is first asked to shut down gracefully and given
     * {@code cleanerShutdownTimeout} seconds to finish; if it does not terminate in time, or the
     * wait is interrupted, {@link #shutdownMergedShuffleCleanerNow()} is invoked. An interrupt is
     * re-asserted on the current thread. Failures while closing the DB are logged, not thrown.
     */
    @Override
    public void close() {
        if (!this.mergedShuffleCleaner.isShutdown()) {
            try {
                this.mergedShuffleCleaner.shutdown();
                boolean terminated = this.mergedShuffleCleaner.awaitTermination(this.cleanerShutdownTimeout, TimeUnit.SECONDS);
                if (!terminated) {
                    shutdownMergedShuffleCleanerNow();
                }
            } catch (InterruptedException interrupted) {
                logger.info("mergedShuffleCleaner is interrupted in the process of graceful shutdown", interrupted);
                shutdownMergedShuffleCleanerNow();
                // Restore the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        }
        if (this.db == null) {
            return;
        }
        try {
            this.db.close();
        } catch (IOException closeFailure) {
            logger.error("Exception closing leveldb with registered app paths info and shuffle partition info", closeFailure);
        }
    }

    /**
     * Forces the merged shuffle cleaner to stop via {@code shutdownNow()}, logs how many tasks it
     * returned, and then waits up to {@code cleanerShutdownTimeout} seconds for termination.
     * If the wait is interrupted, the interrupt status is restored and the method returns.
     */
    private void shutdownMergedShuffleCleanerNow() {
        try {
            int pendingTasks = this.mergedShuffleCleaner.shutdownNow().size();
            logger.warn("There are still {} tasks not completed in mergedShuffleCleaner after {} seconds.", Integer.valueOf(pendingTasks), Long.valueOf(this.cleanerShutdownTimeout));
            boolean terminated = this.mergedShuffleCleaner.awaitTermination(this.cleanerShutdownTimeout, TimeUnit.SECONDS);
            if (!terminated) {
                logger.warn("mergedShuffleCleaner did not terminate in {} seconds.", Long.valueOf(this.cleanerShutdownTimeout));
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores the serialized path information of an application attempt in the DB.
     * Does nothing when no DB is configured. Any failure is logged and not propagated.
     *
     * @param str application id
     * @param i application attempt id
     * @param appPathsInfo path information to serialize with {@code mapper} and store
     */
    private void writeAppPathsInfoToDb(String str, int i, AppPathsInfo appPathsInfo) {
        if (this.db == null) {
            return;
        }
        AppAttemptId attempt = new AppAttemptId(str, i);
        try {
            byte[] key = getDbAppAttemptPathsKey(attempt);
            byte[] value = mapper.writeValueAsString(appPathsInfo).getBytes(StandardCharsets.UTF_8);
            this.db.put(key, value);
        } catch (Exception e) {
            logger.error("Error saving registered app paths info for {}", attempt, e);
        }
    }

    /**
     * Records the given shuffle merge id in the DB as a key with an empty value.
     * Does nothing when no DB is configured. Any failure is logged and not propagated.
     *
     * @param appAttemptShuffleMergeId identifier to persist
     */
    private void writeAppAttemptShuffleMergeInfoToDB(AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
        if (this.db == null) {
            return;
        }
        try {
            byte[] key = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
            // Only the key carries information; the value is intentionally empty.
            this.db.put(key, new byte[0]);
        } catch (Exception e) {
            logger.error("Error saving active app shuffle partition {}", appAttemptShuffleMergeId, e);
        }
    }

    /**
     * Decodes the payload of a DB key into an instance of {@code cls}.
     *
     * <p>The payload is everything after the prefix and the single character that follows it;
     * it is deserialized with {@code mapper.readValue}.
     *
     * @param str full DB key
     * @param str2 key prefix to strip
     * @param cls target type
     * @return the deserialized payload
     * @throws IOException if the payload cannot be deserialized
     */
    private <T> T parseDbKey(String str, String str2, Class<T> cls) throws IOException {
        // Skip the prefix plus the one-character separator that follows it.
        final int payloadStart = str2.length() + 1;
        final String payload = str.substring(payloadStart);
        return mapper.readValue(payload, cls);
    }

    /**
     * Parses a DB key that starts with {@code APP_ATTEMPT_PATH_KEY_PREFIX} into an
     * {@code AppAttemptId}.
     *
     * @param str full DB key
     * @throws IOException if the key payload cannot be deserialized
     */
    private AppAttemptId parseDbAppAttemptPathsKey(String str) throws IOException {
        final String prefix = APP_ATTEMPT_PATH_KEY_PREFIX;
        return parseDbKey(str, prefix, AppAttemptId.class);
    }

    /**
     * Parses a DB key that starts with {@code APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX}
     * into an {@code AppAttemptShuffleMergeId}.
     *
     * @param str full DB key
     * @throws IOException if the key payload cannot be deserialized
     */
    private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(String str) throws IOException {
        final String prefix = APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX;
        return parseDbKey(str, prefix, AppAttemptShuffleMergeId.class);
    }

    /**
     * Builds a DB key of the form {@code prefix + DB_KEY_DELIMITER + serialized(obj)}, encoded
     * as UTF-8 bytes. The object is serialized with {@code mapper.writeValueAsString}.
     *
     * @param obj object identifying the entry
     * @param str key prefix
     * @return the encoded key
     * @throws IOException if the object cannot be serialized
     */
    private byte[] getDbKey(Object obj, String str) throws IOException {
        final String serialized = mapper.writeValueAsString(obj);
        final String key = str + DB_KEY_DELIMITER + serialized;
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Builds the DB key for a shuffle merge id, using
     * {@code APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX} as the prefix.
     *
     * @param appAttemptShuffleMergeId identifier to encode
     * @throws IOException if the identifier cannot be serialized
     */
    private byte[] getDbAppAttemptShufflePartitionKey(AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
        final String prefix = APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX;
        return getDbKey(appAttemptShuffleMergeId, prefix);
    }

    /**
     * Builds the DB key for an application attempt's path info, using
     * {@code APP_ATTEMPT_PATH_KEY_PREFIX} as the prefix.
     *
     * @param appAttemptId identifier to encode
     * @throws IOException if the identifier cannot be serialized
     */
    private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException {
        final String prefix = APP_ATTEMPT_PATH_KEY_PREFIX;
        return getDbKey(appAttemptId, prefix);
    }

    /**
     * Reloads application path info and finalized shuffle merge info from the given DB, then
     * deletes the keys both reload steps reported as outdated.
     *
     * <p>Path info is reloaded first; the finalized-shuffle reload looks up entries in
     * {@code appsShuffleInfo}, which the first step populates.
     *
     * @param db DB to read from
     * @throws IOException if reading or parsing a DB entry fails
     */
    @VisibleForTesting
    void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
        logger.info("Reload applications merged shuffle information from DB");
        List<byte[]> outdatedKeys = new ArrayList<>();
        List<byte[]> outdatedPathKeys = reloadActiveAppAttemptsPathInfo(db);
        outdatedKeys.addAll(outdatedPathKeys);
        List<byte[]> outdatedMergeKeys = reloadFinalizedAppAttemptsShuffleMergeInfo(db);
        outdatedKeys.addAll(outdatedMergeKeys);
        removeOutdatedKeyValuesInDB(outdatedKeys);
    }

    /**
     * Reloads registered application path info from the DB into {@code appsShuffleInfo}.
     *
     * <p>Iterates over every key starting with {@code APP_ATTEMPT_PATH_KEY_PREFIX}. For each
     * application only the entry with the highest attempt id is kept in memory; the DB keys of
     * all other attempts are collected and returned so the caller can delete them.
     *
     * <p>The iterator is managed with try-with-resources, so it is always closed and a failure
     * while closing never hides an exception thrown while reading.
     *
     * @param db DB to read from; when {@code null} nothing is loaded
     * @return DB keys of attempts superseded by a newer one (empty when {@code db} is null)
     * @throws IOException if a key or value cannot be parsed, or the iterator fails to close
     */
    @VisibleForTesting
    List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
        List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
        if (db == null) {
            return dbKeysToBeRemoved;
        }
        try (DBIterator it = db.iterator()) {
            it.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
            while (it.hasNext()) {
                Map.Entry<?, ?> entry = (Map.Entry<?, ?>) it.next();
                byte[] rawKey = (byte[]) entry.getKey();
                String key = new String(rawKey, StandardCharsets.UTF_8);
                if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
                    // Iteration started at the prefix, so the first foreign key ends the range.
                    break;
                }
                AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
                AppPathsInfo appPathsInfo = mapper.readValue((byte[]) entry.getValue(), AppPathsInfo.class);
                logger.debug("Reloading Application paths info for application {}", appAttemptId);
                this.appsShuffleInfo.compute(appAttemptId.appId, (ignoredAppId, existing) -> {
                    if (existing != null && existing.attemptId >= appAttemptId.attemptId) {
                        // An equal or newer attempt is already loaded: this DB entry is outdated.
                        dbKeysToBeRemoved.add(rawKey);
                        return existing;
                    }
                    if (existing != null) {
                        // The loaded attempt is older: mark its DB entry for removal.
                        AppAttemptId olderAttempt = new AppAttemptId(existing.appId, existing.attemptId);
                        try {
                            dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(olderAttempt));
                        } catch (IOException e) {
                            logger.error("Failed to get the DB key for {}", olderAttempt, e);
                        }
                    }
                    return new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
                });
            }
        }
        return dbKeysToBeRemoved;
    }

    /**
     * Reloads persisted shuffle merge ids from the DB into the {@code shuffles} map of the
     * matching entry in {@code appsShuffleInfo}.
     *
     * <p>Iterates over every key starting with
     * {@code APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX}. A key is reported as outdated when
     * its application is not loaded, its attempt id differs from the loaded attempt, or an equal
     * or higher shuffle merge id is already loaded for the same shuffle. When a higher shuffle
     * merge id replaces a loaded one, the DB key of the replaced id is reported instead.
     *
     * <p>The iterator is managed with try-with-resources, so it is always closed and a failure
     * while closing never hides an exception thrown while reading.
     *
     * @param db DB to read from; when {@code null} nothing is loaded
     * @return DB keys that no longer correspond to loaded state (empty when {@code db} is null)
     * @throws IOException if a key cannot be parsed, or the iterator fails to close
     */
    @VisibleForTesting
    List<byte[]> reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws IOException {
        List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
        if (db == null) {
            return dbKeysToBeRemoved;
        }
        try (DBIterator it = db.iterator()) {
            it.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
            while (it.hasNext()) {
                Map.Entry<?, ?> entry = (Map.Entry<?, ?>) it.next();
                byte[] rawKey = (byte[]) entry.getKey();
                String key = new String(rawKey, StandardCharsets.UTF_8);
                if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
                    // Iteration started at the prefix, so the first foreign key ends the range.
                    break;
                }
                AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key);
                logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId);
                AppShuffleInfo appShuffleInfo = this.appsShuffleInfo.get(partitionId.appId);
                if (appShuffleInfo == null || appShuffleInfo.attemptId != partitionId.attemptId) {
                    // No loaded application attempt matches this entry.
                    dbKeysToBeRemoved.add(rawKey);
                    continue;
                }
                appShuffleInfo.shuffles.compute(Integer.valueOf(partitionId.shuffleId), (shuffleId, existing) -> {
                    if (existing != null && existing.shuffleMergeId >= partitionId.shuffleMergeId) {
                        // An equal or newer shuffle merge is already loaded: this entry is outdated.
                        dbKeysToBeRemoved.add(rawKey);
                        return existing;
                    }
                    if (existing != null) {
                        // The loaded shuffle merge is older: mark its DB entry for removal.
                        AppAttemptShuffleMergeId olderMerge = new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId.intValue(), existing.shuffleMergeId);
                        try {
                            dbKeysToBeRemoved.add(getDbAppAttemptShufflePartitionKey(olderMerge));
                        } catch (Exception e) {
                            logger.error("Error getting the DB key for {}", olderMerge, e);
                        }
                    }
                    // NOTE(review): the second argument is presumably a "finalized" flag; confirm
                    // against the AppShuffleMergePartitionsInfo constructor.
                    return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
                });
            }
        }
        return dbKeysToBeRemoved;
    }

    /**
     * Deletes each of the given keys from the DB. A failure to delete one key is logged and
     * does not stop the remaining keys from being processed.
     *
     * @param list DB keys to delete
     */
    @VisibleForTesting
    void removeOutdatedKeyValuesInDB(List<byte[]> list) {
        for (byte[] key : list) {
            try {
                this.db.delete(key);
            } catch (Exception e) {
                logger.error("Error deleting dangling key {} in DB", key, e);
            }
        }
    }

    /**
     * Hands the given task to the merged shuffle cleaner for execution.
     *
     * @param runnable cleanup task to run
     */
    @VisibleForTesting
    void submitCleanupTask(Runnable runnable) {
        mergedShuffleCleaner.execute(runnable);
    }

    /**
     * Reports whether the merged shuffle cleaner has been shut down.
     *
     * @return the result of {@code mergedShuffleCleaner.isShutdown()}
     */
    @VisibleForTesting
    boolean isCleanerShutdown() {
        final boolean shutdown = mergedShuffleCleaner.isShutdown();
        return shutdown;
    }
}
