package org.apache.ambari.metrics.core.timeline.availability;

import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;

/* loaded from: input_file:org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.class */
public class MetricCollectorHAController {
    private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);
    static final String CLUSTER_NAME = "ambari-metrics-cluster";
    static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
    static final String DEFAULT_STATE_MODEL = "OnlineOffline";
    static final String INSTANCE_NAME_DELIMITER = "_";
    final String zkConnectUrl;
    final String instanceHostname;
    final InstanceConfig instanceConfig;
    final AggregationTaskRunner aggregationTaskRunner;
    final TimelineMetricConfiguration configuration;
    HelixAdmin admin;
    HelixManager manager;
    final List<String> liveInstanceNames = new ArrayList();
    private volatile boolean isInitialized = false;

    /* loaded from: input_file:org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController$HelixController.class */
    public class HelixController extends GenericHelixController {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        Joiner joiner = Joiner.on(", ").skipNulls();

        public HelixController() {
        }

        public void onLiveInstanceChange(List<LiveInstance> list, NotificationContext notificationContext) {
            super.onLiveInstanceChange(list, notificationContext);
            MetricCollectorHAController.this.liveInstanceNames.clear();
            Iterator<LiveInstance> it = list.iterator();
            while (it.hasNext()) {
                MetricCollectorHAController.this.liveInstanceNames.add(it.next().getInstanceName());
            }
            MetricCollectorHAController.LOG.info("Detected change in liveliness of Collector instances. LiveIsntances = " + this.joiner.join(MetricCollectorHAController.this.liveInstanceNames));
            this.executorService.schedule(new Runnable() { // from class: org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.HelixController.1
                @Override // java.lang.Runnable
                public void run() {
                    MetricCollectorHAController.this.printClusterState();
                }
            }, 30L, TimeUnit.SECONDS);
        }
    }

    public MetricCollectorHAController(TimelineMetricConfiguration timelineMetricConfiguration) {
        this.configuration = timelineMetricConfiguration;
        try {
            this.instanceHostname = timelineMetricConfiguration.getInstanceHostnameFromEnv();
            String instancePort = timelineMetricConfiguration.getInstancePort();
            try {
                String clusterZKClientPort = timelineMetricConfiguration.getClusterZKClientPort();
                String clusterZKQuorum = timelineMetricConfiguration.getClusterZKQuorum();
                if (StringUtils.isEmpty(clusterZKClientPort) || StringUtils.isEmpty(clusterZKQuorum)) {
                    throw new Exception("Unable to parse zookeeper quorum. clientPort = " + clusterZKClientPort + ", quorum = " + clusterZKQuorum);
                }
                this.zkConnectUrl = timelineMetricConfiguration.getZkConnectionUrl(clusterZKClientPort, clusterZKQuorum);
                this.instanceConfig = new InstanceConfig(this.instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
                this.instanceConfig.setHostName(this.instanceHostname);
                this.instanceConfig.setPort(instancePort);
                this.instanceConfig.setInstanceEnabled(true);
                this.aggregationTaskRunner = new AggregationTaskRunner(this.instanceConfig.getInstanceName(), this.zkConnectUrl, getClusterName());
            } catch (Exception e) {
                LOG.error("Unable to load hbase-site from classpath.", e);
                throw new MetricsSystemInitializationException(e.getMessage());
            }
        } catch (Exception e2) {
            LOG.error("Error reading configs from classpath, will resort to defaults.", e2);
            throw new MetricsSystemInitializationException(e2.getMessage());
        }
    }

    public String getClusterName() {
        return CLUSTER_NAME;
    }

    public void initializeHAController() throws Exception {
        String clusterName = getClusterName();
        this.admin = new ZKHelixAdmin(this.zkConnectUrl);
        LOG.info("Creating zookeeper cluster node: " + clusterName);
        LOG.info("Was cluster added successfully? " + this.admin.addCluster(clusterName, false));
        boolean z = false;
        for (int i = 0; i < 5 && !z; i++) {
            try {
                List instancesInCluster = this.admin.getInstancesInCluster(clusterName);
                if (CollectionUtils.isEmpty(instancesInCluster) || !instancesInCluster.contains(this.instanceConfig.getInstanceName())) {
                    LOG.info("Adding participant instance " + this.instanceConfig);
                    this.admin.addInstance(clusterName, this.instanceConfig);
                }
                z = true;
            } catch (HelixException | ZkNoNodeException e) {
                LOG.warn("Helix Cluster not yet setup fully.");
                if (i < 5 - 1) {
                    LOG.info("Waiting for 5 seconds and retrying.");
                    TimeUnit.SECONDS.sleep(5);
                } else {
                    LOG.error(e);
                }
            }
        }
        if (!z) {
            LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
            this.admin.addCluster(clusterName, true);
            List instancesInCluster2 = this.admin.getInstancesInCluster(clusterName);
            if (CollectionUtils.isEmpty(instancesInCluster2) || !instancesInCluster2.contains(this.instanceConfig.getInstanceName())) {
                LOG.info("Adding participant instance " + this.instanceConfig);
                this.admin.addInstance(clusterName, this.instanceConfig);
            }
        }
        if (this.admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
            LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
            this.admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
        }
        if (!this.admin.getResourcesInCluster(clusterName).contains(METRIC_AGGREGATORS)) {
            LOG.info("Adding resource METRIC_AGGREGATORS with 2 partitions and 1 replicas");
            this.admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.toString());
        }
        this.admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
        startAggregators();
        startController();
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                MetricCollectorHAController.this.aggregationTaskRunner.stop();
                MetricCollectorHAController.this.manager.disconnect();
            }
        });
        this.isInitialized = true;
    }

    public boolean isInitialized() {
        return this.isInitialized;
    }

    private void startAggregators() {
        try {
            this.aggregationTaskRunner.initialize();
        } catch (Exception e) {
            LOG.error("Unable to start aggregators.", e);
            throw new MetricsSystemInitializationException(e.getMessage());
        }
    }

    private void startController() throws Exception {
        this.manager = HelixManagerFactory.getZKHelixManager(getClusterName(), this.instanceHostname, InstanceType.CONTROLLER, this.zkConnectUrl);
        this.manager.connect();
        this.manager.addLiveInstanceChangeListener(new HelixController());
    }

    public AggregationTaskRunner getAggregationTaskRunner() {
        return this.aggregationTaskRunner;
    }

    public List<String> getLiveInstanceHostNames() {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = this.liveInstanceNames.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().split(INSTANCE_NAME_DELIMITER)[0]);
        }
        return arrayList;
    }

    public void printClusterState() {
        StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
        ExternalView resourceExternalView = this.admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
        if (resourceExternalView != null) {
            getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
        }
        sb.append("\n##################################################");
        LOG.info(sb.toString());
    }

    private void getPrintableResourceState(ExternalView externalView, String str, StringBuilder sb) {
        TreeSet treeSet = new TreeSet(externalView.getPartitionSet());
        sb.append("\nCLUSTER: ");
        sb.append(getClusterName());
        sb.append("\nRESOURCE: ");
        sb.append(str);
        Iterator it = treeSet.iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            sb.append("\nPARTITION: ");
            sb.append(str2).append("\t");
            for (Map.Entry entry : externalView.getStateMap(str2).entrySet()) {
                sb.append("\t");
                sb.append((String) entry.getKey());
                sb.append("\t");
                sb.append((String) entry.getValue());
            }
        }
    }

    public Map<String, String> getAggregationSummary() {
        HashMap hashMap = new HashMap();
        CheckpointManager checkpointManager = this.aggregationTaskRunner.getCheckpointManager();
        hashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE)).toString());
        hashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY)).toString());
        hashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND)).toString());
        hashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE)).toString());
        hashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY)).toString());
        return hashMap;
    }
}
