package org.apache.omid.tso;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.TSOChannelHandler;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer.class */
public class ProgrammableTSOServer extends SimpleChannelHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ProgrammableTSOServer.class);

    /**
     * Responses registered through {@link #queueResponse(Response)}; one entry is
     * polled per non-handshake request in {@code messageReceived}.
     * NOTE(review): {@link LinkedList} is not thread-safe; confirm that queuing and
     * request handling never run concurrently, or switch to a concurrent queue.
     */
    private final Queue<Response> responseQueue = new LinkedList<Response>();

    /**
     * Server socket channel factory backed by two cached thread pools whose threads
     * are named {@code boss-N} and {@code worker-N}. The last argument,
     * {@code (2 * availableProcessors + 1) * 2}, is the worker count handed to the factory.
     */
    private final ChannelFactory factory = new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
            Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
            ((Runtime.getRuntime().availableProcessors() * 2) + 1) * 2);

    /** Holds the bound server channel plus every currently connected client channel. */
    private final ChannelGroup channelGroup = new DefaultChannelGroup(ProgrammableTSOServer.class.getName());

    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$AbortResponse.class */
    /**
     * Queueable response tagged with {@code ResponseType.ABORT}.
     */
    public static class AbortResponse extends Response {
        /** Start timestamp reported back when this response is sent. */
        final long startTS;

        /**
         * @param j start timestamp to store in {@code startTS}
         */
        public AbortResponse(long j) {
            super(Response.ResponseType.ABORT);
            startTS = j;
        }
    }

    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$CommitResponse.class */
    /**
     * Queueable response tagged with {@code ResponseType.COMMIT}.
     */
    public static class CommitResponse extends Response {
        /** Start timestamp reported back when this response is sent. */
        final long startTS;
        /** Commit timestamp reported back when this response is sent. */
        final long commitTS;

        /**
         * @param j  start timestamp to store in {@code startTS}
         * @param j2 commit timestamp to store in {@code commitTS}
         */
        public CommitResponse(long j, long j2) {
            super(Response.ResponseType.COMMIT);
            startTS = j;
            commitTS = j2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$Response.class */
    /**
     * Base type of every response that can be queued on the server.
     * Each instance carries the {@link ResponseType} chosen by its subclass.
     */
    public static abstract class Response {

        /** Kinds of response known to the server. */
        enum ResponseType {
            TIMESTAMP,
            COMMIT,
            ABORT
        }

        /** Kind of this response; fixed at construction time. */
        final ResponseType type;

        /**
         * @param responseType kind to record in {@code type}
         */
        public Response(ResponseType responseType) {
            type = responseType;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$TSOChannelContext.class */
    /**
     * Per-channel state object recording whether the handshake has completed.
     * The flag starts out {@code false} and can only be switched to {@code true}.
     */
    public static class TSOChannelContext {
        /** {@code true} once {@link #setHandshakeComplete()} has been called. */
        boolean handshakeComplete;

        TSOChannelContext() {
        }

        /** @return the current value of the handshake flag */
        boolean getHandshakeComplete() {
            return handshakeComplete;
        }

        /** Marks the handshake as completed. */
        void setHandshakeComplete() {
            handshakeComplete = true;
        }
    }

    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$TimestampResponse.class */
    /**
     * Queueable response tagged with {@code ResponseType.TIMESTAMP}.
     */
    public static class TimestampResponse extends Response {
        /** Start timestamp reported back when this response is sent. */
        final long startTS;

        /**
         * @param j start timestamp to store in {@code startTS}
         */
        public TimestampResponse(long j) {
            super(Response.ResponseType.TIMESTAMP);
            startTS = j;
        }
    }

    @Inject
    public ProgrammableTSOServer(int i) {
        ServerBootstrap serverBootstrap = new ServerBootstrap(this.factory);
        serverBootstrap.setPipelineFactory(new TSOChannelHandler.TSOPipelineFactory(this));
        this.channelGroup.add(serverBootstrap.bind(new InetSocketAddress(i)));
        LOG.info("********** Dumb TSO Server running on port {} **********", Integer.valueOf(i));
    }

    /**
     * Appends a response to the tail of the response queue.
     *
     * @param response the response to enqueue
     */
    public void queueResponse(Response response) {
        responseQueue.add(response);
    }

    /**
     * Discards every response currently waiting in the response queue.
     */
    public void cleanResponses() {
        responseQueue.clear();
    }

    /**
     * Registers a newly connected channel in the channel group.
     *
     * @param channelHandlerContext context whose channel is added to the group
     * @param channelStateEvent     the connection event (not inspected)
     */
    @Override
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        this.channelGroup.add(channelHandlerContext.getChannel());
    }

    /**
     * Removes a disconnected channel from the channel group.
     *
     * @param channelHandlerContext context whose channel is removed from the group
     * @param channelStateEvent     the disconnection event (not inspected)
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        this.channelGroup.remove(channelHandlerContext.getChannel());
    }

    /**
     * Answers an incoming request with the next queued response.
     *
     * <ul>
     *   <li>Messages that are not {@code TSOProto.Request} are logged and ignored.</li>
     *   <li>Handshake requests are delegated to {@code checkHandshake}.</li>
     *   <li>Any other request on a channel without a completed handshake closes the
     *       channel and is dropped without consuming a queued response.</li>
     *   <li>Timestamp requests require the next queued response to be a
     *       {@link TimestampResponse}.</li>
     *   <li>Commit requests require the next queued response to be a
     *       {@link CommitResponse} or an {@link AbortResponse}.</li>
     *   <li>Requests of any other kind are logged and the channel is closed.</li>
     * </ul>
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param messageEvent          event carrying the received message
     * @throws IllegalStateException if the next queued response is missing or does not
     *                               match the kind of request received
     */
    @Override
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
        Object message = messageEvent.getMessage();
        if (!(message instanceof TSOProto.Request)) {
            // The placeholder is required; without it the argument never reaches the log.
            LOG.error("Unknown message type {}", message == null ? "null" : message.getClass().getName());
            return;
        }

        TSOProto.Request request = (TSOProto.Request) message;
        Channel channel = channelHandlerContext.getChannel();

        if (request.hasHandshakeRequest()) {
            checkHandshake(channelHandlerContext, request.getHandshakeRequest());
            return;
        }

        if (!handshakeCompleted(channelHandlerContext)) {
            LOG.info("handshake not completed");
            channel.close();
            // Stop here: do not consume a queued response or write to a channel being closed.
            return;
        }

        Response queued = this.responseQueue.poll();

        if (request.hasTimestampRequest()) {
            if (queued == null || queued.type != Response.ResponseType.TIMESTAMP) {
                throw new IllegalStateException("Expecting TS response to send but got " + queued);
            }
            sendTimestampResponse(((TimestampResponse) queued).startTS, channel);
        } else if (request.hasCommitRequest()) {
            if (queued == null) {
                throw new IllegalStateException("Expecting COMMIT response to send but got null");
            }
            switch (queued.type) {
                case COMMIT:
                    CommitResponse commitResponse = (CommitResponse) queued;
                    sendCommitResponse(commitResponse.startTS, commitResponse.commitTS, channel);
                    break;
                case ABORT:
                    sendAbortResponse(((AbortResponse) queued).startTS, channel);
                    break;
                default:
                    throw new IllegalStateException("Expecting COMMIT response to send but got " + queued.type);
            }
        } else {
            LOG.error("Invalid request {}", request);
            channel.close();
        }
    }

    /**
     * Handles an exception raised on a channel: {@link ClosedChannelException}s are
     * ignored; anything else is logged at WARN level and the channel is closed.
     *
     * @param channelHandlerContext context of the affected channel (not inspected)
     * @param exceptionEvent        event carrying the cause and the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        Throwable cause = exceptionEvent.getCause();
        if (cause instanceof ClosedChannelException) {
            return;
        }
        LOG.warn("TSOHandler: Unexpected exception from downstream.", cause);
        Channels.close(exceptionEvent.getChannel());
    }

    /**
     * Replies to a handshake request. When the request carries client capabilities
     * the client is reported as compatible, empty server capabilities are returned
     * and a completed {@link TSOChannelContext} is attached to the handler context;
     * otherwise the client is reported as incompatible. The handshake response is
     * written to the channel in both cases.
     */
    private void checkHandshake(ChannelHandlerContext channelHandlerContext, TSOProto.HandshakeRequest handshakeRequest) {
        TSOProto.HandshakeResponse.Builder handshake = TSOProto.HandshakeResponse.newBuilder();

        if (!handshakeRequest.hasClientCapabilities()) {
            handshake.setClientCompatible(false);
        } else {
            TSOProto.Capabilities serverCapabilities = TSOProto.Capabilities.newBuilder().build();
            handshake.setClientCompatible(true).setServerCapabilities(serverCapabilities);

            // Remember on the context that this channel has finished its handshake.
            TSOChannelContext state = new TSOChannelContext();
            state.setHandshakeComplete();
            channelHandlerContext.setAttachment(state);
        }

        TSOProto.Response reply = TSOProto.Response.newBuilder()
                .setHandshakeResponse(handshake.build())
                .build();
        channelHandlerContext.getChannel().write(reply);
    }

    /**
     * Tells whether the handler context carries a {@link TSOChannelContext}
     * attachment whose handshake flag is set.
     *
     * @return {@code true} only if such an attachment exists and reports completion
     */
    private boolean handshakeCompleted(ChannelHandlerContext channelHandlerContext) {
        Object state = channelHandlerContext.getAttachment();
        return state instanceof TSOChannelContext
                && ((TSOChannelContext) state).getHandshakeComplete();
    }

    /**
     * Writes a timestamp response carrying the given start timestamp to the channel.
     */
    private void sendTimestampResponse(long startTimestamp, Channel channel) {
        TSOProto.TimestampResponse payload = TSOProto.TimestampResponse.newBuilder()
                .setStartTimestamp(startTimestamp)
                .build();
        channel.write(TSOProto.Response.newBuilder().setTimestampResponse(payload).build());
    }

    /**
     * Writes a non-aborted commit response with the given start and commit
     * timestamps to the channel.
     */
    private void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel) {
        TSOProto.CommitResponse payload = TSOProto.CommitResponse.newBuilder()
                .setAborted(false)
                .setStartTimestamp(startTimestamp)
                .setCommitTimestamp(commitTimestamp)
                .build();
        channel.write(TSOProto.Response.newBuilder().setCommitResponse(payload).build());
    }

    /**
     * Writes a commit response flagged as aborted, carrying only the given start
     * timestamp, to the channel.
     */
    private void sendAbortResponse(long startTimestamp, Channel channel) {
        TSOProto.CommitResponse payload = TSOProto.CommitResponse.newBuilder()
                .setAborted(true)
                .setStartTimestamp(startTimestamp)
                .build();
        channel.write(TSOProto.Response.newBuilder().setCommitResponse(payload).build());
    }
}
