package org.opensearch.ml.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.transport.sync.MLSyncUpAction;
import org.opensearch.ml.common.transport.sync.MLSyncUpInput;
import org.opensearch.ml.common.transport.sync.MLSyncUpNodeResponse;
import org.opensearch.ml.common.transport.sync.MLSyncUpNodesRequest;
import org.opensearch.ml.engine.encryptor.Encryptor;
import org.opensearch.ml.indices.MLIndicesHandler;
import org.opensearch.ml.utils.RestActionUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;

/* loaded from: input_file:org/opensearch/ml/cluster/MLSyncUpCron.class */
public class MLSyncUpCron implements Runnable {

    @Generated
    private static final Logger log = LogManager.getLogger(MLSyncUpCron.class);
    public static final int DEPLOY_MODEL_TASK_GRACE_TIME_IN_MS = 20000;
    private Client client;
    private ClusterService clusterService;
    private DiscoveryNodeHelper nodeHelper;
    private MLIndicesHandler mlIndicesHandler;
    private Encryptor encryptor;

    @VisibleForTesting
    Semaphore updateModelStateSemaphore = new Semaphore(1);
    private volatile Boolean mlConfigInited = false;

    public MLSyncUpCron(Client client, ClusterService clusterService, DiscoveryNodeHelper discoveryNodeHelper, MLIndicesHandler mLIndicesHandler, Encryptor encryptor) {
        this.client = client;
        this.clusterService = clusterService;
        this.nodeHelper = discoveryNodeHelper;
        this.mlIndicesHandler = mLIndicesHandler;
        this.encryptor = encryptor;
    }

