package org.apache.omid.tso;

import com.codahale.metrics.MetricRegistry;
import com.google.inject.name.Named;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/omid/tso/RetryProcessorImpl.class */
/**
 * {@link RetryProcessor} implementation that queues commit-retry requests on an LMAX Disruptor
 * ring buffer and resolves each of them, on a dedicated {@code retry-%d} thread, by looking up
 * the transaction's start timestamp in the commit table.
 *
 * <p>For every retry event exactly one of the following happens:
 * <ul>
 *   <li>no commit timestamp is found: an abort response is sent;</li>
 *   <li>a valid commit timestamp is found: a commit response carrying it is sent;</li>
 *   <li>an invalid marker is found: an abort response is sent.</li>
 * </ul>
 *
 * <p>The ring is created with {@link ProducerType#SINGLE}; per the Disruptor contract that
 * requires {@link #disambiguateRetryRequestHeuristically} to be called from one thread only.
 */
class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {

    // Registered under the RetryProcessor interface name; kept unchanged so the logger name
    // emitted by this class stays the same.
    private static final Logger LOG = LoggerFactory.getLogger(RetryProcessor.class);

    /** Number of slots in the retry ring buffer. */
    private static final int RING_SIZE = 4096;

    /** Name of the timer started when a retry is enqueued and stopped once it has been handled. */
    private static final String COMMIT_RETRY_LATENCY_TIMER = "retry.processor.commit-retry.latency";

