package org.apache.flume.sink.hbase;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/hbase/HBaseSink.class */
public class HBaseSink extends AbstractSink implements Configurable, BatchSizeSupported {
    // Name of the target HBase table, read in configure().
    private String tableName;
    // Column family name as UTF-8 bytes; start() verifies it exists on the table.
    private byte[] columnFamily;
    // Table handle assigned in start() and reset to null in stop().
    private HTable table;
    // Upper bound on the number of events taken from the channel per process() call.
    private long batchSize;
    // HBase client configuration; configure() may set ZooKeeper properties on it.
    private Configuration config;
    private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
    // Converts each event into row actions and increments; instantiated in configure().
    private HbaseEventSerializer serializer;
    // Fully qualified class name of the serializer to instantiate.
    private String eventSerializerType;
    // Sub-properties handed to the serializer's configure() call.
    private Context serializerContext;
    // Credentials passed to FlumeAuthenticationUtil.getAuthenticator() in start().
    private String kerberosPrincipal;
    private String kerberosKeytab;
    // Value passed to setWriteToWAL() on every Put/Increment before it is written.
    private boolean enableWal;
    // When true, increments of a batch are coalesced before being sent.
    private boolean batchIncrements;
    // Reflectively resolved Increment accessor; only looked up when batchIncrements is enabled.
    private Method refGetFamilyMap;
    // Connection and batch metrics; created at the end of configure().
    private SinkCounter sinkCounter;
    // Executor through which all HBase calls are run; obtained in start().
    private PrivilegedExecutor privilegedExecutor;
    // Optional test hook invoked with the increments after the coalescing step.
    private DebugIncrementsCallback debugIncrCallback;

    /**
     * Test hook that lets a caller observe the increments of a batch right before they are
     * written to HBase.
     *
     * <p>NOTE: this is decompiler output; the decompiler reported that the original declaration
     * was package-private and widened it to {@code public}.
     */
    @VisibleForTesting
    @InterfaceAudience.Private
    public interface DebugIncrementsCallback {
        /**
         * Invoked once per batch with the increments that are about to be sent. When increment
         * coalescing is enabled these are the coalesced increments, otherwise the increments as
         * produced by the serializer.
         *
         * @param iterable the increments that will be written
         */
        void onAfterCoalesce(Iterable<Increment> iterable);
    }

    /**
     * Creates a sink that uses the configuration returned by {@code HBaseConfiguration.create()}.
     */
    public HBaseSink() {
        this(HBaseConfiguration.create());
    }

    /**
     * Creates a sink that talks to HBase through the supplied client configuration.
     *
     * <p>WAL writes start out enabled and increment coalescing disabled; both can be changed
     * later by {@code configure(Context)}.
     *
     * @param configuration HBase client configuration used to open the table
     */
    public HBaseSink(Configuration configuration) {
        this.config = configuration;
        this.debugIncrCallback = null;
        this.refGetFamilyMap = null;
        this.batchIncrements = false;
        this.enableWal = true;
    }

    /**
     * Test-only constructor that additionally installs a callback which is handed the increments
     * of each batch after the (optional) coalescing step.
     *
     * @param configuration HBase client configuration used to open the table
     * @param debugIncrementsCallback callback to notify; may be {@code null} to disable it
     */
    @VisibleForTesting
    @InterfaceAudience.Private
    HBaseSink(Configuration configuration, DebugIncrementsCallback debugIncrementsCallback) {
        this(configuration);
        this.debugIncrCallback = debugIncrementsCallback;
    }

    /**
     * Authenticates, opens the configured HBase table and verifies that the configured column
     * family exists, then marks the sink as started.
     *
     * <p>The three steps are guarded independently so that each failure is counted once and
     * reported with the message that matches the step that actually failed.
     *
     * @throws IllegalArgumentException if a table is still open from a previous start
     * @throws FlumeException if login fails, the table cannot be opened, or the column family
     *         cannot be verified
     */
    @Override
    public void start() {
        Preconditions.checkArgument(this.table == null, "Please call stop before calling start on an old instance.");

        // Step 1: obtain the executor that runs HBase calls under the configured credentials.
        try {
            this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(this.kerberosPrincipal, this.kerberosKeytab);
        } catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            throw new FlumeException("Failed to login to HBase using provided credentials.", e);
        }