    @Override // java.lang.Runnable
    public void run() {
        initMLConfig();
        if (this.clusterService.state().metadata().indices().containsKey(".plugins-ml-model")) {
            log.debug("ML sync job starts");
            DiscoveryNode[] allNodes = this.nodeHelper.getAllNodes();
            this.client.execute(MLSyncUpAction.INSTANCE, new MLSyncUpNodesRequest(allNodes, MLSyncUpInput.builder().getDeployedModels(true).build()), ActionListener.wrap(mLSyncUpNodesResponse -> {
                List<MLSyncUpNodeResponse> nodes = mLSyncUpNodesResponse.getNodes();
                HashMap hashMap = new HashMap();
                HashMap hashMap2 = new HashMap();
                HashMap hashMap3 = new HashMap();
                for (MLSyncUpNodeResponse mLSyncUpNodeResponse : nodes) {
                    String id = mLSyncUpNodeResponse.getNode().getId();
                    String[] deployedModelIds = mLSyncUpNodeResponse.getDeployedModelIds();
                    if (deployedModelIds != null && deployedModelIds.length > 0) {
                        for (String str : deployedModelIds) {
                            ((Set) hashMap.computeIfAbsent(str, str2 -> {
                                return new HashSet();
                            })).add(id);
                        }
                    }
                    String[] runningDeployModelIds = mLSyncUpNodeResponse.getRunningDeployModelIds();
                    if (runningDeployModelIds != null && runningDeployModelIds.length > 0) {
                        for (String str3 : runningDeployModelIds) {
                            ((Set) hashMap3.computeIfAbsent(str3, str4 -> {
                                return new HashSet();
                            })).add(id);
                        }
                    }
                    String[] runningDeployModelTaskIds = mLSyncUpNodeResponse.getRunningDeployModelTaskIds();
                    if (runningDeployModelTaskIds != null && runningDeployModelTaskIds.length > 0) {
                        for (String str5 : runningDeployModelTaskIds) {
                            ((Set) hashMap2.computeIfAbsent(str5, str6 -> {
                                return new HashSet();
                            })).add(id);
                        }
                    }
                }
                for (Map.Entry entry : hashMap.entrySet()) {
                    log.debug("will sync model worker nodes for model: {}: {}", (String) entry.getKey(), ((Set) entry.getValue()).toArray(new String[0]));
                }
                for (Map.Entry entry2 : hashMap2.entrySet()) {
                    log.debug("will sync running task: {}: {}", entry2.getKey(), ((Set) entry2.getValue()).toArray(new String[0]));
                }
                MLSyncUpInput.MLSyncUpInputBuilder runningDeployModelTasks = MLSyncUpInput.builder().syncRunningDeployModelTasks(true).runningDeployModelTasks(hashMap2);
                if (hashMap.size() == 0) {
                    log.debug("No deployed model found. Will clear model routing on all nodes");
                    runningDeployModelTasks.clearRoutingTable(true);
                } else {
                    runningDeployModelTasks.modelRoutingTable(hashMap);
                }
                this.client.execute(MLSyncUpAction.INSTANCE, new MLSyncUpNodesRequest(allNodes, runningDeployModelTasks.build()), ActionListener.wrap(mLSyncUpNodesResponse -> {
                    log.debug("sync model routing job finished");
                }, exc -> {
                    log.error("Failed to sync model routing", exc);
                }));
                this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(bool -> {
                    refreshModelState(hashMap, hashMap3);
                }, exc2 -> {
                    log.error("Failed to init model index", exc2);
                }));
            }, exc -> {
                log.error("Failed to sync model routing", exc);
            }));
        }
    }

    @VisibleForTesting
    void initMLConfig() {
        if (this.mlConfigInited.booleanValue()) {
            return;
        }
        this.mlIndicesHandler.initMLConfigIndex(ActionListener.wrap(bool -> {
            GetRequest id = new GetRequest(".plugins-ml-config").id("master_key");
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                this.client.get(id, ActionListener.wrap(getResponse -> {
                    if (getResponse.isExists()) {
                        this.encryptor.setMasterKey((String) getResponse.getSourceAsMap().get("master_key"));
                        this.mlConfigInited = true;
                        log.info("ML configuration already initialized, no action needed");
                        return;
                    }
                    IndexRequest id2 = new IndexRequest(".plugins-ml-config").id("master_key");
                    String generateMasterKey = this.encryptor.generateMasterKey();
                    id2.source(ImmutableMap.of("master_key", generateMasterKey, "create_time", Long.valueOf(Instant.now().toEpochMilli())));
                    id2.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                    this.client.index(id2, ActionListener.wrap(indexResponse -> {
                        log.info("ML configuration initialized successfully");
                        this.encryptor.setMasterKey(generateMasterKey);
                        this.mlConfigInited = true;
                    }, exc -> {
                        log.debug("Failed to save ML encryption master key", exc);
                    }));
                }, exc -> {
                    log.debug("Failed to get ML encryption master key", exc);
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } catch (Throwable th) {
                if (stashContext != null) {
                    try {
                        stashContext.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }, exc -> {
            log.debug("Failed to init ML config index", exc);
        }));
    }

    @VisibleForTesting
    void refreshModelState(Map<String, Set<String>> map, Map<String, Set<String>> map2) {
        if (this.updateModelStateSemaphore.tryAcquire()) {
            try {
                SearchRequest searchRequest = new SearchRequest(new String[]{".plugins-ml-model"});
                BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
                boolQueryBuilder.filter(new TermsQueryBuilder("model_state", Arrays.asList(MLModelState.LOADING.name(), MLModelState.PARTIALLY_LOADED.name(), MLModelState.LOADED.name(), MLModelState.LOAD_FAILED.name(), MLModelState.DEPLOYING.name(), MLModelState.PARTIALLY_DEPLOYED.name(), MLModelState.DEPLOYED.name(), MLModelState.DEPLOY_FAILED.name())));
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchSourceBuilder.query(boolQueryBuilder);
                searchSourceBuilder.size(10000);
                searchSourceBuilder.fetchSource(new String[]{"model_state", RestActionUtils.PARAMETER_ALGORITHM, "deploy_to_all_nodes", "planning_worker_nodes", "planning_worker_node_count", "last_updated_time", "current_worker_node_count"}, (String[]) null);
                searchRequest.source(searchSourceBuilder);
                this.client.search(searchRequest, ActionListener.wrap(searchResponse -> {
                    SearchHit[] hits = searchResponse.getHits().getHits();
                    Map<String, MLModelState> hashMap = new HashMap<>();
                    Map<String, List<String>> hashMap2 = new HashMap<>();
                    for (SearchHit searchHit : hits) {
                        String id = searchHit.getId();
                        Map sourceAsMap = searchHit.getSourceAsMap();
                        FunctionName from = FunctionName.from((String) sourceAsMap.get(RestActionUtils.PARAMETER_ALGORITHM));
                        MLModelState from2 = MLModelState.from((String) sourceAsMap.get("model_state"));
                        Long l = sourceAsMap.containsKey("last_updated_time") ? (Long) sourceAsMap.get("last_updated_time") : null;
                        int intValue = sourceAsMap.containsKey("planning_worker_node_count") ? ((Integer) sourceAsMap.get("planning_worker_node_count")).intValue() : 0;
                        int intValue2 = sourceAsMap.containsKey("current_worker_node_count") ? ((Integer) sourceAsMap.get("current_worker_node_count")).intValue() : 0;
                        boolean booleanValue = sourceAsMap.containsKey("deploy_to_all_nodes") ? ((Boolean) sourceAsMap.get("deploy_to_all_nodes")).booleanValue() : false;
                        List arrayList = sourceAsMap.containsKey("planning_worker_nodes") ? (List) sourceAsMap.get("planning_worker_nodes") : new ArrayList();
                        if (booleanValue) {
                            DiscoveryNode[] eligibleNodes = this.nodeHelper.getEligibleNodes(from);
                            intValue = eligibleNodes.length;
                            List<String> list = (List) Arrays.asList(eligibleNodes).stream().map(discoveryNode -> {
                                return discoveryNode.getId();
                            }).collect(Collectors.toList());
                            if (list.size() != arrayList.size() || !list.containsAll(arrayList)) {
                                hashMap2.put(id, list);
                            }
                        }
                        MLModelState newModelState = getNewModelState(map2, map, id, from2, l, intValue, intValue2);
                        if (newModelState != null) {
                            hashMap.put(id, newModelState);
                        }
                    }
                    bulkUpdateModelState(map, hashMap, hashMap2);
                }, exc -> {
                    this.updateModelStateSemaphore.release();
                    log.error("Failed to search models", exc);
                }));
            } catch (Exception e) {
                this.updateModelStateSemaphore.release();
                log.error("Failed to refresh model state", e);
            }
        }
    }

    private MLModelState getNewModelState(Map<String, Set<String>> map, Map<String, Set<String>> map2, String str, MLModelState mLModelState, Long l, int i, int i2) {
        Set<String> set = map.get(str);
        if (set != null && set.size() > 0 && mLModelState != MLModelState.DEPLOYING) {
            return MLModelState.DEPLOYING;
        }
        int size = map2.containsKey(str) ? map2.get(str).size() : 0;
        if (size == 0 && mLModelState != MLModelState.DEPLOY_FAILED && (mLModelState != MLModelState.DEPLOYING || l == null || l.longValue() + 20000 <= Instant.now().toEpochMilli())) {
            return MLModelState.DEPLOY_FAILED;
        }
        if (size <= 0) {
            return null;
        }
        if (size < i && (mLModelState != MLModelState.PARTIALLY_DEPLOYED || i2 != size)) {
            return MLModelState.PARTIALLY_DEPLOYED;
        }
        if (i <= 0 || size < i || mLModelState == MLModelState.DEPLOYED) {
            return null;
        }
        if (size > i) {
            log.warn("Model {} deployed on more nodes [{}] than planning worker node [{}]", str, Integer.valueOf(size), Integer.valueOf(i));
        }
        return MLModelState.DEPLOYED;
    }

    private void bulkUpdateModelState(Map<String, Set<String>> map, Map<String, MLModelState> map2, Map<String, List<String>> map3) {
        HashSet<String> hashSet = new HashSet();
        hashSet.addAll(map2.keySet());
        hashSet.addAll(map3.keySet());
        if (hashSet.size() <= 0) {
            this.updateModelStateSemaphore.release();
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        for (String str : hashSet) {
            UpdateRequest updateRequest = new UpdateRequest();
            Instant now = Instant.now();
            ImmutableMap.Builder builder = ImmutableMap.builder();
            if (map2.containsKey(str)) {
                builder.put("model_state", map2.get(str).name());
            }
            if (map3.containsKey(str)) {
                builder.put("planning_worker_nodes", map3.get(str));
                builder.put("planning_worker_node_count", Integer.valueOf(map3.get(str).size()));
            }
            builder.put("last_updated_time", Long.valueOf(now.toEpochMilli()));
            Set<String> set = map.get(str);
            builder.put("current_worker_node_count", Integer.valueOf(set == null ? 0 : set.size()));
            updateRequest.index(".plugins-ml-model").id(str).doc(builder.build());
            bulkRequest.add(updateRequest);
        }
        log.info("Refresh model state: {}", map2);
        this.client.bulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
            this.updateModelStateSemaphore.release();
            log.debug("Refresh model state successfully");
        }, exc -> {
            this.updateModelStateSemaphore.release();
            log.error("Failed to bulk update model state", exc);
        }));
    }
}