    private final ExecutorService disruptorExec =
            Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("retry-%d").build());
    private final Disruptor<RetryEvent> disruptor;
    private final RingBuffer<RetryEvent> retryRing;
    final ReplyProcessor replyProc;
    final CommitTable.Client commitTableClient;
    final ObjectPool<Batch> batchPool;
    private final Meter txAlreadyCommittedMeter;
    private final Meter invalidTxMeter;
    private final Meter noCTFoundMeter;

    /**
     * Mutable, pre-allocated ring-buffer slot describing one retry request. Instances are created
     * by {@link #EVENT_FACTORY} and re-populated through {@link #makeCommitRetry}.
     */
    public static final class RetryEvent {

        /** Factory handed to the Disruptor to pre-fill the ring with empty events. */
        public static final EventFactory<RetryEvent> EVENT_FACTORY = new EventFactory<RetryEvent>() {
            @Override
            public RetryEvent newInstance() {
                return new RetryEvent();
            }
        };

        /** Kind of retry carried by an event. */
        public enum Type {
            COMMIT
        }

        private Type type = null;
        private long startTimestamp = 0;
        private Channel channel = null;
        private MonitoringContext monCtx;

        /**
         * Overwrites every field of {@code retryEvent} so that it describes a commit retry.
         *
         * @param retryEvent        the ring slot to populate
         * @param j                 start timestamp of the transaction being retried
         * @param channel           channel the response must be written to
         * @param monitoringContext monitoring context that accompanies the request
         */
        static void makeCommitRetry(RetryEvent retryEvent, long j, Channel channel, MonitoringContext monitoringContext) {
            retryEvent.monCtx = monitoringContext;
            retryEvent.type = Type.COMMIT;
            retryEvent.startTimestamp = j;
            retryEvent.channel = channel;
        }

        MonitoringContext getMonCtx() {
            return this.monCtx;
        }

        Type getType() {
            return this.type;
        }

        Channel getChannel() {
            return this.channel;
        }

        long getStartTimestamp() {
            return this.startTimestamp;
        }
    }

    /**
     * Wires the collaborators, registers the retry meters and starts the disruptor.
     *
     * @param waitStrategy    wait strategy used by the handler thread (bound as {@code "RetryStrategy"})
     * @param metricsRegistry registry in which the three retry meters are created
     * @param commitTable     commit table whose client is used to look up commit timestamps
     * @param replyProcessor  processor used to send commit/abort responses
     * @param panicker        passed to the {@link FatalExceptionHandler} installed on the disruptor
     * @param objectPool      batch pool, stored in {@link #batchPool}
     * @throws IOException          declared by the original signature; {@code commitTable.getClient()}
     *                              is the only call here that can raise it
     * @throws InterruptedException declared by the original signature
     * @throws ExecutionException   declared by the original signature
     */
    @Inject
    RetryProcessorImpl(@Named("RetryStrategy") WaitStrategy waitStrategy, MetricsRegistry metricsRegistry, CommitTable commitTable, ReplyProcessor replyProcessor, Panicker panicker, ObjectPool<Batch> objectPool) throws InterruptedException, ExecutionException, IOException {
        // Resolve everything that can fail, and assign every field the handler reads, BEFORE the
        // disruptor is started: this way no handler thread is left running if getClient() throws,
        // and 'this' is only handed to that thread once it is fully initialised.
        this.commitTableClient = commitTable.getClient();
        this.replyProc = replyProcessor;
        this.batchPool = objectPool;
        this.txAlreadyCommittedMeter = metricsRegistry.meter(MetricRegistry.name("tso", "retries", "commits", "tx-already-committed"));
        this.invalidTxMeter = metricsRegistry.meter(MetricRegistry.name("tso", "retries", "aborts", "tx-invalid"));
        this.noCTFoundMeter = metricsRegistry.meter(MetricRegistry.name("tso", "retries", "aborts", "tx-without-commit-timestamp"));

        this.disruptor = new Disruptor<>(RetryEvent.EVENT_FACTORY, RING_SIZE, this.disruptorExec, ProducerType.SINGLE, waitStrategy);
        this.disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
        this.disruptor.handleEventsWith(this);
        this.retryRing = this.disruptor.start();
        LOG.info("RetryProcessor initialized");
    }

    /**
     * Disruptor callback: handles one retry event and then publishes its monitoring context.
     *
     * @param retryEvent the event to process
     * @param j          sequence of the event in the ring (unused)
     * @param z          end-of-batch flag (unused)
     */
    @Override
    public void onEvent(RetryEvent retryEvent, long j, boolean z) throws Exception {
        switch (retryEvent.getType()) {
            case COMMIT:
                handleCommitRetry(retryEvent);
                retryEvent.getMonCtx().timerStop(COMMIT_RETRY_LATENCY_TIMER);
                break;
            default:
                // Unknown event type: only reported when assertions are enabled.
                assert false;
                break;
        }
        retryEvent.getMonCtx().publish();
    }

    /**
     * Looks up the commit timestamp for the event's start timestamp and sends the matching
     * commit or abort response. If the lookup is interrupted or fails, the error is logged and
     * no response is sent from here.
     */
    private void handleCommitRetry(RetryEvent retryEvent) {
        long startTimestamp = retryEvent.getStartTimestamp();
        try {
            Optional<CommitTable.CommitTimestamp> commitTimestamp =
                    this.commitTableClient.getCommitTimestamp(startTimestamp).get();
            if (!commitTimestamp.isPresent()) {
                LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp);
                this.replyProc.sendAbortResponse(startTimestamp, retryEvent.getChannel(), retryEvent.getMonCtx());
                this.noCTFoundMeter.mark();
            } else if (commitTimestamp.get().isValid()) {
                LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
                this.replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), retryEvent.getChannel(), retryEvent.getMonCtx(), Optional.absent());
                this.txAlreadyCommittedMeter.mark();
            } else {
                LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
                this.replyProc.sendAbortResponse(startTimestamp, retryEvent.getChannel(), retryEvent.getMonCtx());
                this.invalidTxMeter.mark();
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted reading from commit table", e);
            // Restore the flag so the owning thread can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            LOG.error("Error reading from commit table", e);
        }
    }

    /**
     * Enqueues a commit retry for the given transaction. Claims the next ring slot, starts the
     * commit-retry latency timer on {@code monitoringContext}, fills the slot and publishes it.
     *
     * @param j                 start timestamp of the transaction being retried
     * @param channel           channel the eventual response is sent on
     * @param monitoringContext monitoring context of the request
     */
    @Override // org.apache.omid.tso.RetryProcessor
    public void disambiguateRetryRequestHeuristically(long j, Channel channel, MonitoringContext monitoringContext) {
        long sequence = this.retryRing.next();
        RetryEvent retryEvent = this.retryRing.get(sequence);
        monitoringContext.timerStart(COMMIT_RETRY_LATENCY_TIMER);
        RetryEvent.makeCommitRetry(retryEvent, j, channel, monitoringContext);
        this.retryRing.publish(sequence);
    }

    /**
     * Halts and shuts down the disruptor, then stops its executor, waiting up to three seconds
     * for the executor to terminate.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        LOG.info("Terminating Retry Processor...");
        this.disruptor.halt();
        this.disruptor.shutdown();
        LOG.info("\tRetry Processor Disruptor shutdown");
        this.disruptorExec.shutdownNow();
        try {
            this.disruptorExec.awaitTermination(3L, TimeUnit.SECONDS);
            LOG.info("\tRetry Processor Disruptor executor shutdown");
        } catch (InterruptedException e) {
            LOG.error("Interrupted whilst finishing Retry Processor Disruptor executor", e);
            Thread.currentThread().interrupt();
        }
        LOG.info("Retry Processor terminated");
    }
}
