package alluxio.job.plan.migrate;

import alluxio.AlluxioURI;
import alluxio.client.WriteType;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.FileAlreadyExistsException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.grpc.WritePType;
import alluxio.job.JobConfig;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.util.io.PathUtils;
import alluxio.wire.WorkerInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/job/plan/migrate/MigrateDefinition.class */
public final class MigrateDefinition extends AbstractVoidPlanDefinition<MigrateConfig, MigrateCommand> {
    private static final Logger LOG = LoggerFactory.getLogger(MigrateDefinition.class);
    private static final int JOBS_PER_WORKER = 10;
    private final Random mRandom = new Random();

    private void checkMigrateValid(MigrateConfig migrateConfig, FileSystem fileSystem) throws Exception {
        AlluxioURI alluxioURI = new AlluxioURI(migrateConfig.getSource());
        if (PathUtils.hasPrefix(new AlluxioURI(migrateConfig.getDestination()).toString(), alluxioURI.toString())) {
            throw new RuntimeException(ExceptionMessage.MIGRATE_CANNOT_BE_TO_SUBDIRECTORY.getMessage(new Object[]{alluxioURI, migrateConfig.getDestination()}));
        }
    }

    public Set<Pair<WorkerInfo, MigrateCommand>> selectExecutors(MigrateConfig migrateConfig, List<WorkerInfo> list, SelectExecutorsContext selectExecutorsContext) throws Exception {
        AlluxioURI alluxioURI = new AlluxioURI(migrateConfig.getSource());
        AlluxioURI alluxioURI2 = new AlluxioURI(migrateConfig.getDestination());
        if (alluxioURI.equals(alluxioURI2)) {
            return Sets.newHashSet();
        }
        checkMigrateValid(migrateConfig, selectExecutorsContext.getFileSystem());
        Preconditions.checkState(!list.isEmpty(), "No workers are available");
        URIStatus status = selectExecutorsContext.getFileSystem().getStatus(alluxioURI);
        ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
        for (WorkerInfo workerInfo : list) {
            newConcurrentMap.put(workerInfo.getAddress().getHost(), workerInfo);
        }
        List<BlockWorkerInfo> cachedWorkers = selectExecutorsContext.getFsContext().getCachedWorkers();
        if (status.isFolder()) {
            throw new RuntimeException("Cannot migrate directory");
        }
        WorkerInfo bestJobWorker = getBestJobWorker(status, cachedWorkers, list, newConcurrentMap);
        HashSet newHashSet = Sets.newHashSet();
        newHashSet.add(new Pair(bestJobWorker, new MigrateCommand(status.getPath(), alluxioURI2.getPath())));
        return newHashSet;
    }

    private WorkerInfo getBestJobWorker(URIStatus uRIStatus, List<BlockWorkerInfo> list, List<WorkerInfo> list2, Map<String, WorkerInfo> map) {
        BlockWorkerInfo workerWithMostBlocks = JobUtils.getWorkerWithMostBlocks(list, uRIStatus.getFileBlockInfos());
        if (workerWithMostBlocks == null) {
            workerWithMostBlocks = list.get(this.mRandom.nextInt(list2.size()));
        }
        WorkerInfo workerInfo = map.get(workerWithMostBlocks.getNetAddress().getHost());
        return workerInfo == null ? list2.get(new Random().nextInt(list2.size())) : workerInfo;
    }

    @Override // alluxio.job.plan.PlanDefinition
    public SerializableVoid runTask(MigrateConfig migrateConfig, MigrateCommand migrateCommand, RunTaskContext runTaskContext) throws Exception {
        migrate(migrateCommand, (migrateConfig.getWriteType() == null ? (WriteType) Configuration.getEnum(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class) : migrateConfig.getWriteType()).toProto(), runTaskContext.getFileSystem(), migrateConfig.isOverwrite());
        return null;
    }

    private static void migrate(MigrateCommand migrateCommand, WritePType writePType, FileSystem fileSystem, boolean z) throws Exception {
        boolean z2;
        FileInStream openFile;
        Throwable th;
        FileOutStream createFile;
        Throwable th2;
        String source = migrateCommand.getSource();
        String destination = migrateCommand.getDestination();
        LOG.debug("Migrating {} to {}", source, destination);
        CreateFilePOptions build = CreateFilePOptions.newBuilder().setWriteType(writePType).build();
        OpenFilePOptions build2 = OpenFilePOptions.newBuilder().setReadType(ReadPType.NO_CACHE).build();
        AlluxioURI alluxioURI = new AlluxioURI(destination);
        AlluxioURI alluxioURI2 = new AlluxioURI(PathUtils.temporaryFileName(System.currentTimeMillis(), alluxioURI.toString()));
        boolean z3 = false;
        do {
            z2 = false;
            try {
                openFile = fileSystem.openFile(new AlluxioURI(source), build2);
                th = null;
                try {
                    createFile = fileSystem.createFile(alluxioURI, build);
                    th2 = null;
                } finally {
                }
            } catch (FileAlreadyExistsException e) {
                if (!z) {
                    throw e;
                }
                AlluxioURI alluxioURI3 = alluxioURI;
                alluxioURI = alluxioURI2;
                alluxioURI2 = alluxioURI3;
                z2 = true;
                z3 = true;
            }
            try {
                try {
                    try {
                        IOUtils.copyLarge(openFile, createFile, new byte[8388608]);
                        if (z3) {
                            fileSystem.delete(alluxioURI2);
                            fileSystem.rename(alluxioURI, alluxioURI2);
                        }
                        if (createFile != null) {
                            if (0 != 0) {
                                try {
                                    createFile.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                createFile.close();
                            }
                        }
                        if (openFile != null) {
                            if (0 != 0) {
                                try {
                                    openFile.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                openFile.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } while (z2);
    }

    @Override // alluxio.job.plan.PlanDefinition
    public Class<MigrateConfig> getJobConfigClass() {
        return MigrateConfig.class;
    }

    @Override // alluxio.job.plan.PlanDefinition
    public /* bridge */ /* synthetic */ Set selectExecutors(JobConfig jobConfig, List list, SelectExecutorsContext selectExecutorsContext) throws Exception {
        return selectExecutors((MigrateConfig) jobConfig, (List<WorkerInfo>) list, selectExecutorsContext);
    }
}
