package org.apache.ambari.logfeeder.output;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/ambari/logfeeder/output/OutputKafka.class */
public class OutputKafka extends Output<LogFeederProps, InputFileMarker> {
    private static final Logger LOG = Logger.getLogger(OutputKafka.class);
    private static final int FAILED_RETRY_INTERVAL = 30;
    private static final int CATCHUP_RETRY_INTERVAL = 5;
    private static final int DEFAULT_BATCH_SIZE = 5000;
    private static final int DEFAULT_LINGER_MS = 1000;
    private String topic = null;
    private boolean isAsync = true;
    private long messageCount = 0;
    private KafkaProducer<String, String> producer = null;
    private BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue();
    private boolean isKafkaBrokerUp = false;
    private LogFeederProps logFeederProps;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ambari/logfeeder/output/OutputKafka$KafkaCallBack.class */
    public class KafkaCallBack implements Callback {
        private long thisMessageNumber;
        private OutputKafka output;
        private String message;
        private InputMarker inputMarker;

        public KafkaCallBack(OutputKafka outputKafka, String str, InputMarker inputMarker, long j) {
            this.output = null;
            this.thisMessageNumber = j;
            this.output = outputKafka;
            this.inputMarker = inputMarker;
            this.message = str;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (recordMetadata == null) {
                this.output.isKafkaBrokerUp = false;
                LogFeederUtil.logErrorMessageByInterval(getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR", "Error sending message to Kafka. Async Callback", exc, OutputKafka.LOG, Level.ERROR);
                this.output.failedMessages.add(this);
            } else {
                if (!this.output.isKafkaBrokerUp) {
                    OutputKafka.LOG.info("Started writing to kafka. " + this.output.getShortDescription());
                    this.output.isKafkaBrokerUp = true;
                }
                this.output.incrementStat(1);
                this.output.writeBytesMetric.value += this.message.length();
            }
        }
    }

    public String getStatMetricName() {
        return "output.kafka.write_logs";
    }

    public String getWriteBytesMetricName() {
        return "output.kafka.write_bytes";
    }

    public void init(LogFeederProps logFeederProps) throws Exception {
        this.logFeederProps = logFeederProps;
        this.producer = creteKafkaProducer(initProperties());
        createKafkaRetryThread();
    }

    private Properties initProperties() throws Exception {
        Object obj;
        String stringValue = getStringValue("broker_list");
        if (StringUtils.isEmpty(stringValue)) {
            throw new Exception("For kafka output, bootstrap broker_list is needed");
        }
        this.topic = getStringValue("topic");
        if (StringUtils.isEmpty(this.topic)) {
            throw new Exception("For kafka output, topic is needed");
        }
        this.isAsync = getBooleanValue("is_async", true).booleanValue();
        int intValue = getIntValue("batch_size", Integer.valueOf(DEFAULT_BATCH_SIZE)).intValue();
        int intValue2 = getIntValue("linger_ms", 1000).intValue();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", stringValue);
        properties.put("client.id", "logfeeder_producer");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.put("compression.type", "snappy");
        properties.put("batch.size", Integer.valueOf(intValue));
        properties.put("linger.ms", Integer.valueOf(intValue2));
        for (String str : getConfigs().keySet()) {
            if (str.startsWith("kafka.") && (obj = getConfigs().get(str)) != null && obj.toString().length() != 0) {
                String substring = str.substring("kafka.".length());
                LOG.info("Adding custom Kafka property. " + substring + "=" + obj);
                properties.put(substring, obj);
            }
        }
        return properties;
    }

    protected KafkaProducer<String, String> creteKafkaProducer(Properties properties) {
        return new KafkaProducer<>(properties);
    }

