package org.apache.phoenix.flume.sink;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.phoenix.flume.FlumeConstants;
import org.apache.phoenix.flume.serializer.EventSerializer;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/phoenix/flume/sink/PhoenixSink.class */
public class PhoenixSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(PhoenixSink.class);
    private static AtomicInteger counter = new AtomicInteger();
    private static final String NAME = "Phoenix Sink__";
    private SinkCounter sinkCounter;
    private Integer batchSize;
    private EventSerializer serializer;

    public void configure(Context context) {
        setName(NAME + counter.incrementAndGet());
        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, FlumeConstants.DEFAULT_BATCH_SIZE);
        String string = context.getString(FlumeConstants.CONFIG_SERIALIZER);
        Preconditions.checkNotNull(string, "Event serializer cannot be empty, please specify in the configuration file");
        initializeSerializer(context, string);
        this.sinkCounter = new SinkCounter(getName());
    }

    private void initializeSerializer(Context context, String str) {
        String str2 = null;
        EventSerializers eventSerializers = null;
        try {
            eventSerializers = EventSerializers.valueOf(str.toUpperCase());
        } catch (IllegalArgumentException e) {
            str2 = str;
        }
        Context context2 = new Context();
        context2.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
        copyPropertiesToSerializerContext(context, context2);
        try {
            this.serializer = (EventSerializer) (str2 == null ? Class.forName(eventSerializers.getClassName()) : Class.forName(str2)).newInstance();
            this.serializer.configure(context2);
        } catch (Exception e2) {
            logger.error("Could not instantiate event serializer.", e2);
            Throwables.propagate(e2);
        }
    }

    private void copyPropertiesToSerializerContext(Context context, Context context2) {
        context2.put(FlumeConstants.CONFIG_TABLE_DDL, context.getString(FlumeConstants.CONFIG_TABLE_DDL));
        context2.put(FlumeConstants.CONFIG_TABLE, context.getString(FlumeConstants.CONFIG_TABLE));
        context2.put(FlumeConstants.CONFIG_ZK_QUORUM, context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
        context2.put(FlumeConstants.CONFIG_JDBC_URL, context.getString(FlumeConstants.CONFIG_JDBC_URL));
        context2.put(FlumeConstants.CONFIG_BATCHSIZE, context.getString(FlumeConstants.CONFIG_BATCHSIZE));
    }

    public void start() {
        logger.info("Starting sink {} ", getName());
        this.sinkCounter.start();
        try {
            this.serializer.initialize();
            this.sinkCounter.incrementConnectionCreatedCount();
        } catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            logger.error("Error {} in initializing the serializer.", e.getMessage());
            Throwables.propagate(e);
        }
        super.start();
    }

    public void stop() {
        super.stop();
        try {
            this.serializer.close();
        } catch (SQLException e) {
            logger.error(" Error while closing connection {} for sink {} ", e.getMessage(), getName());
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
    }

    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(this.batchSize.intValue());
        long nanoTime = System.nanoTime();
        try {
            try {
                try {
                    transaction = channel.getTransaction();
                    transaction.begin();
                    long j = 0;
                    while (true) {
                        if (j >= this.batchSize.intValue()) {
                            break;
                        }
                        Event take = channel.take();
                        if (take == null) {
                            status = Sink.Status.BACKOFF;
                            if (j == 0) {
                                this.sinkCounter.incrementBatchEmptyCount();
                            } else {
                                this.sinkCounter.incrementBatchUnderflowCount();
                            }
                        } else {
                            newArrayListWithExpectedSize.add(take);
                            j++;
                        }
                    }
                    if (newArrayListWithExpectedSize.isEmpty()) {
                        logger.debug("no events to process ");
                        this.sinkCounter.incrementBatchEmptyCount();
                        status = Sink.Status.BACKOFF;
                    } else {
                        if (newArrayListWithExpectedSize.size() == this.batchSize.intValue()) {
                            this.sinkCounter.incrementBatchCompleteCount();
                        } else {
                            this.sinkCounter.incrementBatchUnderflowCount();
                            status = Sink.Status.BACKOFF;
                        }
                        this.serializer.upsertEvents(newArrayListWithExpectedSize);
                        this.sinkCounter.addToEventDrainSuccessCount(newArrayListWithExpectedSize.size());
                    }
                    transaction.commit();
                    logger.info(String.format("Time taken to process [%s] events was [%s] seconds", Integer.valueOf(newArrayListWithExpectedSize.size()), Long.valueOf(TimeUnit.SECONDS.convert(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS))));
                    if (transaction != null) {
                        transaction.close();
                    }
                } catch (Throwable th) {
                    transaction.rollback();
                    logger.error("exception while processing in Phoenix Sink", th);
                    throw new EventDeliveryException("Failed to persist message", th);
                }
            } catch (SQLException e) {
                this.sinkCounter.incrementConnectionFailedCount();
                transaction.rollback();
                logger.error("exception while persisting to Hbase ", e);
                throw new EventDeliveryException("Failed to persist message to Hbase", e);
            } catch (ChannelException e2) {
                transaction.rollback();
                status = Sink.Status.BACKOFF;
                this.sinkCounter.incrementConnectionFailedCount();
                logger.info(String.format("Time taken to process [%s] events was [%s] seconds", Integer.valueOf(newArrayListWithExpectedSize.size()), Long.valueOf(TimeUnit.SECONDS.convert(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS))));
                if (transaction != null) {
                    transaction.close();
                }
            }
            return status;
        } catch (Throwable th2) {
            logger.info(String.format("Time taken to process [%s] events was [%s] seconds", Integer.valueOf(newArrayListWithExpectedSize.size()), Long.valueOf(TimeUnit.SECONDS.convert(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS))));
            if (transaction != null) {
                transaction.close();
            }
            throw th2;
        }
    }
}
