package io.riemann.riemann.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.riemann.riemann.Proto;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/riemann/riemann/client/TcpTransport.class */
public class TcpTransport implements AsynchronousTransport {
    public final Logger logger;
    public static final ProtobufDecoder pbDecoder = new ProtobufDecoder(Proto.Msg.getDefaultInstance());
    public static final ProtobufEncoder pbEncoder = new ProtobufEncoder();
    public static final LengthFieldPrepender frameEncoder = new LengthFieldPrepender(4);
    public static final int DEFAULT_PORT = 5555;
    public volatile State state;
    public final EventLoopGroup eventLoopGroup;
    public final ChannelGroup channels;
    public volatile Bootstrap bootstrap;
    public volatile Semaphore writeLimiter;
    public final AtomicBoolean autoFlush;
    public final AtomicInteger writeLimit;
    public final AtomicLong reconnectDelay;
    public final AtomicInteger connectTimeout;
    public final AtomicInteger writeTimeout;
    public final AtomicInteger writeBufferHigh;
    public final AtomicInteger writeBufferLow;
    public final InetSocketAddress remoteAddress;
    public final InetSocketAddress localAddress;
    public final AtomicReference<SSLContext> sslContext;
    public volatile ExceptionReporter exceptionReporter;

    /* loaded from: input_file:io/riemann/riemann/client/TcpTransport$State.class */
    public enum State {
        DISCONNECTED,
        CONNECTING,
        CONNECTED,
        DISCONNECTING
    }

    public void setExceptionReporter(ExceptionReporter exceptionReporter) {
        this.exceptionReporter = exceptionReporter;
    }

    public TcpTransport(InetSocketAddress inetSocketAddress) {
        this.logger = LoggerFactory.getLogger(TcpTransport.class);
        this.state = State.DISCONNECTED;
        this.eventLoopGroup = new NioEventLoopGroup(1);
        this.channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        this.writeLimiter = new Semaphore(8192);
        this.autoFlush = new AtomicBoolean(true);
        this.writeLimit = new AtomicInteger(8192);
        this.reconnectDelay = new AtomicLong(5000L);
        this.connectTimeout = new AtomicInteger(5000);
        this.writeTimeout = new AtomicInteger(5000);
        this.writeBufferHigh = new AtomicInteger(65536);
        this.writeBufferLow = new AtomicInteger(8192);
        this.sslContext = new AtomicReference<>();
        this.exceptionReporter = new ExceptionReporter() { // from class: io.riemann.riemann.client.TcpTransport.1
            @Override // io.riemann.riemann.client.ExceptionReporter
            public void reportException(Throwable th) {
            }
        };
        this.remoteAddress = inetSocketAddress;
        this.localAddress = null;
    }

    public TcpTransport(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) {
        this.logger = LoggerFactory.getLogger(TcpTransport.class);
        this.state = State.DISCONNECTED;
        this.eventLoopGroup = new NioEventLoopGroup(1);
        this.channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        this.writeLimiter = new Semaphore(8192);
        this.autoFlush = new AtomicBoolean(true);
        this.writeLimit = new AtomicInteger(8192);
        this.reconnectDelay = new AtomicLong(5000L);
        this.connectTimeout = new AtomicInteger(5000);
        this.writeTimeout = new AtomicInteger(5000);
        this.writeBufferHigh = new AtomicInteger(65536);
        this.writeBufferLow = new AtomicInteger(8192);
        this.sslContext = new AtomicReference<>();
        this.exceptionReporter = new ExceptionReporter() { // from class: io.riemann.riemann.client.TcpTransport.1
            @Override // io.riemann.riemann.client.ExceptionReporter
            public void reportException(Throwable th) {
            }
        };
        this.remoteAddress = inetSocketAddress;
        this.localAddress = inetSocketAddress2;
    }

    public TcpTransport(String str, int i) throws IOException {
        this(InetSocketAddress.createUnresolved(str, i));
    }

    public TcpTransport(String str, int i, String str2, int i2) throws IOException {
        this(InetSocketAddress.createUnresolved(str, i), InetSocketAddress.createUnresolved(str2, i2));
    }

    public TcpTransport(String str) throws IOException {
        this(str, 5555);
    }

    public TcpTransport(String str, String str2) throws IOException {
        this(str, 5555, str2, 0);
    }

    public TcpTransport(int i) throws IOException {
        this(InetAddress.getLocalHost().getHostAddress(), i);
    }

    public synchronized TcpTransport setWriteBufferLimit(int i) {
        if (isConnected()) {
            throw new IllegalStateException("can't modify the write buffer limit of a connected transport; please set the limit before connecting");
        }
        this.writeLimit.set(i);
        this.writeLimiter = new Semaphore(i);
        return this;
    }