    private void createKafkaRetryThread() {
        Thread thread = new Thread("kafka-writer-retry,topic=" + this.topic) { // from class: org.apache.ambari.logfeeder.output.OutputKafka.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                KafkaCallBack kafkaCallBack = null;
                OutputKafka.LOG.info("Started thread to monitor failed messsages. " + OutputKafka.this.getShortDescription());
                while (true) {
                    if (kafkaCallBack == null) {
                        try {
                            kafkaCallBack = (KafkaCallBack) OutputKafka.this.failedMessages.take();
                        } catch (Throwable th) {
                            LogFeederUtil.logErrorMessageByInterval(getClass().getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR", "Error sending message to Kafka during retry. message=" + (kafkaCallBack == null ? null : kafkaCallBack.message), th, OutputKafka.LOG, Level.ERROR);
                        }
                    }
                    if (OutputKafka.this.publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) {
                        kafkaCallBack = null;
                    } else {
                        OutputKafka.LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for " + OutputKafka.FAILED_RETRY_INTERVAL + " seconds");
                        Thread.sleep(30000L);
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }

    public synchronized void write(String str, InputFileMarker inputFileMarker) throws Exception {
        while (!isDrain() && !inputFileMarker.getInput().isDrain()) {
            try {
                if (this.failedMessages.size() == 0 && publishMessage(str, inputFileMarker)) {
                    return;
                }
                if (isDrain() || inputFileMarker.getInput().isDrain()) {
                    return;
                }
                if (this.isKafkaBrokerUp) {
                    LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages=" + this.failedMessages.size() + " Going to sleep for 5 seconds");
                    Thread.sleep(5000L);
                } else {
                    LOG.error("Kafka is down. Going to sleep for 30 seconds");
                    Thread.sleep(30000L);
                }
            } catch (Throwable th) {
                return;
            }
        }
    }

    public void setDrain(boolean z) {
        super.setDrain(z);
    }

    public void flush() {
        LOG.info("Flush called...");
        setDrain(true);
    }

    public void close() {
        LOG.info("Closing Kafka client...");
        flush();
        if (this.producer != null) {
            try {
                this.producer.close();
            } catch (Throwable th) {
                LOG.error("Error closing Kafka topic. topic=" + this.topic);
            }
        }
        LOG.info("Closed Kafka client");
        super.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean publishMessage(String str, InputMarker inputMarker) {
        if (this.isAsync && this.isKafkaBrokerUp) {
            KafkaProducer<String, String> kafkaProducer = this.producer;
            ProducerRecord producerRecord = new ProducerRecord(this.topic, str);
            long j = this.messageCount + 1;
            this.messageCount = j;
            kafkaProducer.send(producerRecord, new KafkaCallBack(this, str, inputMarker, j));
            return true;
        }
        try {
            if (((RecordMetadata) this.producer.send(new ProducerRecord(this.topic, str)).get()) != null) {
                this.statMetric.value++;
                this.writeBytesMetric.value += str.length();
            }
            if (this.isKafkaBrokerUp) {
                return true;
            }
            LOG.info("Started writing to kafka. " + getShortDescription());
            this.isKafkaBrokerUp = true;
            return true;
        } catch (InterruptedException e) {
            this.isKafkaBrokerUp = false;
            LogFeederUtil.logErrorMessageByInterval(getClass().getSimpleName() + "_KAFKA_INTERRUPT", "InterruptedException-Error sending message to Kafka", e, LOG, Level.ERROR);
            return false;
        } catch (ExecutionException e2) {
            this.isKafkaBrokerUp = false;
            LogFeederUtil.logErrorMessageByInterval(getClass().getSimpleName() + "_KAFKA_EXECUTION", "ExecutionException-Error sending message to Kafka", e2, LOG, Level.ERROR);
            return false;
        } catch (Throwable th) {
            this.isKafkaBrokerUp = false;
            LogFeederUtil.logErrorMessageByInterval(getClass().getSimpleName() + "_KAFKA_WRITE_ERROR", "GenericException-Error sending message to Kafka", th, LOG, Level.ERROR);
            return false;
        }
    }

    public String getShortDescription() {
        return "output:destination=kafka,topic=" + this.topic;
    }

    public void copyFile(File file, InputMarker inputMarker) throws UnsupportedOperationException {
        throw new UnsupportedOperationException("copyFile method is not yet supported for output=kafka");
    }

    public String getOutputType() {
        throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
    }

    public Long getPendingCount() {
        return 0L;
    }
}
