package org.apache.hadoop.metrics2.sink.storm;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;

/* loaded from: input_file:org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.class */
public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
    private static final String[] METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE = {"-latency", "timems", "time_ms", "rate_secs", "timesecs"};
    private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = {".", "_"};
    private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\"";
    public static final int SYSTEM_TASK_ID = -1;
    public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
    public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
    public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset.";
    private String collectorUri;
    private TimelineMetricsCache metricsCache;
    private String hostname;
    private int timeoutSeconds;
    private Collection<String> collectorHosts;
    private String zkQuorum;
    private String protocol;
    private String port;
    private String topologyName;
    private String applicationId;
    private String instanceId;
    private boolean setInstanceId;
    private boolean hostInMemoryAggregationEnabled;
    private int hostInMemoryAggregationPort;
    private String hostInMemoryAggregationProtocol;

    protected String getCollectorUri(String str) {
        return constructTimelineMetricUri(this.protocol, str, this.port);
    }

    protected String getCollectorProtocol() {
        return this.protocol;
    }

    protected int getTimeoutSeconds() {
        return this.timeoutSeconds;
    }

    protected String getZookeeperQuorum() {
        return this.zkQuorum;
    }

    protected Collection<String> getConfiguredCollectorHosts() {
        return this.collectorHosts;
    }

    protected String getCollectorPort() {
        return this.port;
    }

    protected String getHostname() {
        return this.hostname;
    }

    protected boolean isHostInMemoryAggregationEnabled() {
        return this.hostInMemoryAggregationEnabled;
    }

    protected int getHostInMemoryAggregationPort() {
        return this.hostInMemoryAggregationPort;
    }

    protected String getHostInMemoryAggregationProtocol() {
        return this.hostInMemoryAggregationProtocol;
    }

    public void prepare(Map map, Object obj, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
        this.LOG.info("Preparing Storm Metrics Sink");
        try {
            this.hostname = InetAddress.getLocalHost().getHostName();
            if (this.hostname == null || !this.hostname.contains(".")) {
                this.hostname = InetAddress.getLocalHost().getCanonicalHostName();
            }
            Configuration configuration = new Configuration("/storm-metrics2.properties");
            this.timeoutSeconds = Integer.parseInt(configuration.getProperty("timeout", String.valueOf(10)));
            int parseInt = Integer.parseInt(configuration.getProperty("maxRowCacheSize", String.valueOf(10000)));
            int parseInt2 = Integer.parseInt(configuration.getProperty("sendInterval", String.valueOf(59000)));
            this.applicationId = configuration.getProperty("clusterReporterAppId", DEFAULT_CLUSTER_REPORTER_APP_ID);
            this.metricsCache = new TimelineMetricsCache(parseInt, parseInt2);
            this.collectorHosts = parseHostsStringIntoCollection(configuration.getProperty("collector.hosts"));
            this.zkQuorum = StringUtils.isEmpty(configuration.getProperty("metrics.zookeeper.quorum")) ? configuration.getProperty("zookeeper.quorum") : configuration.getProperty("metrics.zookeeper.quorum");
            this.protocol = configuration.getProperty("protocol", "http");
            this.port = configuration.getProperty("port", "6188");
            this.instanceId = configuration.getProperty("instanceId", (String) null);
            this.setInstanceId = Boolean.valueOf(configuration.getProperty("set.instanceId", "false")).booleanValue();
            this.hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty("host_in_memory_aggregation", "false")).booleanValue();
            this.hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty("host_in_memory_aggregation_port", "61888")).intValue();
            this.hostInMemoryAggregationProtocol = configuration.getProperty("host_in_memory_aggregation_protocol", "http");
            super.init();
            if (this.protocol.contains("https") || this.hostInMemoryAggregationProtocol.contains("https")) {
                loadTruststore(configuration.getProperty("truststore.path").trim(), configuration.getProperty("truststore.type").trim(), configuration.getProperty("truststore.password").trim());
            }
            this.topologyName = removeNonce(topologyContext.getStormId());
            warnIfTopologyNameContainsWarnString(this.topologyName);
        } catch (UnknownHostException e) {
            this.LOG.error("Could not identify hostname.");
            throw new RuntimeException("Could not identify hostname.", e);
        }
    }

    public void handleDataPoints(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> collection) {
        ArrayList arrayList = new ArrayList();
        for (IMetricsConsumer.DataPoint dataPoint : collection) {
            this.LOG.debug(dataPoint.name + " = " + dataPoint.value);
            for (IMetricsConsumer.DataPoint dataPoint2 : (taskInfo.srcTaskId == -1 && (dataPoint.value instanceof Collection)) ? applyAggregationToMetricNameKeyedDataPoints(groupByMetricNameDataPoints(populateAllDataPointValues(dataPoint))) : populateDataPoints(dataPoint)) {
                String createKafkaOffsetMetricName = dataPoint2.name.startsWith(METRIC_NAME_PREFIX_KAFKA_OFFSET) ? createKafkaOffsetMetricName(dataPoint2.name) : createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost, taskInfo.srcWorkerPort, taskInfo.srcTaskId, dataPoint2.name);
                this.LOG.debug("populated datapoint: " + createKafkaOffsetMetricName + " = " + dataPoint2.value);
                TimelineMetric createTimelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost, createKafkaOffsetMetricName, Double.valueOf(dataPoint2.value.toString()));
                this.metricsCache.putTimelineMetric(createTimelineMetric);
                TimelineMetric timelineMetric = this.metricsCache.getTimelineMetric(createTimelineMetric.getMetricName());
                if (timelineMetric != null) {
                    arrayList.add(timelineMetric);
                }
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        timelineMetrics.setMetrics(arrayList);
        try {
            emitMetrics(timelineMetrics);
        } catch (UnableToConnectException e) {
            this.LOG.warn("Unable to send metrics to collector by address:" + e.getConnectUrl());
        }
    }

    public void cleanup() {
        this.LOG.info("Stopping Storm Metrics Sink");
    }

    void setTopologyName(String str) {
        this.topologyName = str;
    }

    private String removeNonce(String str) {
        return str.substring(0, str.substring(0, str.lastIndexOf("-")).lastIndexOf("-"));
    }

    private List<IMetricsConsumer.DataPoint> populateAllDataPointValues(IMetricsConsumer.DataPoint dataPoint) {
        ArrayList arrayList = new ArrayList();
        Iterator it = ((Collection) dataPoint.value).iterator();
        while (it.hasNext()) {
            for (IMetricsConsumer.DataPoint dataPoint2 : populateDataPoints(new IMetricsConsumer.DataPoint(dataPoint.name, it.next()))) {
                if (dataPoint2.value != null) {
                    arrayList.add(dataPoint2);
                }
            }
        }
        return arrayList;
    }

    private List<IMetricsConsumer.DataPoint> populateDataPoints(IMetricsConsumer.DataPoint dataPoint) {
        ArrayList arrayList = new ArrayList();
        if (dataPoint.value == null) {
            this.LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name);
        } else if (dataPoint.value instanceof Map) {
            for (Map.Entry entry : ((Map) dataPoint.value).entrySet()) {
                Double convertValueToDouble = convertValueToDouble((String) entry.getKey(), entry.getValue());
                if (convertValueToDouble != null) {
                    arrayList.add(new IMetricsConsumer.DataPoint(dataPoint.name + "." + ((String) entry.getKey()), convertValueToDouble));
                }
            }
        } else {
            Double convertValueToDouble2 = convertValueToDouble(dataPoint.name, dataPoint.value);
            if (convertValueToDouble2 != null) {
                arrayList.add(new IMetricsConsumer.DataPoint(dataPoint.name, convertValueToDouble2));
            }
        }
        return arrayList;
    }

    private Double convertValueToDouble(String str, Object obj) {
        if (obj instanceof Number) {
            return Double.valueOf(((Number) obj).doubleValue());
        }
        if (!(obj instanceof String)) {
            this.LOG.warn("Data point with name " + str + " has value " + obj + " which is not supported. Discarding.");
            return null;
        }
        try {
            return Double.valueOf(Double.parseDouble((String) obj));
        } catch (NumberFormatException e) {
            this.LOG.warn("Data point with name " + str + " doesn't have number format value " + obj + ". Discarding.");
            return null;
        }
    }

    private Map<String, List<Double>> groupByMetricNameDataPoints(List<IMetricsConsumer.DataPoint> list) {
        HashMap hashMap = new HashMap();
        for (IMetricsConsumer.DataPoint dataPoint : list) {
            List list2 = (List) hashMap.get(dataPoint.name);
            if (list2 == null) {
                list2 = new ArrayList();
                hashMap.put(dataPoint.name, list2);
            }
            list2.add(Double.valueOf(dataPoint.value.toString()));
        }
        return hashMap;
    }

    private List<IMetricsConsumer.DataPoint> applyAggregationToMetricNameKeyedDataPoints(Map<String, List<Double>> map) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, List<Double>> entry : map.entrySet()) {
            String key = entry.getKey();
            arrayList.add(new IMetricsConsumer.DataPoint(key, applyAggregateFunction(key, entry.getValue())));
        }
        return arrayList;
    }

    private Double applyAggregateFunction(String str, List<Double> list) {
        String lowerCase = str.toLowerCase();
        for (String str2 : METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE) {
            if (lowerCase.contains(str2)) {
                return calculateAverage(list);
            }
        }
        return calculateSummation(list);
    }

    private Double calculateSummation(List<Double> list) {
        Double valueOf = Double.valueOf(0.0d);
        Iterator<Double> it = list.iterator();
        while (it.hasNext()) {
            valueOf = Double.valueOf(valueOf.doubleValue() + it.next().doubleValue());
        }
        return valueOf;
    }

    private Double calculateAverage(List<Double> list) {
        return list.isEmpty() ? Double.valueOf(0.0d) : Double.valueOf(calculateSummation(list).doubleValue() / list.size());
    }

    private TimelineMetric createTimelineMetric(long j, String str, String str2, Double d) {
        TimelineMetric timelineMetric = new TimelineMetric();
        timelineMetric.setMetricName(str2);
        timelineMetric.setHostName(str);
        if (this.setInstanceId) {
            timelineMetric.setInstanceId(this.instanceId);
        }
        timelineMetric.setAppId(this.applicationId);
        timelineMetric.setStartTime(j);
        timelineMetric.setType(ClassUtils.getShortCanonicalName(d, "Number"));
        timelineMetric.getMetricValues().put(Long.valueOf(j), d);
        return timelineMetric;
    }

    private String createMetricName(String str, String str2, int i, int i2, String str3) {
        return ("topology." + this.topologyName + "." + str + "." + str2 + "." + i + "." + i2 + "." + str3).replace('_', '-');
    }

    private String createKafkaOffsetMetricName(String str) {
        String[] split = str.substring(METRIC_NAME_PREFIX_KAFKA_OFFSET.length()).split("/");
        if (split.length == 1) {
            throw new IllegalArgumentException("Unknown metrics for kafka offset metric: " + str);
        }
        String str2 = "topology." + this.topologyName + ".kafka-topic." + split[0];
        return (split.length > 2 ? str2 + "." + split[1] + "." + split[2] : str2 + "." + split[1]).replace('_', '-');
    }

    private void warnIfTopologyNameContainsWarnString(String str) {
        for (String str2 : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) {
            if (str.contains(str2)) {
                this.LOG.warn("Topology name \"" + str + "\" contains \"" + str2 + "\" which can be problematic for AMS.");
                this.LOG.warn("Encouraged to not using any of these strings: \".\", \"_\"");
                this.LOG.warn("Same suggestion applies to component name.");
            }
        }
    }

    public void setMetricsCache(TimelineMetricsCache timelineMetricsCache) {
        this.metricsCache = timelineMetricsCache;
    }
}
