package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/ListActionTaker.class */
public abstract class ListActionTaker {
    private static final Logger LOG = LoggerFactory.getLogger(ListActionTaker.class);
    private final Path path;
    private final AbfsBlobClient abfsClient;
    private final TracingContext tracingContext;
    private final AtomicBoolean producerThreadToBeStopped = new AtomicBoolean(false);
    private final ExecutorService executorService = Executors.newFixedThreadPool(getMaxConsumptionParallelism());

    public ListActionTaker(Path path, AbfsBlobClient abfsBlobClient, TracingContext tracingContext) {
        this.path = path;
        this.abfsClient = abfsBlobClient;
        this.tracingContext = tracingContext;
    }

    public AbfsBlobClient getAbfsClient() {
        return this.abfsClient;
    }

    abstract int getMaxConsumptionParallelism();

    abstract boolean takeAction(Path path) throws AzureBlobFileSystemException;

    private boolean takeAction(List<Path> list) throws AzureBlobFileSystemException {
        ArrayList arrayList = new ArrayList();
        for (Path path : list) {
            arrayList.add(this.executorService.submit(() -> {
                return Boolean.valueOf(takeAction(path));
            }));
        }
        AzureBlobFileSystemException azureBlobFileSystemException = null;
        boolean z = true;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                if (!((Boolean) ((Future) it.next()).get()).booleanValue()) {
                    z = false;
                }
            } catch (InterruptedException e) {
                LOG.debug("Thread interrupted while taking action on path: {}", this.path.toUri().getPath());
            } catch (ExecutionException e2) {
                azureBlobFileSystemException = (AzureBlobFileSystemException) e2.getCause();
            }
        }
        if (azureBlobFileSystemException != null) {
            throw azureBlobFileSystemException;
        }
        return z;
    }

    public boolean listRecursiveAndTakeAction() throws AzureBlobFileSystemException {
        Thread thread = null;
        try {
            ListBlobQueue createListBlobQueue = createListBlobQueue(getAbfsClient().getAbfsConfiguration());
            thread = new Thread(() -> {
                try {
                    produceConsumableList(createListBlobQueue);
                } catch (AzureBlobFileSystemException e) {
                    createListBlobQueue.markProducerFailure(e);
                }
            });
            thread.start();
            while (!createListBlobQueue.getIsCompleted()) {
                List<Path> consume = createListBlobQueue.consume();
                if (consume != null) {
                    try {
                        if (!takeAction(consume)) {
                            if (thread != null) {
                                this.producerThreadToBeStopped.set(true);
                            }
                            this.executorService.shutdownNow();
                            return false;
                        }
                    } catch (AzureBlobFileSystemException e) {
                        createListBlobQueue.markConsumptionFailed();
                        throw e;
                    }
                }
            }
            if (thread != null) {
                this.producerThreadToBeStopped.set(true);
            }
            this.executorService.shutdownNow();
            return true;
        } catch (Throwable th) {
            if (thread != null) {
                this.producerThreadToBeStopped.set(true);
            }
            this.executorService.shutdownNow();
            throw th;
        }
    }

    @VisibleForTesting
    protected ListBlobQueue createListBlobQueue(AbfsConfiguration abfsConfiguration) throws InvalidConfigurationValueException {
        return new ListBlobQueue(abfsConfiguration.getProducerQueueMaxSize(), getMaxConsumptionParallelism(), abfsConfiguration.getListingMaxConsumptionLag());
    }

    private void produceConsumableList(ListBlobQueue listBlobQueue) throws AzureBlobFileSystemException {
        String str = null;
        do {
            str = listAndEnqueue(listBlobQueue, str);
            if (this.producerThreadToBeStopped.get() || str == null) {
                break;
            }
        } while (!listBlobQueue.getConsumptionFailed());
        listBlobQueue.complete();
    }

    @VisibleForTesting
    protected String listAndEnqueue(ListBlobQueue listBlobQueue, String str) throws AzureBlobFileSystemException {
        int min = Math.min(5000, listBlobQueue.availableSizeForProduction());
        if (min == 0) {
            return null;
        }
        try {
            ListResultSchema listResultSchema = getAbfsClient().listPath(this.path.toUri().getPath(), true, min, str, this.tracingContext).getResult().getListResultSchema();
            if (listResultSchema == null) {
                return str;
            }
            String nextMarker = ((BlobListResultSchema) listResultSchema).getNextMarker();
            ArrayList arrayList = new ArrayList();
            addPaths(arrayList, listResultSchema);
            listBlobQueue.enqueue(arrayList);
            return nextMarker;
        } catch (AzureBlobFileSystemException e) {
            throw e;
        } catch (IOException e2) {
            throw new AbfsRestOperationException(-1, null, "Unknown exception from listing: " + e2.getMessage(), e2);
        }
    }

    @VisibleForTesting
    protected void addPaths(List<Path> list, ListResultSchema listResultSchema) {
        Iterator<? extends ListResultEntrySchema> it = listResultSchema.paths().iterator();
        while (it.hasNext()) {
            Path path = new Path("/", it.next().name());
            if (!path.equals(this.path)) {
                list.add(path);
            }
        }
    }
}
