package org.opensearch.repositories.s3.async;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
import org.opensearch.repositories.s3.S3TransferRejectedException;

/* loaded from: input_file:org/opensearch/repositories/s3/async/SizeBasedBlockingQ.class */
/**
 * A queue of transfer work items whose admission is bounded by the total declared size of the
 * queued items rather than by their count.
 *
 * <p>Producers call {@link #produce(Item)}; an item is rejected with
 * {@link S3TransferRejectedException} when adding it would bring the accumulated size to, or above,
 * the configured capacity. {@link Consumer} tasks submitted in {@link #doStart()} wait on a shared
 * condition, remove items and run them outside the lock.
 *
 * <p>{@code queue}, {@code currentSize} and {@code closed} are only mutated while {@code lock} is held.
 */
public class SizeBasedBlockingQ extends AbstractLifecycleComponent {
    private static final Logger log = LogManager.getLogger(SizeBasedBlockingQ.class);
    /** Upper bound (exclusive) for the sum of the sizes of all queued items. */
    protected final ByteSizeValue capacity;
    /** Executor that the consumer tasks are submitted to. */
    protected final ExecutorService executorService;
    /** Number of consumer tasks submitted on start. */
    protected final int consumers;
    private final GenericStatsMetricPublisher genericStatsMetricPublisher;
    private final QueueEventType queueEventType;
    protected final LinkedBlockingQueue<Item> queue = new LinkedBlockingQueue<>();
    /** Guards {@link #queue}, {@link #currentSize} and {@link #closed}. */
    protected final Lock lock = new ReentrantLock();
    /** Signalled whenever an item is added and when the queue is closed. */
    protected final Condition notEmpty = this.lock.newCondition();
    /** Sum of the declared sizes of the items currently queued. */
    protected final AtomicLong currentSize = new AtomicLong();
    protected final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Worker that repeatedly takes one item from the shared queue and runs it. The loop ends once the
     * queue has been closed.
     */
    protected static class Consumer extends Thread {
        private final LinkedBlockingQueue<Item> queue;
        private final Lock lock;
        private final Condition notEmpty;
        private final AtomicLong currentSize;
        private final AtomicBoolean closed;
        private final GenericStatsMetricPublisher genericStatsMetricPublisher;
        private final QueueEventType queueEventType;

        /**
         * @param linkedBlockingQueue shared item queue
         * @param atomicLong shared counter holding the total size of queued items
         * @param lock lock guarding the queue, the counter and the closed flag
         * @param condition condition (created from {@code lock}) signalled on insert and on close
         * @param atomicBoolean shared closed flag
         * @param genericStatsMetricPublisher sink for queue size statistics
         * @param queueEventType selects which statistic is updated
         */
        public Consumer(
            LinkedBlockingQueue<Item> linkedBlockingQueue,
            AtomicLong atomicLong,
            Lock lock,
            Condition condition,
            AtomicBoolean atomicBoolean,
            GenericStatsMetricPublisher genericStatsMetricPublisher,
            QueueEventType queueEventType
        ) {
            this.queue = linkedBlockingQueue;
            this.lock = lock;
            this.notEmpty = condition;
            this.currentSize = atomicLong;
            this.closed = atomicBoolean;
            this.genericStatsMetricPublisher = genericStatsMetricPublisher;
            this.queueEventType = queueEventType;
        }

        /**
         * Consumes items until the queue is closed. Any other failure is logged and the loop continues.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    consume();
                } catch (AlreadyClosedException e) {
                    // Must be caught before the generic handler: it is the signal to stop, not an error.
                    return;
                } catch (Exception e) {
                    // NOTE(review): an InterruptedException also lands here and is only logged, so an
                    // interrupt does not stop the consumer; termination relies on the closed flag.
                    SizeBasedBlockingQ.log.error("Failed to consume transfer event", e);
                }
            }
        }

        /**
         * Waits until the queue holds at least one item, removes it while holding the lock, then runs
         * it after the lock has been released.
         *
         * @throws InterruptedException if interrupted while waiting for an item
         * @throws AlreadyClosedException if the queue is, or becomes, closed while waiting
         */
        private void consume() throws InterruptedException {
            final Item item;
            this.lock.lock();
            try {
                if (this.closed.get()) {
                    throw new AlreadyClosedException("transfer queue closed");
                }
                while (this.currentSize.get() == 0) {
                    this.notEmpty.await();
                    if (this.closed.get()) {
                        throw new AlreadyClosedException("transfer queue closed");
                    }
                }
                item = this.queue.take();
                this.currentSize.addAndGet(-item.size);
                SizeBasedBlockingQ.updateStats(-item.size, this.queueEventType, this.genericStatsMetricPublisher);
            } finally {
                // Single release point: the lock is never unlocked twice, whatever is thrown above.
                this.lock.unlock();
            }

            // Run the work outside the lock so producers and other consumers are not blocked by it.
            try {
                item.consumable.run();
            } catch (Exception e) {
                SizeBasedBlockingQ.log.error("Exception on executing item consumable", e);
            }
        }
    }

