package org.apache.omid.tso;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tso.TSOStateManager;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/omid/tso/AbstractRequestProcessor.class */
/**
 * Base class for the TSO request processor.
 *
 * <p>Incoming timestamp, commit and fence requests are published into a Disruptor ring buffer by
 * the {@code *Request} methods and consumed by {@link #onEvent}, which runs on the single
 * "request-%d" thread created in the constructor. How the outcome of each request is forwarded
 * is left to subclasses through the abstract {@code forward*} methods.
 *
 * <p>This class never assigns {@link #requestRing}; something else (presumably the subclass,
 * when it starts the disruptor — confirm against the subclasses) must set it before any of the
 * {@code *Request} methods is called.
 */
abstract class AbstractRequestProcessor implements EventHandler<RequestEvent>, RequestProcessor, TimeoutHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestProcessor.class);

    /** Number of slots in the request ring buffer. */
    private static final int RING_BUFFER_SIZE = 4096;

    /** Timer started when a timestamp request is enqueued and stopped once it has been handled. */
    private static final String TIMESTAMP_LATENCY_TIMER = "request.processor.timestamp.latency";
    /** Timer started when a commit request is enqueued and stopped once it has been handled. */
    private static final String COMMIT_LATENCY_TIMER = "request.processor.commit.latency";
    /** Timer started when a fence request is enqueued. */
    private static final String FENCE_LATENCY_TIMER = "request.processor.fence.latency";

    private final ExecutorService disruptorExec;
    protected final Disruptor<RequestEvent> disruptor;
    protected RingBuffer<RequestEvent> requestRing;
    private final TimestampOracle timestampOracle;
    private final CommitHashMap hashmap;
    /** Maps a table id to the timestamp of the latest fence created for that table. */
    private final Map<Long, Long> tableFences;
    private final MetricsRegistry metrics;
    private final LowWatermarkWriter lowWatermarkWriter;
    private long lowWatermark = -1;
    private final ReplyProcessor replyProcessor;

    /**
     * Mutable, reusable ring-buffer slot describing one request. The static {@code make*Request}
     * methods overwrite only the fields relevant to the request type they set up, so fields that
     * belong to other request types may hold stale values from a previous use of the slot.
     *
     * <p>Iterating over the event yields the cell ids of the write set of a commit request.
     */
    public static final class RequestEvent implements Iterable<Long> {
        private MonitoringContext monCtx;

        /** Write sets of up to this many cells are copied into the preallocated array. */
        private static final int MAX_INLINE = 40;

        /** Factory used by the disruptor to preallocate the events of the ring buffer. */
        static final EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
            @Override
            public RequestEvent newInstance() {
                return new RequestEvent();
            }
        };

        private Type type = null;
        private Channel channel = null;
        private boolean isCommitRetry = false;
        private long startTimestamp = 0;
        /** Number of cells in the write set of the current commit request. */
        private long numCells = 0;
        /** Inline storage for small write sets; only the first {@code numCells} entries are valid. */
        private Long[] writeSet = new Long[MAX_INLINE];
        /** Holds the write set when it is too large for the inline array; {@code null} otherwise. */
        private Collection<Long> writeSetAsCollection = null;
        private Collection<Long> tableIdSet = null;
        private long tableID = 0;

        /** Kind of request carried by an event. */
        public enum Type {
            TIMESTAMP,
            COMMIT,
            FENCE
        }

        RequestEvent() {
        }

        /**
         * Turns {@code event} into a timestamp request.
         *
         * @param event   the ring-buffer slot to fill in
         * @param channel channel associated with the request
         * @param monCtx  monitoring context of the request
         */
        static void makeTimestampRequest(RequestEvent event, Channel channel, MonitoringContext monCtx) {
            event.type = Type.TIMESTAMP;
            event.channel = channel;
            event.monCtx = monCtx;
        }

        /**
         * Turns {@code event} into a commit request.
         *
         * @param event          the ring-buffer slot to fill in
         * @param startTimestamp start timestamp of the transaction
         * @param monCtx         monitoring context of the request
         * @param cellIds        write set of the transaction; must not be {@code null}
         * @param tableIdSet     ids of the tables involved, later checked against the table fences
         * @param isRetry        whether this is a retried commit request
         * @param channel        channel associated with the request
         */
        static void makeCommitRequest(RequestEvent event, long startTimestamp, MonitoringContext monCtx,
                                      Collection<Long> cellIds, Collection<Long> tableIdSet, boolean isRetry,
                                      Channel channel) {
            event.monCtx = monCtx;
            event.type = Type.COMMIT;
            event.channel = channel;
            event.startTimestamp = startTimestamp;
            event.isCommitRetry = isRetry;
            event.numCells = cellIds.size();
            if (cellIds.size() > MAX_INLINE) {
                // Too large for the inline array: keep a reference to the caller's collection.
                event.writeSetAsCollection = cellIds;
            } else {
                // Small write set: copy it into the preallocated array. Entries beyond numCells are
                // left untouched; the iterator never reads past numCells.
                event.writeSetAsCollection = null;
                int idx = 0;
                for (Long cellId : cellIds) {
                    event.writeSet[idx++] = cellId;
                }
            }
            event.tableIdSet = tableIdSet;
        }

        /**
         * Turns {@code event} into a fence request.
         *
         * @param event   the ring-buffer slot to fill in
         * @param tableId id of the table the fence is requested for
         * @param channel channel associated with the request
         * @param monCtx  monitoring context of the request
         */
        static void makeFenceRequest(RequestEvent event, long tableId, Channel channel, MonitoringContext monCtx) {
            event.type = Type.FENCE;
            event.channel = channel;
            event.monCtx = monCtx;
            event.tableID = tableId;
        }

        MonitoringContext getMonCtx() {
            return this.monCtx;
        }

        Type getType() {
            return this.type;
        }

        long getStartTimestamp() {
            return this.startTimestamp;
        }

        Channel getChannel() {
            return this.channel;
        }

        Collection<Long> getTableIdSet() {
            return this.tableIdSet;
        }

        long getTableId() {
            return this.tableID;
        }

        /**
         * Returns an iterator over the write set of the current commit request: the iterator of the
         * stored collection when the write set was too large to be inlined, otherwise a read-only
         * iterator over the first {@code numCells} entries of the inline array.
         */
        @Override
        public Iterator<Long> iterator() {
            if (this.writeSetAsCollection != null) {
                return this.writeSetAsCollection.iterator();
            }
            return new Iterator<Long>() {
                private int pos = 0;

                @Override
                public boolean hasNext() {
                    return this.pos < RequestEvent.this.numCells;
                }

                @Override
                public Long next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return RequestEvent.this.writeSet[this.pos++];
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        /** Returns this event viewed as the write set of the commit request. */
        Iterable<Long> writeSet() {
            return this;
        }

        boolean isCommitRetry() {
            return this.isCommitRetry;
        }
    }

    /**
     * Creates the disruptor (without starting it) and registers this instance as its only event
     * handler.
     *
     * @param metrics            metrics registry; stored but not used by this class
     * @param timestampOracle    source of the timestamps handed out for requests
     * @param panicker           passed to the {@code FatalExceptionHandler} installed on the disruptor
     * @param config             provides the wait-strategy timeout and the conflict map size
     * @param lowWatermarkWriter used by {@link #update} to persist the low watermark
     * @param replyProcessor     used to send fence responses
     * @throws IOException declared for callers; nothing in this constructor throws it explicitly
     */
    public AbstractRequestProcessor(MetricsRegistry metrics, TimestampOracle timestampOracle, Panicker panicker,
                                    TSOServerConfig config, LowWatermarkWriter lowWatermarkWriter,
                                    ReplyProcessor replyProcessor) throws IOException {
        TimeoutBlockingWaitStrategy waitStrategy =
                new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
        this.disruptorExec = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder().setNameFormat("request-%d").build());
        this.disruptor = new Disruptor<>(RequestEvent.EVENT_FACTORY, RING_BUFFER_SIZE, this.disruptorExec,
                                         ProducerType.MULTI, waitStrategy);
        // The exception handler is installed before the event handler is registered.
        this.disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
        this.disruptor.handleEventsWith(this);

        this.metrics = metrics;
        this.timestampOracle = timestampOracle;
        this.hashmap = new CommitHashMap(config.getConflictMapSize());
        this.tableFences = new HashMap<>();
        this.lowWatermarkWriter = lowWatermarkWriter;
        this.replyProcessor = replyProcessor;
        LOG.info("RequestProcessor initialized");
    }

    /**
     * Adopts the low watermark of the given TSO state and blocks until it has been persisted.
     *
     * @param tsoState the state to take the low watermark from
     * @throws Exception if persisting the low watermark fails or the wait is interrupted
     */
    @Override
    public void update(TSOStateManager.TSOState tsoState) throws Exception {
        LOG.info("Initializing RequestProcessor state...");
        this.lowWatermark = tsoState.getLowWatermark();
        this.lowWatermarkWriter.persistLowWatermark(this.lowWatermark).get();
        LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", this.lowWatermark,
                 tsoState.getEpoch());
    }

    /**
     * Disruptor callback: dispatches the event to the handler matching its type.
     *
     * @param event      the request to process
     * @param sequence   ring-buffer sequence of the event (unused)
     * @param endOfBatch whether this is the last event of the current batch (unused)
     * @throws IllegalStateException if the event type is not one of the supported types
     */
    @Override
    public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
        switch (event.getType()) {
            case TIMESTAMP:
                handleTimestamp(event);
                return;
            case COMMIT:
                handleCommit(event);
                return;
            case FENCE:
                handleFence(event);
                return;
            default:
                throw new IllegalStateException("Event not allowed in Request Processor: " + event);
        }
    }

    /** Disruptor timeout callback: delegates to the subclass hook {@link #onTimeout()}. */
    @Override
    public void onTimeout(long sequence) throws Exception {
        onTimeout();
    }

    /** Starts the timestamp latency timer and publishes a timestamp request into the ring buffer. */
    @Override
    public void timestampRequest(Channel channel, MonitoringContext monCtx) {
        monCtx.timerStart(TIMESTAMP_LATENCY_TIMER);
        long sequence = this.requestRing.next();
        RequestEvent.makeTimestampRequest(this.requestRing.get(sequence), channel, monCtx);
        this.requestRing.publish(sequence);
    }

    /** Starts the commit latency timer and publishes a commit request into the ring buffer. */
    @Override
    public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet,
                              boolean isRetry, Channel channel, MonitoringContext monCtx) {
        monCtx.timerStart(COMMIT_LATENCY_TIMER);
        long sequence = this.requestRing.next();
        RequestEvent.makeCommitRequest(this.requestRing.get(sequence), startTimestamp, monCtx, writeSet,
                                       tableIdSet, isRetry, channel);
        this.requestRing.publish(sequence);
    }

    /** Starts the fence latency timer and publishes a fence request into the ring buffer. */
    @Override
    public void fenceRequest(long tableId, Channel channel, MonitoringContext monCtx) {
        monCtx.timerStart(FENCE_LATENCY_TIMER);
        long sequence = this.requestRing.next();
        RequestEvent.makeFenceRequest(this.requestRing.get(sequence), tableId, channel, monCtx);
        this.requestRing.publish(sequence);
    }

    /** Takes the next timestamp from the oracle, stops the latency timer and forwards the result. */
    private void handleTimestamp(RequestEvent event) throws Exception {
        long timestamp = this.timestampOracle.next();
        event.getMonCtx().timerStop(TIMESTAMP_LATENCY_TIMER);
        forwardTimestamp(timestamp, event.getChannel(), event.getMonCtx());
    }

    /**
     * Returns {@code true} if any of the given tables has a fence with a timestamp greater than
     * {@code startTimestamp}. As a side effect, fences older than the low watermark that are met
     * before a conflict is found are removed from the fence map.
     */
    private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
        if (this.tableFences.isEmpty()) {
            return false;
        }
        for (long tableId : tableIdSet) {
            Long fence = this.tableFences.get(tableId);
            if (fence == null) {
                continue;
            }
            if (fence > startTimestamp) {
                return true;
            }
            if (fence < this.lowWatermark) {
                // Garbage-collect the entry of a fence that is below the low watermark.
                this.tableFences.remove(tableId);
            }
        }
        return false;
    }

    /**
     * Returns {@code true} if the commit map holds, for any cell of the write set, a non-zero
     * value that is greater than or equal to {@code startTimestamp}.
     */
    private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
        for (long cellId : writeSet) {
            long latestWrite = this.hashmap.getLatestWriteForCell(cellId);
            if (latestWrite != 0 && latestWrite >= startTimestamp) {
                return true;
            }
        }
        return false;
    }

    /**
     * Decides the outcome of a commit request.
     *
     * <p>The request is rejected when the transaction started at or before the low watermark,
     * conflicts with a table fence, or conflicts with an entry of the commit map; a rejected
     * retry is forwarded as a commit retry, any other rejection as an abort. Otherwise a commit
     * timestamp is taken from the oracle, recorded for every cell of the write set, and the commit
     * is forwarded, together with the new low watermark if recording the writes raised it.
     */
    private void handleCommit(RequestEvent event) throws Exception {
        long startTimestamp = event.getStartTimestamp();
        Iterable<Long> writeSet = event.writeSet();
        Collection<Long> tableIdSet = event.getTableIdSet();
        boolean isCommitRetry = event.isCommitRetry();
        Channel channel = event.getChannel();
        MonitoringContext monCtx = event.getMonCtx();

        boolean nonEmptyWriteSet = writeSet.iterator().hasNext();

        boolean mustAbort = startTimestamp <= this.lowWatermark
                || hasConflictsWithFences(startTimestamp, tableIdSet)
                || hasConflictsWithCommittedTransactions(startTimestamp, writeSet);
        if (mustAbort) {
            monCtx.timerStop(COMMIT_LATENCY_TIMER);
            if (isCommitRetry) {
                forwardCommitRetry(startTimestamp, channel, monCtx);
            } else {
                forwardAbort(startTimestamp, channel, monCtx);
            }
            return;
        }

        long commitTimestamp = this.timestampOracle.next();
        Optional<Long> newLowWatermarkToForward = Optional.absent();
        if (nonEmptyWriteSet) {
            long newLowWatermark = this.lowWatermark;
            for (long cellId : writeSet) {
                // NOTE(review): the value returned by putLatestWriteForCell is presumably the
                // timestamp displaced from the commit map — confirm against CommitHashMap.
                long displaced = this.hashmap.putLatestWriteForCell(cellId, commitTimestamp);
                newLowWatermark = Math.max(displaced, newLowWatermark);
            }
            if (newLowWatermark != this.lowWatermark) {
                LOG.trace("Setting new low Watermark to {}", newLowWatermark);
                this.lowWatermark = newLowWatermark;
                newLowWatermarkToForward = Optional.of(this.lowWatermark);
            }
        }
        monCtx.timerStop(COMMIT_LATENCY_TIMER);
        forwardCommit(startTimestamp, commitTimestamp, channel, monCtx, newLowWatermarkToForward);
    }

    /**
     * Creates a fence for the requested table: takes the next timestamp from the oracle, records
     * it as the table's fence and sends the fence response through the reply processor.
     */
    private void handleFence(RequestEvent event) throws Exception {
        long tableId = event.getTableId();
        Channel channel = event.getChannel();
        MonitoringContext monCtx = event.getMonCtx();

        long fenceTimestamp = this.timestampOracle.next();
        this.tableFences.put(tableId, fenceTimestamp);

        // NOTE(review): unlike the timestamp and commit paths, the request-side fence latency timer
        // started in fenceRequest() is not stopped anywhere in this class — confirm whether that is
        // intentional.
        monCtx.timerStart("reply.processor.fence.latency");
        this.replyProcessor.sendFenceResponse(tableId, fenceTimestamp, channel, monCtx);
    }

    /**
     * Halts and shuts down the disruptor, then stops its executor, waiting up to three seconds
     * for the executor to terminate. If the wait is interrupted, the interrupt flag of the calling
     * thread is restored.
     */
    @Override
    public void close() throws IOException {
        LOG.info("Terminating Request Processor...");
        this.disruptor.halt();
        this.disruptor.shutdown();
        LOG.info("\tRequest Processor Disruptor shutdown");
        this.disruptorExec.shutdownNow();
        try {
            if (this.disruptorExec.awaitTermination(3L, TimeUnit.SECONDS)) {
                LOG.info("\tRequest Processor Disruptor executor shutdown");
            } else {
                // Do not claim a clean shutdown when the executor is still running.
                LOG.warn("\tRequest Processor Disruptor executor did not terminate within 3 seconds");
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
            Thread.currentThread().interrupt();
        }
        LOG.info("Request Processor terminated");
    }

    /**
     * Forwards a successful commit.
     *
     * @param startTimestamp  start timestamp of the committed transaction
     * @param commitTimestamp commit timestamp assigned to the transaction
     * @param channel         channel associated with the request
     * @param monCtx          monitoring context of the request
     * @param lowWatermark    the new low watermark if this commit raised it, absent otherwise
     */
    protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel channel,
                                          MonitoringContext monCtx, Optional<Long> lowWatermark) throws Exception;

    /** Forwards the outcome of a rejected commit request that was flagged as a retry. */
    protected abstract void forwardCommitRetry(long startTimestamp, Channel channel, MonitoringContext monCtx)
            throws Exception;

    /** Forwards the abort of a rejected commit request that was not flagged as a retry. */
    protected abstract void forwardAbort(long startTimestamp, Channel channel, MonitoringContext monCtx)
            throws Exception;

    /** Forwards the timestamp assigned to a timestamp request. */
    protected abstract void forwardTimestamp(long startTimestamp, Channel channel, MonitoringContext monCtx)
            throws Exception;

    /** Invoked from {@link #onTimeout(long)} when the disruptor wait strategy times out. */
    protected abstract void onTimeout() throws Exception;
}
