package id.onyx.obdp.metrics.core.timeline.availability;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import id.onyx.obdp.metrics.core.timeline.MetricsSystemInitializationException;
import id.onyx.obdp.metrics.core.timeline.TimelineMetricConfiguration;
import id.onyx.obdp.metrics.core.timeline.availability.AggregationTaskRunner;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.OnlineOfflineSMD;

/* loaded from: input_file:id/onyx/obdp/metrics/core/timeline/availability/MetricCollectorHAController.class */
public class MetricCollectorHAController {
    private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);

    @VisibleForTesting
    static final String CLUSTER_NAME = "obdp-metrics-cluster";

    @VisibleForTesting
    static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";

    @VisibleForTesting
    static final String DEFAULT_STATE_MODEL = "OnlineOffline";
    private static final String INSTANCE_NAME_DELIMITER = "_";
    private static final int PARTITION_NUMBER = 2;
    private static final int REPLICATION_FACTOR = 1;

    @VisibleForTesting
    final String zkConnectUrl;
    private final String instanceHostname;
    private final InstanceConfig instanceConfig;
    private final AggregationTaskRunner aggregationTaskRunner;

    @VisibleForTesting
    HelixAdmin admin;
    private HelixManager manager;
    private final List<String> liveInstanceNames = new ArrayList(PARTITION_NUMBER);
    private final LiveInstanceTracker liveInstanceTracker = new LiveInstanceTracker();
    private volatile boolean isInitialized = false;

    /* loaded from: input_file:id/onyx/obdp/metrics/core/timeline/availability/MetricCollectorHAController$LiveInstanceTracker.class */
    public final class LiveInstanceTracker implements LiveInstanceChangeListener {
        private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        private final Joiner joiner = Joiner.on(", ").skipNulls();

        public LiveInstanceTracker() {
        }

        public void onLiveInstanceChange(List<LiveInstance> list, NotificationContext notificationContext) {
            MetricCollectorHAController.this.liveInstanceNames.clear();
            Iterator<LiveInstance> it = list.iterator();
            while (it.hasNext()) {
                MetricCollectorHAController.this.liveInstanceNames.add(it.next().getInstanceName());
            }
            MetricCollectorHAController.LOG.info(String.format("Detected change in liveliness of Collector instances. LiveInstances = %s", this.joiner.join(MetricCollectorHAController.this.liveInstanceNames)));
            this.executorService.schedule(() -> {
                MetricCollectorHAController.this.printClusterState();
            }, 30L, TimeUnit.SECONDS);
        }

        public void shutdown() {
            this.executorService.shutdown();
        }
    }

    public MetricCollectorHAController(TimelineMetricConfiguration timelineMetricConfiguration) {
        try {
            this.instanceHostname = timelineMetricConfiguration.getInstanceHostnameFromEnv();
            String instancePort = timelineMetricConfiguration.getInstancePort();
            try {
                String clusterZKClientPort = timelineMetricConfiguration.getClusterZKClientPort();
                String clusterZKQuorum = timelineMetricConfiguration.getClusterZKQuorum();
                if (StringUtils.isEmpty(clusterZKClientPort) || StringUtils.isEmpty(clusterZKQuorum)) {
                    throw new Exception(String.format("Unable to parse zookeeper quorum. clientPort = %s, quorum = %s", clusterZKClientPort, clusterZKQuorum));
                }
                this.zkConnectUrl = timelineMetricConfiguration.getZkConnectionUrl(clusterZKClientPort, clusterZKQuorum);
                this.instanceConfig = new InstanceConfig(this.instanceHostname + "_" + instancePort);
                this.instanceConfig.setHostName(this.instanceHostname);
                this.instanceConfig.setPort(instancePort);
                this.instanceConfig.setInstanceEnabled(true);
                this.aggregationTaskRunner = new AggregationTaskRunner(this.instanceConfig.getInstanceName(), this.zkConnectUrl, CLUSTER_NAME);
            } catch (Exception e) {
                LOG.error("Unable to load hbase-site from classpath.", e);
                throw new MetricsSystemInitializationException(e.getMessage(), e);
            }
        } catch (Exception e2) {
            LOG.error("Error reading configs from classpath, will resort to defaults.", e2);
            throw new MetricsSystemInitializationException(e2.getMessage());
        }
    }

    public void initializeHAController() throws Exception {
        this.admin = new ZKHelixAdmin(this.zkConnectUrl);
        LOG.info(String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME));
        LOG.info(String.format("Was cluster added successfully? %s", Boolean.valueOf(this.admin.addCluster(CLUSTER_NAME, false))));
        boolean z = false;
        for (int i = 0; i < 5 && !z; i += REPLICATION_FACTOR) {
            try {
                if (!this.admin.getInstancesInCluster(CLUSTER_NAME).contains(this.instanceConfig.getInstanceName())) {
                    LOG.info(String.format("Adding participant instance %s", this.instanceConfig));
                    this.admin.addInstance(CLUSTER_NAME, this.instanceConfig);
                }
                z = REPLICATION_FACTOR;
            } catch (HelixException | ZkNoNodeException e) {
                LOG.warn("Helix Cluster not yet setup fully.");
                if (i < 5 - REPLICATION_FACTOR) {
                    LOG.info(String.format("Waiting for %d seconds and retrying.", 5));
                    TimeUnit.SECONDS.sleep(5);
                } else {
                    LOG.error(e);
                }
            }
        }
        if (!z) {
            LOG.info(String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME));
            this.admin.addCluster(CLUSTER_NAME, true);
            if (!this.admin.getInstancesInCluster(CLUSTER_NAME).contains(this.instanceConfig.getInstanceName())) {
                LOG.info(String.format("Adding participant instance %s", this.instanceConfig));
                this.admin.addInstance(CLUSTER_NAME, this.instanceConfig);
            }
        }
        if (this.admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
            LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
            this.admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build());
        }
        if (!this.admin.getResourcesInCluster(CLUSTER_NAME).contains(METRIC_AGGREGATORS)) {
            LOG.info(String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, Integer.valueOf(PARTITION_NUMBER), Integer.valueOf(REPLICATION_FACTOR)));
            this.admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, DEFAULT_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.toString());
        }
        this.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR);
        startAggregators();
        startController();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdownHAController();
        }));
        this.isInitialized = true;
    }

    public boolean isInitialized() {
        return this.isInitialized;
    }

    private void startAggregators() {
        try {
            this.aggregationTaskRunner.initialize();
        } catch (Exception e) {
            LOG.error("Unable to start aggregators.", e);
            throw new MetricsSystemInitializationException(e.getMessage(), e);
        }
    }

    private void startController() throws Exception {
        this.manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, this.instanceHostname, InstanceType.CONTROLLER, this.zkConnectUrl);
        this.manager.connect();
        this.manager.addLiveInstanceChangeListener(this.liveInstanceTracker);
    }

    public void shutdownHAController() {
        if (this.isInitialized) {
            LOG.info("Shooting down Metrics Collector's HAController.");
            this.manager.removeListener(new PropertyKey.Builder(CLUSTER_NAME).liveInstances(), this.liveInstanceTracker);
            this.liveInstanceTracker.shutdown();
            this.aggregationTaskRunner.stop();
            this.manager.disconnect();
            this.admin.close();
            this.isInitialized = false;
            LOG.info("Shutdown of Metrics Collector's HAController finished.");
        }
    }

    public AggregationTaskRunner getAggregationTaskRunner() {
        return this.aggregationTaskRunner;
    }

    public List<String> getLiveInstanceHostNames() {
        ArrayList arrayList = new ArrayList(PARTITION_NUMBER);
        Iterator<String> it = this.liveInstanceNames.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().split(INSTANCE_NAME_DELIMITER)[0]);
        }
        return arrayList;
    }

    public void printClusterState() {
        StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
        ExternalView resourceExternalView = this.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
        if (resourceExternalView != null) {
            getPrintableResourceState(resourceExternalView, sb);
        }
        sb.append("\n##################################################");
        LOG.info(sb.toString());
    }

    private void getPrintableResourceState(ExternalView externalView, StringBuilder sb) {
        TreeSet treeSet = new TreeSet(externalView.getPartitionSet());
        sb.append("\nCLUSTER: ");
        sb.append(CLUSTER_NAME);
        sb.append("\nRESOURCE: ");
        sb.append(METRIC_AGGREGATORS);
        Iterator it = treeSet.iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            sb.append("\nPARTITION: ");
            sb.append(str).append("\t");
            for (Map.Entry entry : externalView.getStateMap(str).entrySet()) {
                sb.append("\t");
                sb.append((String) entry.getKey());
                sb.append("\t");
                sb.append((String) entry.getValue());
            }
        }
    }

    public Map<String, String> getAggregationSummary() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        CheckpointManager checkpointManager = this.aggregationTaskRunner.getCheckpointManager();
        linkedHashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE)).toString());
        linkedHashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY)).toString());
        linkedHashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY)).toString());
        linkedHashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND)).toString());
        linkedHashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE)).toString());
        linkedHashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY)).toString());
        linkedHashMap.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY)).toString());
        return linkedHashMap;
    }
}
