package org.apache.storm.hive.bolt;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.storm.Config;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.common.HiveUtils;
import org.apache.storm.hive.common.HiveWriter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hive/bolt/HiveBolt.class */
public class HiveBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);

    @VisibleForTesting
    Map<HiveEndPoint, HiveWriter> allWriters;
    private OutputCollector collector;
    private HiveOptions options;
    private ExecutorService callTimeoutPool;
    private transient Timer heartBeatTimer;
    private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    private UserGroupInformation ugi = null;
    private BatchHelper batchHelper;
    private boolean tokenAuthEnabled;

    public HiveBolt(HiveOptions hiveOptions) {
        this.options = hiveOptions;
    }

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        try {
            this.tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(map);
            try {
                this.ugi = HiveUtils.authenticate(this.tokenAuthEnabled, this.options.getKerberosKeytab(), this.options.getKerberosPrincipal());
                this.collector = outputCollector;
                this.batchHelper = new BatchHelper(this.options.getBatchSize().intValue(), outputCollector);
                this.allWriters = new ConcurrentHashMap();
                this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("hive-bolt-%d").build());
                this.sendHeartBeat.set(true);
                this.heartBeatTimer = new Timer(topologyContext.getThisTaskId() + "-hb-timer", true);
                setupHeartBeatTimer();
            } catch (HiveUtils.AuthenticationFailed e) {
                LOG.error("Hive kerberos authentication failed " + e.getMessage(), e);
                throw new IllegalArgumentException(e);
            }
        } catch (Exception e2) {
            LOG.warn("unable to make connection to hive ", e2);
        }
    }

    public void execute(Tuple tuple) {
        try {
            if (this.batchHelper.shouldHandle(tuple)) {
                getOrCreateWriter(HiveUtils.makeEndPoint(this.options.getMapper().mapPartitions(tuple), this.options)).write(this.options.getMapper().mapRecord(tuple));
                this.batchHelper.addBatch(tuple);
            }
            if (this.batchHelper.shouldFlush()) {
                flushAllWriters(true);
                LOG.info("acknowledging tuples after writers flushed ");
                this.batchHelper.ack();
            }
            if (TupleUtils.isTick(tuple)) {
                retireIdleWriters();
            }
        } catch (SerializationError e) {
            LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", tuple);
            this.collector.reportError(e);
            this.collector.ack(tuple);
        } catch (Exception e2) {
            this.batchHelper.fail(e2);
            abortAndCloseWriters();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public void cleanup() {
        this.sendHeartBeat.set(false);
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().flushAndClose();
            } catch (Exception e) {
                LOG.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        for (ExecutorService executorService : new ExecutorService[]{this.callTimeoutPool}) {
            executorService.shutdown();
            while (!executorService.isTerminated()) {
                try {
                    executorService.awaitTermination(this.options.getCallTimeOut().intValue(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e2) {
                    LOG.warn("shutdown interrupted on " + executorService, e2);
                }
            }
        }
        this.callTimeoutPool = null;
        if (this.heartBeatTimer != null) {
            this.heartBeatTimer.cancel();
        }
        super.cleanup();
        LOG.info("Hive Bolt stopped");
    }

    public Map<String, Object> getComponentConfiguration() {
        Config componentConfiguration = super.getComponentConfiguration();
        if (componentConfiguration == null) {
            componentConfiguration = new Config();
        }
        if (this.options.getTickTupleInterval().intValue() > 0) {
            componentConfiguration.put("topology.tick.tuple.freq.secs", this.options.getTickTupleInterval());
        }
        return componentConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setupHeartBeatTimer() {
        if (this.options.getHeartBeatInterval().intValue() > 0) {
            this.heartBeatTimer.schedule(new TimerTask() { // from class: org.apache.storm.hive.bolt.HiveBolt.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    try {
                        if (HiveBolt.this.sendHeartBeat.get()) {
                            HiveBolt.LOG.debug("Start sending heartbeat on all writers");
                            HiveBolt.this.sendHeartBeatOnAllWriters();
                            HiveBolt.this.setupHeartBeatTimer();
                        }
                    } catch (Exception e) {
                        HiveBolt.LOG.warn("Failed to heartbeat on HiveWriter ", e);
                    }
                }
            }, this.options.getHeartBeatInterval().intValue() * 1000);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendHeartBeatOnAllWriters() throws InterruptedException {
        Iterator<HiveWriter> it = this.allWriters.values().iterator();
        while (it.hasNext()) {
            it.next().heartBeat();
        }
    }

    void flushAllWriters(boolean z) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        Iterator<HiveWriter> it = this.allWriters.values().iterator();
        while (it.hasNext()) {
            it.next().flush(z);
        }
    }

    void abortAndCloseWriters() {
        try {
            abortAllWriters();
            closeAllWriters();
        } catch (Exception e) {
            LOG.warn("unable to close hive connections. ", e);
        }
    }

    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().abort();
            } catch (Exception e) {
                LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", e);
            }
        }
    }

    private void closeAllWriters() {
        Iterator<Map.Entry<HiveEndPoint, HiveWriter>> it = this.allWriters.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().close();
            } catch (Exception e) {
                LOG.warn("unable to close writers. ", e);
            }
        }
        this.allWriters.clear();
    }

    @VisibleForTesting
    HiveWriter getOrCreateWriter(HiveEndPoint hiveEndPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        try {
            HiveWriter hiveWriter = this.allWriters.get(hiveEndPoint);
            if (hiveWriter == null) {
                LOG.debug("Creating Writer to Hive end point : " + hiveEndPoint);
                hiveWriter = HiveUtils.makeHiveWriter(hiveEndPoint, this.callTimeoutPool, this.ugi, this.options, this.tokenAuthEnabled);
                if (this.allWriters.size() > this.options.getMaxOpenConnections().intValue() - 1) {
                    LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", Integer.valueOf(this.allWriters.size()), this.options.getMaxOpenConnections());
                    if (retireIdleWriters() == 0) {
                        retireEldestWriter();
                    }
                }
                this.allWriters.put(hiveEndPoint, hiveWriter);
                HiveUtils.logAllHiveEndPoints(this.allWriters);
            }
            return hiveWriter;
        } catch (HiveWriter.ConnectFailure e) {
            LOG.error("Failed to create HiveWriter for endpoint: " + hiveEndPoint, e);
            throw e;
        }
    }

    private void retireEldestWriter() {
        LOG.info("Attempting close eldest writers");
        long currentTimeMillis = System.currentTimeMillis();
        HiveEndPoint hiveEndPoint = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() < currentTimeMillis) {
                hiveEndPoint = entry.getKey();
                currentTimeMillis = entry.getValue().getLastUsed();
            }
        }
        try {
            LOG.info("Closing least used Writer to Hive end point : " + hiveEndPoint);
            this.allWriters.remove(hiveEndPoint).flushAndClose();
        } catch (IOException e) {
            LOG.warn("Failed to close writer for end point: " + hiveEndPoint, e);
        } catch (InterruptedException e2) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e2);
            Thread.currentThread().interrupt();
        } catch (Exception e3) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e3);
        }
    }

    private int retireIdleWriters() {
        LOG.info("Attempting close idle writers");
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (currentTimeMillis - entry.getValue().getLastUsed() > this.options.getIdleTimeout().intValue()) {
                i++;
                retire(entry.getKey());
            }
        }
        return i;
    }

    private void retire(HiveEndPoint hiveEndPoint) {
        try {
            HiveWriter remove = this.allWriters.remove(hiveEndPoint);
            if (remove != null) {
                LOG.info("Closing idle Writer to Hive end point : {}", hiveEndPoint);
                remove.flushAndClose();
            }
        } catch (IOException e) {
            LOG.warn("Failed to close writer for end point: {}. Error: " + hiveEndPoint, e);
        } catch (InterruptedException e2) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e2);
            Thread.currentThread().interrupt();
        } catch (Exception e3) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e3);
        }
    }
}