    @Override // io.riemann.riemann.client.Transport
    public boolean isConnected() {
        if (this.state != State.CONNECTED) {
            return false;
        }
        Iterator it = this.channels.iterator();
        while (it.hasNext()) {
            if (((Channel) it.next()).isOpen()) {
                return true;
            }
        }
        return false;
    }

    public SslHandler sslHandler() {
        SSLContext sSLContext = this.sslContext.get();
        if (sSLContext == null) {
            return null;
        }
        SSLEngine createSSLEngine = sSLContext.createSSLEngine();
        createSSLEngine.setUseClientMode(true);
        return new SslHandler(createSSLEngine);
    }

    @Override // io.riemann.riemann.client.Transport
    public synchronized void connect() throws IOException {
        if (this.state != State.DISCONNECTED) {
            return;
        }
        this.state = State.CONNECTING;
        this.bootstrap = new Bootstrap().group(this.eventLoopGroup).localAddress(this.localAddress).remoteAddress(this.remoteAddress).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { // from class: io.riemann.riemann.client.TcpTransport.2
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast("reconnect", new ReconnectHandler(TcpTransport.this.bootstrap, TcpTransport.this.channels, TcpTransport.this.reconnectDelay, TimeUnit.MILLISECONDS));
                SslHandler sslHandler = TcpTransport.this.sslHandler();
                if (sslHandler != null) {
                    pipeline.addLast("tls", sslHandler);
                }
                pipeline.addLast("frame-decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                pipeline.addLast("frame-encoder", TcpTransport.frameEncoder);
                pipeline.addLast("protobuf-decoder", TcpTransport.pbDecoder);
                pipeline.addLast("protobuf-encoder", TcpTransport.pbEncoder);
                pipeline.addLast("handler", new TcpHandler(TcpTransport.this.exceptionReporter));
            }
        });
        this.bootstrap.option(ChannelOption.TCP_NODELAY, true);
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.connectTimeout.get()));
        this.bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, Integer.valueOf(this.writeBufferLow.get()));
        this.bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(this.writeBufferHigh.get()));
        this.bootstrap.localAddress(this.localAddress);
        this.bootstrap.remoteAddress(this.remoteAddress);
        ChannelFuture connect = this.bootstrap.connect();
        this.channels.add(connect.channel());
        connect.awaitUninterruptibly();
        this.state = State.CONNECTED;
        if (!connect.isSuccess()) {
            throw new IOException("Connection failed", connect.cause());
        }
    }

    @Override // io.riemann.riemann.client.Transport, java.lang.AutoCloseable
    public void close() {
        close(false);
    }

    public synchronized void close(boolean z) {
        if (z || this.state == State.CONNECTED) {
            try {
                this.channels.close().awaitUninterruptibly();
                this.eventLoopGroup.shutdownGracefully().awaitUninterruptibly();
            } finally {
                this.bootstrap = null;
                this.state = State.DISCONNECTED;
            }
        }
    }

    @Override // io.riemann.riemann.client.Transport
    public synchronized void reconnect() throws IOException {
        close();
        connect();
    }

    @Override // io.riemann.riemann.client.Transport
    public void flush() throws IOException {
        this.channels.flush();
    }

    @Override // io.riemann.riemann.client.AsynchronousTransport
    public IPromise<Proto.Msg> sendMessage(Proto.Msg msg) {
        return sendMessage(msg, new Promise<>());
    }

    public Promise<Proto.Msg> sendMessage(Proto.Msg msg, Promise<Proto.Msg> promise) {
        if (this.state != State.CONNECTED) {
            promise.deliver(new IOException("client not connected"));
            return promise;
        }
        Write write = new Write(msg, promise);
        final Semaphore semaphore = this.writeLimiter;
        if (!semaphore.tryAcquire()) {
            promise.deliver(new OverloadedException("client write buffer is full: " + this.writeLimiter.availablePermits() + " / " + this.writeLimit.get() + " messages."));
            return promise;
        }
        Iterator it = this.channels.iterator();
        if (it.hasNext()) {
            Channel channel = (Channel) it.next();
            (this.autoFlush.get() ? channel.writeAndFlush(write) : channel.write(write)).addListener(new ChannelFutureListener() { // from class: io.riemann.riemann.client.TcpTransport.3
                public void operationComplete(ChannelFuture channelFuture) {
                    semaphore.release();
                }
            });
            return promise;
        }
        semaphore.release();
        promise.deliver(new IOException("no channels available"));
        return promise;
    }

    @Override // io.riemann.riemann.client.Transport
    public Transport transport() {
        return null;
    }
}