        // Step 2: open the table. Auto-flush is turned off so the sink decides when to flush.
        try {
            this.table = this.privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {
                @Override
                public HTable run() throws Exception {
                    HTable hTable = new HTable(HBaseSink.this.config, HBaseSink.this.tableName);
                    hTable.setAutoFlush(false);
                    return hTable;
                }
            });
        } catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            logger.error("Could not load table, " + this.tableName + " from HBase", e);
            throw new FlumeException("Could not load table, " + this.tableName + " from HBase", e);
        }

        // Step 3: make sure the column family exists. A missing family is raised as an
        // IOException so it is reported through the same path as a descriptor lookup failure.
        try {
            boolean familyExists = this.privilegedExecutor.execute(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws IOException {
                    return Boolean.valueOf(HBaseSink.this.table.getTableDescriptor().hasFamily(HBaseSink.this.columnFamily));
                }
            });
            if (!familyExists) {
                throw new IOException("Table " + this.tableName + " has no such column family " + Bytes.toString(this.columnFamily));
            }
        } catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            throw new FlumeException("Error getting column family from HBase.Please verify that the table " + this.tableName + " and Column Family, " + Bytes.toString(this.columnFamily) + " exists in HBase, and the current user has permissions to access that table.", e);
        }

        super.start();
        this.sinkCounter.incrementConnectionCreatedCount();
        this.sinkCounter.start();
    }

    /**
     * Closes the HBase table (if one is open) and stops the sink counter.
     *
     * <p>The table reference is cleared and the counter is stopped even when closing the table
     * fails, so that the instance can be started again afterwards. The connection-closed count
     * is only incremented when the close did not throw.
     *
     * @throws FlumeException if closing the table fails
     */
    @Override
    public void stop() {
        try {
            if (this.table != null) {
                this.table.close();
            }
            this.sinkCounter.incrementConnectionClosedCount();
        } catch (IOException e) {
            throw new FlumeException("Error closing table.", e);
        } finally {
            // Always release the reference: start() refuses to run while table is non-null.
            this.table = null;
            this.sinkCounter.stop();
        }
    }

    /**
     * Reads the sink settings from the Flume context: table, column family, batch size,
     * serializer (with its sub-properties), Kerberos credentials, WAL and increment-coalescing
     * switches, and optional ZooKeeper quorum / znode parent overrides.
     *
     * @param context the sink's configuration
     * @throws ConfigurationException if the HBase version check fails
     * @throws NullPointerException if the table name or column family is not configured
     * @throws FlumeException if the ZooKeeper quorum is malformed (missing or mixed ports)
     * @throws RuntimeException if the serializer cannot be instantiated or configured
     */
    @Override
    public void configure(Context context) {
        if (!HBaseVersionCheck.hasVersionLessThan2(logger)) {
            throw new ConfigurationException("HBase major version number must be less than 2 for hbase-sink.");
        }
        this.tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE);
        String columnFamilyName = context.getString(HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
        this.batchSize = context.getLong(HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, Long.valueOf(100L)).longValue();
        this.serializerContext = new Context();
        this.eventSerializerType = context.getString(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER);
        Preconditions.checkNotNull(this.tableName, "Table name cannot be empty, please specify in configuration file");
        Preconditions.checkNotNull(columnFamilyName, "Column family cannot be empty, please specify in configuration file");
        if (this.eventSerializerType == null || this.eventSerializerType.isEmpty()) {
            this.eventSerializerType = "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer";
            logger.info("No serializer defined, Will use default");
        }
        this.serializerContext.putAll(context.getSubProperties(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
        this.columnFamily = columnFamilyName.getBytes(Charsets.UTF_8);
        try {
            this.serializer = (HbaseEventSerializer) Class.forName(this.eventSerializerType).newInstance();
            this.serializer.configure(this.serializerContext);
        } catch (Exception e) {
            logger.error("Could not instantiate event serializer.", e);
            // propagate() always throws; the explicit throw makes that visible to the reader.
            throw Throwables.propagate(e);
        }
        this.kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB);
        this.kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL);
        this.enableWal = context.getBoolean(HBaseSinkConfigurationConstants.CONFIG_ENABLE_WAL, true).booleanValue();
        logger.info("The write to WAL option is set to: {}", this.enableWal);
        if (!this.enableWal) {
            logger.warn("HBase Sink's enableWal configuration is set to false. All writes to HBase will have WAL disabled, and any data in the memstore of this region in the Region Server could be lost!");
        }
        this.batchIncrements = context.getBoolean(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS).booleanValue();
        if (this.batchIncrements) {
            logger.info("Increment coalescing is enabled. Increments will be buffered.");
            this.refGetFamilyMap = reflectLookupGetFamilyMap();
        }
        String zkQuorum = context.getString(HBaseSinkConfigurationConstants.ZK_QUORUM);
        if (zkQuorum != null && !zkQuorum.isEmpty()) {
            applyZookeeperQuorum(zkQuorum);
        }
        String znodeParent = context.getString(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT);
        if (znodeParent != null && !znodeParent.isEmpty()) {
            this.config.set("zookeeper.znode.parent", znodeParent);
        }
        this.sinkCounter = new SinkCounter(getName());
    }

    /**
     * Parses a comma-separated list of {@code host:port} entries and stores the host list and
     * the shared client port in the HBase configuration.
     *
     * @param zkQuorum non-empty quorum string from the sink configuration
     * @throws FlumeException if an entry has no port or the entries use different ports
     */
    private void applyZookeeperQuorum(String zkQuorum) {
        logger.info("Using ZK Quorum: {}", zkQuorum);
        String[] nodes = zkQuorum.split(",");
        StringBuilder hosts = new StringBuilder();
        Integer clientPort = null;
        for (int i = 0; i < nodes.length; i++) {
            String[] hostAndPort = nodes[i].split(":");
            // An entry without ":port" splits into a single element; fail with a clear
            // message instead of an ArrayIndexOutOfBoundsException.
            if (hostAndPort.length < 2) {
                throw new FlumeException("Expected client port for the ZK node!");
            }
            if (i > 0) {
                hosts.append(",");
            }
            hosts.append(hostAndPort[0].trim());
            int port = Integer.parseInt(hostAndPort[1].trim());
            if (clientPort == null) {
                clientPort = Integer.valueOf(port);
            } else if (clientPort.intValue() != port) {
                throw new FlumeException("All Zookeeper nodes in the quorum must use the same client port.");
            }
        }
        // A value made only of separators yields no entries; as before, the raw value is then
        // passed through unchanged together with the default client port.
        String quorumHosts = nodes.length == 0 ? zkQuorum : hosts.toString();
        if (clientPort == null) {
            clientPort = Integer.valueOf(2181);
        }
        this.config.set(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY, quorumHosts);
        this.config.setInt("hbase.zookeeper.property.clientPort", clientPort.intValue());
    }

    /**
     * Returns the HBase configuration object held by this sink. This is the live instance, not
     * a copy, so it reflects any ZooKeeper properties set by {@code configure(Context)}.
     *
     * @return the HBase client configuration used by this sink
     */
    public Configuration getConfig() {
        return this.config;
    }

    /**
     * Drains up to {@code batchSize} events from the channel inside one transaction, converts
     * them with the serializer and writes the resulting actions and increments to HBase.
     *
     * <p>Draining stops as soon as the channel returns no event. On any failure the transaction
     * is rolled back; the transaction is closed in every case.
     *
     * @return {@code BACKOFF} when the channel had no events at all, otherwise {@code READY}
     * @throws EventDeliveryException if a checked exception occurs while delivering the batch;
     *         errors and runtime exceptions are rethrown unchanged
     */
    @Override
    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<Row> actions = new LinkedList<Row>();
        List<Increment> increments = new LinkedList<Increment>();
        try {
            transaction.begin();
            if (this.serializer instanceof BatchAware) {
                ((BatchAware) this.serializer).onBatchStart();
            }
            long taken = 0;
            while (taken < this.batchSize) {
                Event event = channel.take();
                if (event == null) {
                    if (taken == 0) {
                        status = Sink.Status.BACKOFF;
                        this.sinkCounter.incrementBatchEmptyCount();
                    } else {
                        this.sinkCounter.incrementBatchUnderflowCount();
                    }
                    // Channel is drained: stop here instead of polling it again forever.
                    break;
                }
                this.serializer.initialize(event, this.columnFamily);
                actions.addAll(this.serializer.getActions());
                increments.addAll(this.serializer.getIncrements());
                taken++;
            }
            if (taken == this.batchSize) {
                this.sinkCounter.incrementBatchCompleteCount();
            }
            this.sinkCounter.addToEventDrainAttemptCount(taken);
            putEventsAndCommit(actions, increments, transaction);
        } catch (Throwable th) {
            try {
                transaction.rollback();
            } catch (Exception e) {
                logger.error("Exception in rollback. Rollback might not have been successful.", e);
            }
            logger.error("Failed to commit transaction.Transaction rolled back.", th);
            // Unchecked failures keep their original type; checked ones are wrapped.
            if (th instanceof Error) {
                throw (Error) th;
            }
            if (th instanceof RuntimeException) {
                throw (RuntimeException) th;
            }
            throw new EventDeliveryException("Failed to commit transaction.Transaction rolled back.", th);
        } finally {
            transaction.close();
        }
        return status;
    }

    /**
     * Writes a batch to HBase through the privileged executor and then commits the channel
     * transaction.
     *
     * <p>Row actions are sent first in a single {@code batch} call; increments are then sent one
     * by one, optionally coalesced beforehand. The configured WAL setting is applied to every
     * {@code Put} and {@code Increment} before it is written.
     *
     * @param list row actions produced by the serializer for this batch
     * @param list2 increments produced by the serializer for this batch
     * @param transaction the channel transaction to commit once all writes succeeded
     * @throws Exception if any HBase call or the commit fails
     */
    private void putEventsAndCommit(final List<Row> list, final List<Increment> list2, Transaction transaction) throws Exception {
        this.privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                for (Row row : list) {
                    if (row instanceof Put) {
                        ((Put) row).setWriteToWAL(HBaseSink.this.enableWal);
                    }
                    // Kept as a separate check: the action list may also carry Increment rows.
                    if (row instanceof Increment) {
                        ((Increment) row).setWriteToWAL(HBaseSink.this.enableWal);
                    }
                }
                HBaseSink.this.table.batch(list);
                return null;
            }
        });
        this.privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                List<Increment> toWrite = HBaseSink.this.batchIncrements ? HBaseSink.this.coalesceIncrements(list2) : list2;
                if (HBaseSink.this.debugIncrCallback != null) {
                    HBaseSink.this.debugIncrCallback.onAfterCoalesce(toWrite);
                }
                for (Increment increment : toWrite) {
                    increment.setWriteToWAL(HBaseSink.this.enableWal);
                    HBaseSink.this.table.increment(increment);
                }
                return null;
            }
        });
        transaction.commit();
        // NOTE(review): this adds the number of row actions, not the number of events taken;
        // the two differ when a serializer emits zero or several actions per event - confirm.
        this.sinkCounter.addToEventDrainSuccessCount(list.size());
    }

    /**
     * Finds, by reflection, the no-argument {@code Increment} accessor used for coalescing.
     *
     * <p>The candidates {@code getFamilyMapOfLongs} and {@code getFamilyMap} are tried in that
     * order, and the first one whose declared return type is exactly {@code Map} is returned.
     * A candidate that exists but declares a different return type is skipped rather than
     * returned, because callers cast the invocation result to {@code Map}.
     *
     * @return the accessor method to invoke on {@code Increment} instances
     * @throws UnsupportedOperationException if no suitable accessor is available
     */
    @VisibleForTesting
    static Method reflectLookupGetFamilyMap() {
        for (String name : new String[]{"getFamilyMapOfLongs", "getFamilyMap"}) {
            try {
                Method candidate = Increment.class.getMethod(name);
                if (candidate.getReturnType().equals(Map.class)) {
                    logger.debug("Using Increment.{} for coalesce", name);
                    return candidate;
                }
                logger.debug("Increment.{} does not return a Map; skipping.", name);
            } catch (NoSuchMethodException e) {
                logger.debug("Increment.{} does not exist. Exception follows.", name, e);
            } catch (SecurityException e2) {
                logger.debug("No access to Increment.{}; Exception follows.", name, e2);
            }
        }
        throw new UnsupportedOperationException("Cannot find Increment.getFamilyMap()");
    }

    /**
     * Invokes the reflectively resolved family-map accessor on the given increment.
     *
     * @param increment the increment to read; must not be {@code null}
     * @return the value returned by the accessor, viewed as family -> (qualifier -> amount)
     * @throws NullPointerException if the accessor was never resolved or {@code increment} is null
     * @throws RuntimeException if the reflective call fails
     */
    private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment increment) {
        Preconditions.checkNotNull(this.refGetFamilyMap, "Increment.getFamilymap() not found");
        Preconditions.checkNotNull(increment, "Increment required");
        try {
            @SuppressWarnings("unchecked")
            Map<byte[], NavigableMap<byte[], Long>> familyMap =
                    (Map<byte[], NavigableMap<byte[], Long>>) this.refGetFamilyMap.invoke(increment);
            return familyMap;
        } catch (IllegalAccessException accessError) {
            logger.warn("Unexpected error calling getFamilyMap()", accessError);
            throw Throwables.propagate(accessError);
        } catch (InvocationTargetException targetError) {
            logger.warn("Unexpected error calling getFamilyMap()", targetError);
            throw Throwables.propagate(targetError);
        }
    }

    /**
     * Merges increments that target the same row, column family and qualifier into a single
     * increment per row whose amounts are the sums of the individual amounts.
     *
     * <p>Rows, families and qualifiers are compared by byte content (not array identity), and
     * the result is ordered by row key.
     *
     * <p>NOTE: the decompiler reported widening this method from {@code private}; the visibility
     * is left as found so that existing callers keep compiling.
     *
     * @param iterable the increments of one batch; must not be {@code null}
     * @return a new list holding one increment per distinct row
     * @throws NullPointerException if {@code iterable} is {@code null}
     */
    public List<Increment> coalesceIncrements(Iterable<Increment> iterable) {
        Preconditions.checkNotNull(iterable, "List of Increments must not be null");
        // row -> family -> qualifier -> summed amount
        Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        for (Increment increment : iterable) {
            byte[] row = increment.getRow();
            for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : getFamilyMap(increment).entrySet()) {
                byte[] family = familyEntry.getKey();
                for (Map.Entry<byte[], Long> qualifierEntry : familyEntry.getValue().entrySet()) {
                    incrementCounter(counters, row, family, qualifierEntry.getKey(), qualifierEntry.getValue());
                }
            }
        }
        List<Increment> coalesced = Lists.newLinkedList();
        for (Map.Entry<byte[], Map<byte[], NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) {
            Increment merged = new Increment(rowEntry.getKey());
            for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : rowEntry.getValue().entrySet()) {
                byte[] family = familyEntry.getKey();
                for (Map.Entry<byte[], Long> qualifierEntry : familyEntry.getValue().entrySet()) {
                    merged.addColumn(family, qualifierEntry.getKey(), qualifierEntry.getValue().longValue());
                }
            }
            coalesced.add(merged);
        }
        return coalesced;
    }

    /**
     * Adds an amount to the running total stored under row / family / qualifier, creating the
     * intermediate byte-ordered maps the first time a row or family is seen.
     *
     * @param map accumulator keyed by row, then family, then qualifier
     * @param bArr row key
     * @param bArr2 column family
     * @param bArr3 column qualifier
     * @param l amount to add to the current total
     */
    private void incrementCounter(Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> map, byte[] bArr, byte[] bArr2, byte[] bArr3, Long l) {
        Map<byte[], NavigableMap<byte[], Long>> families = map.get(bArr);
        if (families == null) {
            families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            map.put(bArr, families);
        }

        NavigableMap<byte[], Long> qualifiers = families.get(bArr2);
        if (qualifiers == null) {
            qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            families.put(bArr2, qualifiers);
        }

        // First sighting stores the amount as-is; later ones store the running sum.
        Long current = qualifiers.get(bArr3);
        Long updated = (current == null) ? l : Long.valueOf(current.longValue() + l.longValue());
        qualifiers.put(bArr3, updated);
    }

    /**
     * Test-only accessor for the serializer instantiated by {@code configure(Context)}.
     *
     * @return the event serializer, or {@code null} if it has not been created yet
     */
    @VisibleForTesting
    @InterfaceAudience.Private
    HbaseEventSerializer getSerializer() {
        return this.serializer;
    }

    /**
     * Returns the configured batch size, i.e. the upper bound on the number of events taken
     * from the channel in one {@code process()} call.
     *
     * @return the batch size read in {@code configure(Context)}
     */
    public long getBatchSize() {
        return this.batchSize;
    }
}
