package id.onyx.obdp.metrics.core.timeline.sink;

import com.fasterxml.jackson.databind.ObjectMapper;
import id.onyx.obdp.metrics.core.timeline.TimelineMetricConfiguration;
import id.onyx.obdp.metrics.core.timeline.source.InternalSourceProvider;
import java.util.Collection;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:id/onyx/obdp/metrics/core/timeline/sink/KafkaSinkProvider.class */
/**
 * {@link ExternalSinkProvider} that publishes timeline metrics to a Kafka topic.
 *
 * <p>The constructor reads the Kafka producer settings from
 * {@link TimelineMetricConfiguration} and creates a single producer that is shared by
 * every sink handed out by {@link #getExternalMetricsSink}. Metrics are converted to a
 * Jackson tree with {@link ObjectMapper#valueToTree(Object)} and sent, without a record
 * key, to the {@code obdp-metrics-topic} topic.
 */
public class KafkaSinkProvider implements ExternalSinkProvider {
    /** Kafka topic every metrics payload is published to. */
    private static final String TOPIC_NAME = "obdp-metrics-topic";
    private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);

    /**
     * Producer shared by all sinks of this provider. Keys use the byte-array serializer
     * (records are sent without a key); values are the Jackson tree produced by
     * {@link #objectMapper} and are written by the JSON serializer configured in the
     * constructor.
     */
    private final Producer<byte[], Object> producer;
    /** Value reported by {@link ExternalMetricsSink#getSinkTimeOutSeconds()}. */
    private final int timeoutSeconds;
    /** Value reported by {@link ExternalMetricsSink#getFlushSeconds()}. */
    private final int flushSeconds;
    /** Converts metric collections into the JSON tree handed to the producer. */
    final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Sink for raw metrics that forwards every batch to the enclosing provider's
     * Kafka producer.
     */
    class KafkaRawMetricsSink implements ExternalMetricsSink {
        KafkaRawMetricsSink() {
        }

        /** @return the sink timeout, in seconds, read from the configuration */
        @Override
        public int getSinkTimeOutSeconds() {
            return KafkaSinkProvider.this.timeoutSeconds;
        }

        /** @return the flush interval, in seconds, read from the configuration */
        @Override
        public int getFlushSeconds() {
            return KafkaSinkProvider.this.flushSeconds;
        }

        /**
         * Serializes the given metrics to a JSON tree and hands them to the producer.
         *
         * <p>A {@code null} or empty collection is skipped instead of publishing an
         * empty payload. Failures reported through the send callback are logged;
         * exceptions thrown directly by {@code send} still propagate to the caller.
         *
         * @param collection metrics to publish; may be {@code null} or empty
         */
        @Override
        public void sinkMetricData(Collection<TimelineMetrics> collection) {
            if (collection == null || collection.isEmpty()) {
                LOG.debug("No metrics to publish to Kafka, skipping send");
                return;
            }
            Object payload = KafkaSinkProvider.this.objectMapper.valueToTree(collection);
            ProducerRecord<byte[], Object> record = new ProducerRecord<>(KafkaSinkProvider.TOPIC_NAME, payload);
            // Without a callback a failed delivery would go completely unnoticed.
            KafkaSinkProvider.this.producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    LOG.error("Failed to publish metrics to Kafka topic " + KafkaSinkProvider.TOPIC_NAME, exception);
                }
            });
        }
    }

    /**
     * Builds the Kafka producer from the timeline metrics configuration.
     *
     * @throws ExceptionInInitializerError if the configuration cannot be read, the
     *         bootstrap servers are missing, or the producer cannot be created; the
     *         original failure is kept as the cause
     */
    public KafkaSinkProvider() {
        TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
        Properties properties = new Properties();
        try {
            String bootstrapServers = configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_SERVERS);
            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                // Properties rejects null values with a bare NullPointerException; fail
                // with a message that names the missing setting instead.
                throw new IllegalArgumentException(
                        "Kafka bootstrap servers are not configured (" + TimelineMetricConfiguration.KAFKA_SERVERS + ")");
            }
            properties.put("bootstrap.servers", bootstrapServers);
            properties.put("acks", configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_ACKS, "all"));
            properties.put("retries", configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_RETRIES, 0));
            properties.put("batch.size", configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_BATCH_SIZE, 128));
            properties.put("linger.ms", configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_LINGER_MS, 1));
            properties.put("buffer.memory", configuration.getMetricsConf().getLong(TimelineMetricConfiguration.KAFKA_BUFFER_MEM, 33554432L));
            properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer");
            this.flushSeconds = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
            this.timeoutSeconds = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS, 10);
            this.producer = new KafkaProducer<>(properties);
        } catch (Exception e) {
            LOG.error("Configuration error!", e);
            // Error type kept as-is so existing callers observe the same failure.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Returns the sink for the requested internal source.
     *
     * @param source_name the internal source whose data should be sunk
     * @return a new {@link KafkaRawMetricsSink} for {@code RAW_METRICS}
     * @throws UnsupportedOperationException for any other source
     */
    @Override
    public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME source_name) {
        switch (source_name) {
            case RAW_METRICS:
                return new KafkaRawMetricsSink();
            default:
                throw new UnsupportedOperationException("Provider does not support the expected source " + source_name);
        }
    }
}