    /** A unit of work together with the size it counts for against the queue capacity. */
    public static class Item {
        private final long size;
        private final Runnable consumable;

        /**
         * @param j size accounted against the queue capacity; {@link SizeBasedBlockingQ#produce(Item)}
         *          rejects values that are not positive
         * @param runnable work executed by a consumer once the item is dequeued
         */
        public Item(long j, Runnable runnable) {
            this.size = j;
            this.consumable = runnable;
        }
    }

    /** Selects which queue-size statistic a queue instance reports to. */
    public enum QueueEventType {
        NORMAL,
        LOW
    }

    /**
     * @param byteSizeValue capacity; the total size of queued items always stays below it
     * @param executorService executor the consumer tasks are submitted to
     * @param i number of consumer tasks to submit on start
     * @param genericStatsMetricPublisher sink for queue size statistics
     * @param queueEventType selects which statistic is updated
     */
    public SizeBasedBlockingQ(
        ByteSizeValue byteSizeValue,
        ExecutorService executorService,
        int i,
        GenericStatsMetricPublisher genericStatsMetricPublisher,
        QueueEventType queueEventType
    ) {
        this.capacity = byteSizeValue;
        this.executorService = executorService;
        this.consumers = i;
        this.genericStatsMetricPublisher = genericStatsMetricPublisher;
        this.queueEventType = queueEventType;
    }

    /** Submits {@link #consumers} consumer tasks to the executor. */
    protected void doStart() {
        for (int i = 0; i < this.consumers; i++) {
            this.executorService.submit(
                new Consumer(
                    this.queue,
                    this.currentSize,
                    this.lock,
                    this.notEmpty,
                    this.closed,
                    this.genericStatsMetricPublisher,
                    this.queueEventType
                )
            );
        }
    }

    /**
     * Adds an item to the queue and wakes up waiting consumers.
     *
     * @param item the item to enqueue; must be non-null with a positive size
     * @throws IllegalStateException if {@code item} is null or its size is not positive
     * @throws S3TransferRejectedException if accepting the item would bring the total queued size to
     *         or above the capacity
     * @throws AlreadyClosedException if the queue has been closed
     * @throws InterruptedException if interrupted while inserting into the underlying queue
     */
    public void produce(Item item) throws InterruptedException {
        if (item == null || item.size <= 0) {
            throw new IllegalStateException("Invalid item input to produce.");
        }
        log.debug(
            () -> "Transfer queue event received of size: " + item.size + ". Current queue utilisation: " + this.currentSize.get()
        );
        // Cheap rejection before contending for the lock; re-checked below under the lock.
        if (this.currentSize.get() + item.size >= this.capacity.getBytes()) {
            throw new S3TransferRejectedException("S3 Transfer queue capacity reached");
        }
        this.lock.lock();
        try {
            if (this.currentSize.get() + item.size >= this.capacity.getBytes()) {
                throw new S3TransferRejectedException("S3 Transfer queue capacity reached");
            }
            if (this.closed.get()) {
                throw new AlreadyClosedException("Transfer queue is already closed.");
            }
            this.queue.put(item);
            this.currentSize.addAndGet(item.size);
            this.notEmpty.signalAll();
            updateStats(item.size, this.queueEventType, this.genericStatsMetricPublisher);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Reports a change of the queued size to the statistic matching the event type. Does nothing for
     * any other value of {@code queueEventType}.
     *
     * @param delta signed change in queued size
     */
    private static void updateStats(long delta, QueueEventType queueEventType, GenericStatsMetricPublisher genericStatsMetricPublisher) {
        if (queueEventType == QueueEventType.NORMAL) {
            genericStatsMetricPublisher.updateNormalPriorityQSize(delta);
        } else if (queueEventType == QueueEventType.LOW) {
            genericStatsMetricPublisher.updateLowPriorityQSize(delta);
        }
    }

    /**
     * @return the number of items currently queued (an item count, not the accumulated size)
     */
    public int getSize() {
        return this.queue.size();
    }

    /**
     * Compares a content length with the configured capacity.
     *
     * <p>NOTE(review): the name reads as "capacity &lt; content length", but the implementation
     * returns the opposite comparison. Behaviour is intentionally left unchanged because callers
     * depend on it; confirm the intended semantics against the call sites.
     *
     * @param contentLength length in bytes to compare
     * @return {@code true} when {@code contentLength} is strictly smaller than the capacity in bytes
     */
    public boolean isMaxCapacityBelowContentLength(long contentLength) {
        return contentLength < this.capacity.getBytes();
    }

    /** Stopping is equivalent to closing the queue. */
    protected void doStop() {
        doClose();
    }

    /**
     * Marks the queue closed and wakes every waiting consumer so it can observe the flag and exit.
     * Calling it again after the queue is closed has no effect.
     */
    protected void doClose() {
        this.lock.lock();
        try {
            if (this.closed.compareAndSet(false, true)) {
                this.notEmpty.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
    }
}
