package org.opensearch.ml.model;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import java.io.File;
import java.nio.file.Path;
import java.security.PrivilegedActionException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.TokenBucket;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.MLTaskType;
import org.opensearch.ml.common.connector.Connector;
import org.opensearch.ml.common.controller.MLController;
import org.opensearch.ml.common.controller.MLRateLimiter;
import org.opensearch.ml.common.exception.MLException;
import org.opensearch.ml.common.exception.MLLimitExceededException;
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
import org.opensearch.ml.common.exception.MLValidationException;
import org.opensearch.ml.common.model.Guardrails;
import org.opensearch.ml.common.model.MLGuard;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.transport.deploy.MLDeployModelAction;
import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest;
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.register.MLRegisterModelResponse;
import org.opensearch.ml.common.transport.upload_chunk.MLRegisterModelMetaInput;
import org.opensearch.ml.common.utils.StringUtils;
import org.opensearch.ml.engine.MLEngine;
import org.opensearch.ml.engine.MLExecutable;
import org.opensearch.ml.engine.ModelHelper;
import org.opensearch.ml.engine.Predictable;
import org.opensearch.ml.engine.indices.MLIndicesHandler;
import org.opensearch.ml.engine.utils.FileUtils;
import org.opensearch.ml.plugin.MachineLearningPlugin;
import org.opensearch.ml.profile.MLModelProfile;
import org.opensearch.ml.settings.MLCommonsSettings;
import org.opensearch.ml.settings.MLFeatureEnabledSetting;
import org.opensearch.ml.stats.ActionName;
import org.opensearch.ml.stats.MLActionLevelStat;
import org.opensearch.ml.stats.MLNodeLevelStat;
import org.opensearch.ml.stats.MLStats;
import org.opensearch.ml.task.MLTaskManager;
import org.opensearch.ml.utils.MLExceptionUtils;
import org.opensearch.ml.utils.MLNodeUtils;
import org.opensearch.ml.utils.RestActionUtils;
import org.opensearch.script.ScriptService;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.threadpool.ThreadPool;

/* loaded from: input_file:org/opensearch/ml/model/MLModelManager.class */
public class MLModelManager {
    public static final int TIMEOUT_IN_MILLIS = 5000;
    public static final long MODEL_FILE_SIZE_LIMIT = 4294967296L;
    private final Client client;
    private final ClusterService clusterService;
    private final ScriptService scriptService;
    private ThreadPool threadPool;
    private NamedXContentRegistry xContentRegistry;
    private ModelHelper modelHelper;
    private final MLModelCacheHelper modelCacheHelper;
    private final MLStats mlStats;
    private final MLCircuitBreakerService mlCircuitBreakerService;
    private final MLIndicesHandler mlIndicesHandler;
    private final MLTaskManager mlTaskManager;
    private final MLEngine mlEngine;
    private final DiscoveryNodeHelper nodeHelper;
    private final MLFeatureEnabledSetting mlFeatureEnabledSetting;
    private volatile Integer maxModelPerNode;
    private volatile Integer maxRegisterTasksPerNode;
    private volatile Integer maxDeployTasksPerNode;
    private volatile Integer maxBatchInferenceTasks;
    private volatile Integer maxBatchIngestionTasks;

    @Generated
    private static final Logger log = LogManager.getLogger(MLModelManager.class);
    public static final ImmutableSet MODEL_DONE_STATES = ImmutableSet.of(MLModelState.TRAINED, MLModelState.REGISTERED, MLModelState.DEPLOYED, MLModelState.PARTIALLY_DEPLOYED, MLModelState.DEPLOY_FAILED, MLModelState.UNDEPLOYED, new MLModelState[0]);

    public MLModelManager(ClusterService clusterService, ScriptService scriptService, Client client, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry, ModelHelper modelHelper, Settings settings, MLStats mLStats, MLCircuitBreakerService mLCircuitBreakerService, MLIndicesHandler mLIndicesHandler, MLTaskManager mLTaskManager, MLModelCacheHelper mLModelCacheHelper, MLEngine mLEngine, DiscoveryNodeHelper discoveryNodeHelper, MLFeatureEnabledSetting mLFeatureEnabledSetting) {
        this.client = client;
        this.threadPool = threadPool;
        this.xContentRegistry = namedXContentRegistry;
        this.modelHelper = modelHelper;
        this.clusterService = clusterService;
        this.scriptService = scriptService;
        this.modelCacheHelper = mLModelCacheHelper;
        this.mlStats = mLStats;
        this.mlCircuitBreakerService = mLCircuitBreakerService;
        this.mlIndicesHandler = mLIndicesHandler;
        this.mlTaskManager = mLTaskManager;
        this.mlEngine = mLEngine;
        this.nodeHelper = discoveryNodeHelper;
        this.mlFeatureEnabledSetting = mLFeatureEnabledSetting;
        this.maxModelPerNode = (Integer) MLCommonsSettings.ML_COMMONS_MAX_MODELS_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_MODELS_PER_NODE, num -> {
            this.maxModelPerNode = num;
        });
        this.maxRegisterTasksPerNode = (Integer) MLCommonsSettings.ML_COMMONS_MAX_REGISTER_MODEL_TASKS_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_REGISTER_MODEL_TASKS_PER_NODE, num2 -> {
            this.maxRegisterTasksPerNode = num2;
        });
        this.maxDeployTasksPerNode = (Integer) MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE, num3 -> {
            this.maxDeployTasksPerNode = num3;
        });
        this.maxBatchInferenceTasks = (Integer) MLCommonsSettings.ML_COMMONS_MAX_BATCH_INFERENCE_TASKS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_BATCH_INFERENCE_TASKS, num4 -> {
            this.maxBatchInferenceTasks = num4;
        });
        this.maxBatchIngestionTasks = (Integer) MLCommonsSettings.ML_COMMONS_MAX_BATCH_INGESTION_TASKS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MAX_BATCH_INGESTION_TASKS, num5 -> {
            this.maxBatchIngestionTasks = num5;
        });
    }

    public void registerModelMeta(MLRegisterModelMetaInput mLRegisterModelMetaInput, ActionListener<String> actionListener) {
        try {
            FunctionName functionName = mLRegisterModelMetaInput.getFunctionName();
            this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
            this.mlStats.createCounterStatIfAbsent(functionName, ActionName.REGISTER, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
            String modelGroupId = mLRegisterModelMetaInput.getModelGroupId();
            if (Strings.isBlank(modelGroupId)) {
                uploadMLModelMeta(mLRegisterModelMetaInput, "1", actionListener);
            } else {
                try {
                    ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
                    try {
                        ActionListener runBefore = ActionListener.runBefore(actionListener, () -> {
                            stashContext.restore();
                        });
                        this.client.get(new GetRequest(".plugins-ml-model-group").id(modelGroupId), ActionListener.wrap(getResponse -> {
                            if (!getResponse.isExists()) {
                                log.error("Model group not found");
                                runBefore.onFailure(new MLResourceNotFoundException("Fail to find model group"));
                            } else {
                                Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
                                int incrementLatestVersion = incrementLatestVersion(sourceAsMap);
                                this.client.update(createUpdateModelGroupRequest(sourceAsMap, modelGroupId, getResponse.getSeqNo(), getResponse.getPrimaryTerm(), incrementLatestVersion), ActionListener.wrap(updateResponse -> {
                                    uploadMLModelMeta(mLRegisterModelMetaInput, incrementLatestVersion, runBefore);
                                }, exc -> {
                                    log.error("Failed to update model group", exc);
                                    runBefore.onFailure(exc);
                                }));
                            }
                        }, exc -> {
                            if (exc instanceof IndexNotFoundException) {
                                runBefore.onFailure(new MLResourceNotFoundException("Fail to find model group"));
                            } else {
                                log.error("Failed to get model group", exc);
                                runBefore.onFailure(new MLValidationException("Failed to get model group"));
                            }
                        }));
                        if (stashContext != null) {
                            stashContext.close();
                        }
                    } catch (Throwable th) {
                        if (stashContext != null) {
                            try {
                                stashContext.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Exception e) {
                    log.error("Failed to register model", e);
                    actionListener.onFailure(e);
                }
            }
        } catch (Exception e2) {
            log.error("Failed to init model index", e2);
            actionListener.onFailure(e2);
        }
    }

    private void uploadMLModelMeta(MLRegisterModelMetaInput mLRegisterModelMetaInput, String str, ActionListener<String> actionListener) {
        FunctionName functionName = mLRegisterModelMetaInput.getFunctionName();
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                ActionListener runBefore = ActionListener.runBefore(actionListener, () -> {
                    stashContext.restore();
                });
                String name = mLRegisterModelMetaInput.getName();
                this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(bool -> {
                    if (!bool.booleanValue()) {
                        runBefore.onFailure(new RuntimeException("No response to create ML Model index"));
                        return;
                    }
                    Instant now = Instant.now();
                    MLModel build = MLModel.builder().name(name).algorithm(functionName).version(str).modelGroupId(mLRegisterModelMetaInput.getModelGroupId()).description(mLRegisterModelMetaInput.getDescription()).isEnabled(mLRegisterModelMetaInput.getIsEnabled()).rateLimiter(mLRegisterModelMetaInput.getRateLimiter()).isEnabled(mLRegisterModelMetaInput.getIsEnabled()).modelFormat(mLRegisterModelMetaInput.getModelFormat()).modelState(MLModelState.REGISTERING).modelConfig(mLRegisterModelMetaInput.getModelConfig()).deploySetting(mLRegisterModelMetaInput.getDeploySetting()).totalChunks(mLRegisterModelMetaInput.getTotalChunks()).modelContentHash(mLRegisterModelMetaInput.getModelContentHashValue()).modelContentSizeInBytes(mLRegisterModelMetaInput.getModelContentSizeInBytes()).isHidden(mLRegisterModelMetaInput.getIsHidden()).modelInterface(mLRegisterModelMetaInput.getModelInterface()).createdTime(now).lastUpdateTime(now).build();
                    IndexRequest indexRequest = new IndexRequest(".plugins-ml-model");
                    if (mLRegisterModelMetaInput.getIsHidden() != null && mLRegisterModelMetaInput.getIsHidden().booleanValue()) {
                        indexRequest.id(name);
                    }
                    indexRequest.source(build.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                    this.client.index(indexRequest, ActionListener.wrap(indexResponse -> {
                        log.debug("Index model meta doc successfully {}", name);
                        runBefore.onResponse(indexResponse.getId());
                    }, exc -> {
                        deleteOrUpdateModelGroup(mLRegisterModelMetaInput.getModelGroupId(), mLRegisterModelMetaInput.getDoesVersionCreateModelGroup(), str);
                        log.error("Failed to index model meta doc", exc);
                        runBefore.onFailure(exc);
                    }));
                }, exc -> {
                    log.error("Failed to init model index", exc);
                    runBefore.onFailure(exc);
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to register model", e);
            actionListener.onFailure(e);
        }
    }

    public void registerMLRemoteModel(MLRegisterModelInput mLRegisterModelInput, MLTask mLTask, ActionListener<MLRegisterModelResponse> actionListener) {
        try {
            try {
                ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
                try {
                    checkAndAddRunningTask(mLTask, this.maxRegisterTasksPerNode);
                    this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
                    this.mlStats.createCounterStatIfAbsent(mLTask.getFunctionName(), ActionName.REGISTER, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                    this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).increment();
                    String modelGroupId = mLRegisterModelInput.getModelGroupId();
                    this.client.get(new GetRequest(".plugins-ml-model-group").id(modelGroupId), ActionListener.wrap(getResponse -> {
                        if (!getResponse.isExists()) {
                            log.error("Model group response is empty");
                            handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), new MLValidationException("Model group not found"));
                            actionListener.onFailure(new MLResourceNotFoundException("Model Group Response is empty for " + modelGroupId));
                        } else {
                            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
                            int incrementLatestVersion = incrementLatestVersion(sourceAsMap);
                            this.client.update(createUpdateModelGroupRequest(sourceAsMap, modelGroupId, getResponse.getSeqNo(), getResponse.getPrimaryTerm(), incrementLatestVersion), ActionListener.wrap(updateResponse -> {
                                indexRemoteModel(mLRegisterModelInput, mLTask, incrementLatestVersion, actionListener);
                            }, exc -> {
                                log.error("Failed to update model group " + modelGroupId, exc);
                                handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), exc);
                                actionListener.onFailure(exc);
                            }));
                        }
                    }, exc -> {
                        if (exc instanceof IndexNotFoundException) {
                            log.error("Model group Index is missing");
                            handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), new MLResourceNotFoundException("Failed to get model group due to index missing"));
                            actionListener.onFailure(exc);
                        } else {
                            log.error("Failed to get model group", exc);
                            handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), exc);
                            actionListener.onFailure(exc);
                        }
                    }));
                    if (stashContext != null) {
                        stashContext.close();
                    }
                    this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
                } catch (Throwable th) {
                    if (stashContext != null) {
                        try {
                            stashContext.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
                throw th3;
            }
        } catch (Exception e) {
            log.error("Failed to register remote model", e);
            handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), e);
            actionListener.onFailure(e);
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
        }
    }

    public void registerMLModel(MLRegisterModelInput mLRegisterModelInput, MLTask mLTask) {
        checkAndAddRunningTask(mLTask, this.maxRegisterTasksPerNode);
        try {
            try {
                this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
                this.mlStats.createCounterStatIfAbsent(mLTask.getFunctionName(), ActionName.REGISTER, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).increment();
                String modelGroupId = mLRegisterModelInput.getModelGroupId();
                GetRequest id = new GetRequest(".plugins-ml-model-group").id(modelGroupId);
                try {
                    ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
                    try {
                        this.client.get(id, ActionListener.runBefore(ActionListener.wrap(getResponse -> {
                            if (!getResponse.isExists()) {
                                log.error("Model group not found");
                                handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), new MLValidationException("Model group not found"));
                                return;
                            }
                            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
                            int incrementLatestVersion = incrementLatestVersion(sourceAsMap);
                            UpdateRequest createUpdateModelGroupRequest = createUpdateModelGroupRequest(sourceAsMap, modelGroupId, getResponse.getSeqNo(), getResponse.getPrimaryTerm(), incrementLatestVersion);
                            ThreadContext.StoredContext stashContext2 = this.client.threadPool().getThreadContext().stashContext();
                            try {
                                this.client.update(createUpdateModelGroupRequest, ActionListener.wrap(updateResponse -> {
                                    uploadModel(mLRegisterModelInput, mLTask, incrementLatestVersion);
                                }, exc -> {
                                    log.error("Failed to update model group", exc);
                                    handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), exc);
                                }));
                                if (stashContext2 != null) {
                                    stashContext2.close();
                                }
                            } catch (Throwable th) {
                                if (stashContext2 != null) {
                                    try {
                                        stashContext2.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                                throw th;
                            }
                        }, exc -> {
                            if (exc instanceof IndexNotFoundException) {
                                handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), new MLResourceNotFoundException("Failed to get model group"));
                            } else {
                                log.error("Failed to get model group", exc);
                                handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), exc);
                            }
                        }), () -> {
                            stashContext.restore();
                        }));
                        if (stashContext != null) {
                            stashContext.close();
                        }
                    } catch (Throwable th) {
                        if (stashContext != null) {
                            try {
                                stashContext.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Exception e) {
                    log.error("Failed to register model", e);
                    handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), e);
                }
                this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
            } catch (Exception e2) {
                handleException(mLRegisterModelInput.getFunctionName(), mLTask.getTaskId(), e2);
                this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
            }
        } catch (Throwable th3) {
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
            throw th3;
        }
    }

    private UpdateRequest createUpdateModelGroupRequest(Map<String, Object> map, String str, long j, long j2, int i) {
        map.put("latest_version", Integer.valueOf(i));
        map.put("last_updated_time", Long.valueOf(Instant.now().toEpochMilli()));
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(".plugins-ml-model-group").id(str).setIfSeqNo(j).setIfPrimaryTerm(j2).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).doc(map);
        return updateRequest;
    }

    private int incrementLatestVersion(Map<String, Object> map) {
        return ((Integer) map.get("latest_version")).intValue() + 1;
    }

    private void indexRemoteModel(MLRegisterModelInput mLRegisterModelInput, MLTask mLTask, String str, ActionListener<MLRegisterModelResponse> actionListener) {
        String taskId = mLTask.getTaskId();
        FunctionName functionName = mLTask.getFunctionName();
        ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
        try {
            String modelName = mLRegisterModelInput.getModelName();
            String version = str == null ? mLRegisterModelInput.getVersion() : str;
            Instant now = Instant.now();
            if (mLRegisterModelInput.getConnector() != null) {
                Connector connector = mLRegisterModelInput.getConnector();
                MLEngine mLEngine = this.mlEngine;
                Objects.requireNonNull(mLEngine);
                connector.encrypt(mLEngine::encrypt);
            }
            this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(bool -> {
                if (!bool.booleanValue()) {
                    actionListener.onFailure(new RuntimeException("No response to create ML Model index"));
                    return;
                }
                MLModel build = MLModel.builder().name(modelName).algorithm(functionName).modelGroupId(mLRegisterModelInput.getModelGroupId()).version(version).description(mLRegisterModelInput.getDescription()).rateLimiter(mLRegisterModelInput.getRateLimiter()).isEnabled(mLRegisterModelInput.getIsEnabled()).modelFormat(mLRegisterModelInput.getModelFormat()).modelState(MLModelState.REGISTERED).connector(mLRegisterModelInput.getConnector()).connectorId(mLRegisterModelInput.getConnectorId()).modelConfig(mLRegisterModelInput.getModelConfig()).deploySetting(mLRegisterModelInput.getDeploySetting()).createdTime(now).lastUpdateTime(now).isHidden(mLRegisterModelInput.getIsHidden()).guardrails(mLRegisterModelInput.getGuardrails()).modelInterface(mLRegisterModelInput.getModelInterface()).build();
                IndexRequest indexRequest = new IndexRequest(".plugins-ml-model");
                if (mLRegisterModelInput.getIsHidden() != null && mLRegisterModelInput.getIsHidden().booleanValue()) {
                    indexRequest.id(modelName);
                }
                indexRequest.source(build.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                this.client.index(indexRequest, threadedActionListener(MachineLearningPlugin.REGISTER_THREAD_POOL, ActionListener.wrap(indexResponse -> {
                    String id = indexResponse.getId();
                    mLTask.setModelId(id);
                    log.info("create new model meta doc {} for upload task {}", id, taskId);
                    this.mlTaskManager.updateMLTask(taskId, Map.of("model_id", id, "state", MLTaskState.COMPLETED), 5000L, true);
                    if (mLRegisterModelInput.isDeployModel()) {
                        deployModelAfterRegistering(mLRegisterModelInput, id);
                    }
                    actionListener.onResponse(new MLRegisterModelResponse(taskId, MLTaskState.CREATED.name(), id));
                }, exc -> {
                    log.error("Failed to index model meta doc", exc);
                    handleException(functionName, taskId, exc);
                    actionListener.onFailure(exc);
                })));
            }, exc -> {
                log.error("Failed to init model index", exc);
                handleException(functionName, taskId, exc);
                actionListener.onFailure(exc);
            }));
            if (stashContext != null) {
                stashContext.close();
            }
        } catch (Throwable th) {
            if (stashContext != null) {
                try {
                    stashContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @VisibleForTesting
    void indexRemoteModel(MLRegisterModelInput mLRegisterModelInput, MLTask mLTask, String str) {
        String taskId = mLTask.getTaskId();
        FunctionName functionName = mLTask.getFunctionName();
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                String modelName = mLRegisterModelInput.getModelName();
                String version = str == null ? mLRegisterModelInput.getVersion() : str;
                Instant now = Instant.now();
                if (mLRegisterModelInput.getConnector() != null) {
                    Connector connector = mLRegisterModelInput.getConnector();
                    MLEngine mLEngine = this.mlEngine;
                    Objects.requireNonNull(mLEngine);
                    connector.encrypt(mLEngine::encrypt);
                }
                this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore(ActionListener.wrap(bool -> {
                    if (!bool.booleanValue()) {
                        handleException(functionName, taskId, new RuntimeException("No response to create ML Model index"));
                        return;
                    }
                    MLModel build = MLModel.builder().name(modelName).algorithm(functionName).modelGroupId(mLRegisterModelInput.getModelGroupId()).version(version).description(mLRegisterModelInput.getDescription()).rateLimiter(mLRegisterModelInput.getRateLimiter()).isEnabled(mLRegisterModelInput.getIsEnabled()).modelFormat(mLRegisterModelInput.getModelFormat()).modelState(MLModelState.REGISTERED).connector(mLRegisterModelInput.getConnector()).connectorId(mLRegisterModelInput.getConnectorId()).modelConfig(mLRegisterModelInput.getModelConfig()).deploySetting(mLRegisterModelInput.getDeploySetting()).createdTime(now).lastUpdateTime(now).isHidden(mLRegisterModelInput.getIsHidden()).guardrails(mLRegisterModelInput.getGuardrails()).modelInterface(mLRegisterModelInput.getModelInterface()).build();
                    IndexRequest indexRequest = new IndexRequest(".plugins-ml-model");
                    if (mLRegisterModelInput.getIsHidden() != null && mLRegisterModelInput.getIsHidden().booleanValue()) {
                        indexRequest.id(modelName);
                    }
                    indexRequest.source(build.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                    this.client.index(indexRequest, threadedActionListener(MachineLearningPlugin.REGISTER_THREAD_POOL, ActionListener.wrap(indexResponse -> {
                        String id = indexResponse.getId();
                        mLTask.setModelId(id);
                        log.info("create new model meta doc {} for upload task {}", id, taskId);
                        this.mlTaskManager.updateMLTask(taskId, Map.of("model_id", id, "state", MLTaskState.COMPLETED), 5000L, true);
                        if (mLRegisterModelInput.isDeployModel()) {
                            deployModelAfterRegistering(mLRegisterModelInput, id);
                        }
                    }, exc -> {
                        log.error("Failed to index model meta doc", exc);
                        handleException(functionName, taskId, exc);
                    })));
                }, exc -> {
                    log.error("Failed to init model index", exc);
                    handleException(functionName, taskId, exc);
                }), () -> {
                    stashContext.restore();
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            MLExceptionUtils.logException("Failed to upload model", e, log);
            handleException(functionName, taskId, e);
        }
    }

    private void uploadModel(MLRegisterModelInput mLRegisterModelInput, MLTask mLTask, String str) throws PrivilegedActionException {
        if (mLRegisterModelInput.getUrl() != null) {
            registerModelFromUrl(mLRegisterModelInput, mLTask, str);
        } else if (mLRegisterModelInput.getFunctionName() == FunctionName.REMOTE || mLRegisterModelInput.getConnectorId() != null) {
            indexRemoteModel(mLRegisterModelInput, mLTask, str);
        } else {
            registerPrebuiltModel(mLRegisterModelInput, mLTask, str);
        }
    }

    private void registerModelFromUrl(MLRegisterModelInput mLRegisterModelInput, MLTask mLTask, String str) {
        String taskId = mLTask.getTaskId();
        FunctionName functionName = mLTask.getFunctionName();
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                String modelName = mLRegisterModelInput.getModelName();
                String version = str == null ? mLRegisterModelInput.getVersion() : str;
                String modelGroupId = mLRegisterModelInput.getModelGroupId();
                Instant now = Instant.now();
                this.mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore(ActionListener.wrap(bool -> {
                    if (!bool.booleanValue()) {
                        handleException(functionName, taskId, new RuntimeException("No response to create ML Model index"));
                        return;
                    }
                    MLModel build = MLModel.builder().name(modelName).modelGroupId(modelGroupId).algorithm(functionName).version(version).description(mLRegisterModelInput.getDescription()).rateLimiter(mLRegisterModelInput.getRateLimiter()).isEnabled(mLRegisterModelInput.getIsEnabled()).modelFormat(mLRegisterModelInput.getModelFormat()).modelState(MLModelState.REGISTERING).modelConfig(mLRegisterModelInput.getModelConfig()).deploySetting(mLRegisterModelInput.getDeploySetting()).createdTime(now).lastUpdateTime(now).isHidden(mLRegisterModelInput.getIsHidden()).guardrails(mLRegisterModelInput.getGuardrails()).modelInterface(mLRegisterModelInput.getModelInterface()).build();
                    IndexRequest indexRequest = new IndexRequest(".plugins-ml-model");
                    if (functionName == FunctionName.METRICS_CORRELATION) {
                        indexRequest.id(functionName.name());
                    }
                    if (mLRegisterModelInput.getIsHidden() != null && mLRegisterModelInput.getIsHidden().booleanValue()) {
                        indexRequest.id(modelName);
                    }
                    indexRequest.source(build.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                    this.client.index(indexRequest, threadedActionListener(MachineLearningPlugin.REGISTER_THREAD_POOL, ActionListener.wrap(indexResponse -> {
                        String id = indexResponse.getId();
                        mLTask.setModelId(id);
                        log.info("create new model meta doc {} for register model task {}", id, taskId);
                        registerModel(mLRegisterModelInput, taskId, functionName, modelName, version, id);
                    }, exc -> {
                        log.error("Failed to index model meta doc", exc);
                        handleException(functionName, taskId, exc);
                    })));
                }, exc -> {
                    log.error("Failed to init model index", exc);
                    handleException(functionName, taskId, exc);
                }), () -> {
                    stashContext.restore();
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            MLExceptionUtils.logException("Failed to register model", e, log);
            handleException(functionName, taskId, e);
        }
    }

    private void registerModel(MLRegisterModelInput mLRegisterModelInput, String str, FunctionName functionName, String str2, String str3, String str4) {
        this.modelHelper.downloadAndSplit(mLRegisterModelInput.getModelFormat(), str4, str2, str3, mLRegisterModelInput.getUrl(), mLRegisterModelInput.getHashValue(), functionName, ActionListener.wrap(map -> {
            Long l = (Long) map.get("model_size_in_bytes");
            if (l.longValue() >= MODEL_FILE_SIZE_LIMIT) {
                throw new MLException("Model file size exceeds the limit of 4GB: " + l);
            }
            List<String> list = (List) map.get("chunk_files");
            String str5 = (String) map.get("model_file_hash");
            Semaphore semaphore = new Semaphore(1);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            for (String str6 : list) {
                semaphore.tryAcquire(10L, TimeUnit.SECONDS);
                if (atomicBoolean.get()) {
                    throw new MLException("Failed to save model chunk");
                }
                File file = new File(str6);
                byte[] byteArray = Files.toByteArray(file);
                int parseInt = Integer.parseInt(file.getName());
                Instant now = Instant.now();
                MLModel build = MLModel.builder().modelId(str4).name(str2).algorithm(functionName).version(str3).modelFormat(mLRegisterModelInput.getModelFormat()).rateLimiter(mLRegisterModelInput.getRateLimiter()).isEnabled(mLRegisterModelInput.getIsEnabled()).chunkNumber(Integer.valueOf(parseInt)).totalChunks(Integer.valueOf(list.size())).content(Base64.getEncoder().encodeToString(byteArray)).createdTime(now).lastUpdateTime(now).isHidden(mLRegisterModelInput.getIsHidden()).guardrails(mLRegisterModelInput.getGuardrails()).modelInterface(mLRegisterModelInput.getModelInterface()).build();
                IndexRequest indexRequest = new IndexRequest(".plugins-ml-model");
                if (mLRegisterModelInput.getIsHidden() != null && mLRegisterModelInput.getIsHidden().booleanValue()) {
                    indexRequest.id(str2);
                }
                String modelChunkId = getModelChunkId(str4, Integer.valueOf(parseInt));
                indexRequest.id(modelChunkId);
                indexRequest.source(build.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
                indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                this.client.index(indexRequest, ActionListener.wrap(indexResponse -> {
                    atomicInteger.getAndIncrement();
                    if (atomicInteger.get() == list.size()) {
                        updateModelRegisterStateAsDone(mLRegisterModelInput, str, str4, l, list, str5, str3);
                    } else {
                        FileUtils.deleteFileQuietly(file);
                    }
                    semaphore.release();
                }, exc -> {
                    log.error("Failed to index model chunk " + modelChunkId, exc);
                    atomicBoolean.set(true);
                    handleException(functionName, str, exc);
                    FileUtils.deleteFileQuietly(file);
                    deleteModel(str4, mLRegisterModelInput, str3);
                    semaphore.release();
                    FileUtils.deleteFileQuietly(this.mlEngine.getRegisterModelPath(str4));
                }));
            }
        }, exc -> {
            log.error("Failed to index chunk file", exc);
            FileUtils.deleteFileQuietly(this.mlEngine.getRegisterModelPath(str4));
            deleteModel(str4, mLRegisterModelInput, str3);
            handleException(functionName, str, exc);
        }));
    }

    private void registerPrebuiltModel(MLRegisterModelInput mLRegisterModelInput, MLTask mLTask, String str) throws PrivilegedActionException {
        String taskId = mLTask.getTaskId();
        if (!this.modelHelper.isModelAllowed(mLRegisterModelInput, this.modelHelper.downloadPrebuiltModelMetaList(taskId, mLRegisterModelInput))) {
            throw new IllegalArgumentException("This model is not in the pre-trained model list, please check your parameters.");
        }
        this.modelHelper.downloadPrebuiltModelConfig(taskId, mLRegisterModelInput, ActionListener.wrap(mLRegisterModelInput2 -> {
            mLTask.setFunctionName(mLRegisterModelInput2.getFunctionName());
            this.mlTaskManager.updateMLTask(taskId, Map.of("function_name", mLRegisterModelInput2.getFunctionName()), 5000L, false);
            registerModelFromUrl(mLRegisterModelInput2, mLTask, str);
        }, exc -> {
            log.error("Failed to register prebuilt model", exc);
            handleException(mLRegisterModelInput.getFunctionName(), taskId, exc);
        }));
    }

    private <T> ThreadedActionListener<T> threadedActionListener(String str, ActionListener<T> actionListener) {
        return new ThreadedActionListener<>(log, this.threadPool, str, actionListener, false);
    }

    public void checkAndAddRunningTask(MLTask mLTask, Integer num) {
        if (Objects.nonNull(mLTask) && mLTask.getFunctionName() != FunctionName.REMOTE) {
            MLNodeUtils.checkOpenCircuitBreaker(this.mlCircuitBreakerService, this.mlStats);
        }
        this.mlTaskManager.checkLimitAndAddRunningTask(mLTask, num);
    }

    public void checkMaxBatchJobTask(MLTask mLTask, ActionListener<Boolean> actionListener) {
        MLTaskType taskType = mLTask.getTaskType();
        this.mlTaskManager.checkMaxBatchJobTask(taskType, Integer.valueOf((taskType.equals(MLTaskType.BATCH_PREDICTION) ? this.maxBatchInferenceTasks : this.maxBatchIngestionTasks).intValue()), actionListener);
    }

    private void updateModelRegisterStateAsDone(MLRegisterModelInput mLRegisterModelInput, String str, String str2, Long l, List<String> list, String str3, String str4) {
        FunctionName functionName = mLRegisterModelInput.getFunctionName();
        FileUtils.deleteFileQuietly(this.mlEngine.getRegisterModelPath(str2));
        Map<String, Object> of = Map.of("model_state", MLModelState.REGISTERED, "last_registered_time", Long.valueOf(Instant.now().toEpochMilli()), "total_chunks", Integer.valueOf(list.size()), "model_content_hash_value", str3, "model_content_size_in_bytes", l);
        log.info("Model registered successfully, model id: {}, task id: {}", str2, str);
        updateModel(str2, of, ActionListener.wrap(updateResponse -> {
            this.mlTaskManager.updateMLTask(str, Map.of("state", MLTaskState.COMPLETED, "model_id", str2), 5000L, true);
            if (mLRegisterModelInput.isDeployModel()) {
                deployModelAfterRegistering(mLRegisterModelInput, str2);
            }
        }, exc -> {
            log.error("Failed to update model", exc);
            handleException(functionName, str, exc);
            deleteModel(str2, mLRegisterModelInput, str4);
        }));
    }

    @VisibleForTesting
    void deployModelAfterRegistering(MLRegisterModelInput mLRegisterModelInput, String str) {
        String[] modelNodeIds = mLRegisterModelInput.getModelNodeIds();
        log.debug("start deploying model after registering, modelId: {} on nodes: {}", str, Arrays.toString(modelNodeIds));
        this.client.execute(MLDeployModelAction.INSTANCE, new MLDeployModelRequest(str, modelNodeIds, false, true, true), ActionListener.wrap(mLDeployModelResponse -> {
            log.debug("model deployed, response {}", mLDeployModelResponse);
        }, exc -> {
            log.error("Failed to deploy model", exc);
        }));
    }

    private void deleteModel(String str, MLRegisterModelInput mLRegisterModelInput, String str2) {
        DeleteRequest deleteRequest = new DeleteRequest();
        deleteRequest.index(".plugins-ml-model").id(str).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        this.client.delete(deleteRequest);
        this.client.execute(DeleteByQueryAction.INSTANCE, new DeleteByQueryRequest(new String[]{".plugins-ml-model"}).setQuery(new TermQueryBuilder("model_id", str)).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setAbortOnVersionConflict(false));
        deleteOrUpdateModelGroup(mLRegisterModelInput.getModelGroupId(), mLRegisterModelInput.getDoesVersionCreateModelGroup(), str2);
    }

    private void deleteOrUpdateModelGroup(String str, Boolean bool, String str2) {
        if (!Boolean.TRUE.equals(bool)) {
            updateLatestVersionInModelGroup(str, Integer.valueOf(Integer.parseInt(str2) - 1), ActionListener.wrap(updateResponse -> {
                log.debug("model group updated, response {}", updateResponse);
            }, exc -> {
                log.error("Failed to update model group", exc);
            }));
            return;
        }
        DeleteRequest deleteRequest = new DeleteRequest();
        deleteRequest.index(".plugins-ml-model-group").id(str).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        this.client.delete(deleteRequest);
    }

    private void updateLatestVersionInModelGroup(String str, Integer num, ActionListener<UpdateResponse> actionListener) {
        HashMap hashMap = new HashMap();
        hashMap.put("latest_version", num);
        hashMap.put("last_updated_time", Long.valueOf(Instant.now().toEpochMilli()));
        UpdateRequest updateRequest = new UpdateRequest(".plugins-ml-model-group", str);
        updateRequest.doc(hashMap);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                this.client.update(updateRequest, ActionListener.runBefore(actionListener, () -> {
                    stashContext.restore();
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            actionListener.onFailure(e);
        }
    }

    private void handleException(FunctionName functionName, String str, Exception exc) {
        if (!(exc instanceof MLLimitExceededException) && !(exc instanceof MLResourceNotFoundException) && !(exc instanceof IllegalArgumentException)) {
            this.mlStats.createCounterStatIfAbsent(functionName, ActionName.REGISTER, MLActionLevelStat.ML_ACTION_FAILURE_COUNT).increment();
            this.mlStats.getStat(MLNodeLevelStat.ML_FAILURE_COUNT).increment();
        }
        this.mlTaskManager.updateMLTask(str, Map.of("error", MLExceptionUtils.getRootCauseMessage(exc), "state", MLTaskState.FAILED), 5000L, true);
    }

    public void getConnectorCredential(String str, ActionListener<Map<String, String>> actionListener) {
        CheckedConsumer checkedConsumer = connector -> {
            actionListener.onResponse(this.mlEngine.getConnectorCredential(connector));
            log.info("Completed loading credential in the connector {}", str);
        };
        Objects.requireNonNull(actionListener);
        getConnector(str, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
    }

    public void deployModel(String str, String str2, FunctionName functionName, boolean z, boolean z2, MLTask mLTask, ActionListener<String> actionListener) {
        this.mlStats.createCounterStatIfAbsent(functionName, ActionName.DEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
        this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).increment();
        this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
        this.mlStats.createModelCounterStatIfAbsent(str, ActionName.DEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
        List<String> workerNodes = mLTask.getWorkerNodes();
        if (this.modelCacheHelper.isModelDeployed(str)) {
            if (!z2 && workerNodes != null && !workerNodes.isEmpty()) {
                log.info("Set new target node ids {} for model {}", Arrays.toString(workerNodes.toArray(new String[0])), str);
                this.modelCacheHelper.setDeployToAllNodes(str, Boolean.valueOf(z));
                this.modelCacheHelper.setTargetWorkerNodes(str, workerNodes);
                this.modelCacheHelper.refreshLastAccessTime(str);
            }
            actionListener.onResponse("successful");
            return;
        }
        if (functionName != FunctionName.REMOTE && this.modelCacheHelper.getLocalDeployedModels().length >= this.maxModelPerNode.intValue()) {
            actionListener.onFailure(new IllegalArgumentException("Exceed max local model per node limit"));
            return;
        }
        int size = workerNodes.size();
        if (z2) {
            this.modelCacheHelper.initModelStateAutoDeploy(str, MLModelState.DEPLOYING, functionName, workerNodes);
        } else {
            this.modelCacheHelper.initModelState(str, MLModelState.DEPLOYING, functionName, workerNodes, z);
        }
        try {
            try {
                ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
                try {
                    ActionListener runBefore = ActionListener.runBefore(actionListener, () -> {
                        stashContext.restore();
                        this.modelCacheHelper.removeAutoDeployModel(str);
                        this.modelCacheHelper.setIsAutoDeploying(str, false);
                    });
                    if (!z2) {
                        checkAndAddRunningTask(mLTask, this.maxDeployTasksPerNode);
                    }
                    getModel(str, threadedActionListener(MachineLearningPlugin.DEPLOY_THREAD_POOL, ActionListener.wrap(mLModel -> {
                        this.modelCacheHelper.setIsModelEnabled(str, mLModel.getIsEnabled());
                        this.modelCacheHelper.setModelInfo(str, mLModel);
                        if (FunctionName.REMOTE == mLModel.getAlgorithm() || !(FunctionName.isDLModel(mLModel.getAlgorithm()) || mLModel.getAlgorithm() == FunctionName.METRICS_CORRELATION)) {
                            if (BooleanUtils.isTrue(mLModel.getIsControllerEnabled())) {
                                getController(str, ActionListener.wrap(mLController -> {
                                    setupUserRateLimiterMap(str, Integer.valueOf(size), mLController.getUserRateLimiter());
                                    log.info("Successfully redeployed model controller for model " + str);
                                    log.info("Trying to deploy remote model with model controller configured.");
                                    deployRemoteOrBuiltInModel(mLModel, Integer.valueOf(size), runBefore);
                                }, exc -> {
                                    log.error("Trying to deploy remote model with exceptions in re-deploying its model controller. Model ID: " + str, exc);
                                    deployRemoteOrBuiltInModel(mLModel, Integer.valueOf(size), runBefore);
                                }));
                                return;
                            } else {
                                log.info("Trying to deploy remote or built-in model without model controller configured.");
                                deployRemoteOrBuiltInModel(mLModel, Integer.valueOf(size), runBefore);
                                return;
                            }
                        }
                        setupRateLimiter(str, Integer.valueOf(size), mLModel.getRateLimiter());
                        setupMLGuard(str, mLModel.getGuardrails());
                        setupModelInterface(str, mLModel.getModelInterface());
                        deployControllerWithDeployingModel(mLModel, Integer.valueOf(size));
                        MLNodeUtils.checkOpenCircuitBreaker(this.mlCircuitBreakerService, this.mlStats);
                        retrieveModelChunks(mLModel, ActionListener.wrap(file -> {
                            String calculateFileHash = FileUtils.calculateFileHash(file);
                            if (str2 != null && !str2.equals(calculateFileHash)) {
                                log.error("Model content hash can't match original hash value");
                                removeModel(str);
                                runBefore.onFailure(new IllegalArgumentException("model content changed"));
                                return;
                            }
                            log.debug("Model content matches original hash value, continue deploying");
                            Map of = Map.of("model_zip_file", file, "model_helper", this.modelHelper, "ml_engine", this.mlEngine);
                            if (FunctionName.METRICS_CORRELATION.equals(mLModel.getAlgorithm())) {
                                MLExecutable deployExecute = this.mlEngine.deployExecute(mLModel, of);
                                try {
                                    this.modelCacheHelper.setMLExecutor(str, deployExecute);
                                    this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
                                    this.modelCacheHelper.setModelState(str, MLModelState.DEPLOYED);
                                    this.modelCacheHelper.refreshLastAccessTime(str);
                                    runBefore.onResponse("successful");
                                    return;
                                } catch (Exception e) {
                                    log.error("Failed to add predictor to cache", e);
                                    deployExecute.close();
                                    runBefore.onFailure(e);
                                    return;
                                }
                            }
                            Predictable deploy = this.mlEngine.deploy(mLModel, of);
                            try {
                                this.modelCacheHelper.setPredictor(str, deploy);
                                this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
                                this.modelCacheHelper.setModelState(str, MLModelState.DEPLOYED);
                                this.modelCacheHelper.refreshLastAccessTime(str);
                                Long modelContentSizeInBytes = mLModel.getModelContentSizeInBytes();
                                this.modelCacheHelper.setMemSizeEstimation(str, mLModel.getModelFormat(), Long.valueOf(modelContentSizeInBytes == null ? mLModel.getTotalChunks().intValue() * 10000000 : modelContentSizeInBytes.longValue()));
                                runBefore.onResponse("successful");
                            } catch (Exception e2) {
                                log.error("Failed to add predictor to cache", e2);
                                deploy.close();
                                runBefore.onFailure(e2);
                            }
                        }, exc2 -> {
                            log.error("Failed to retrieve model " + str, exc2);
                            handleDeployModelException(str, functionName, runBefore, exc2);
                        }));
                    }, exc -> {
                        log.error("Failed to deploy model " + str, exc);
                        handleDeployModelException(str, functionName, runBefore, exc);
                    })));
                    if (stashContext != null) {
                        stashContext.close();
                    }
                    this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
                } catch (Throwable th) {
                    if (stashContext != null) {
                        try {
                            stashContext.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Exception e) {
                handleDeployModelException(str, functionName, actionListener, e);
                this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
            }
        } catch (Throwable th3) {
            this.mlStats.getStat(MLNodeLevelStat.ML_EXECUTING_TASK_COUNT).decrement();
            throw th3;
        }
    }

    public void deployRemoteModelToLocal(String str, MLModel mLModel, ActionListener<String> actionListener) {
        if (this.modelCacheHelper.isModelDeployed(str)) {
            actionListener.onResponse("Success");
            return;
        }
        this.modelCacheHelper.initModelState(str, MLModelState.DEPLOYING, FunctionName.REMOTE, new ArrayList(), mLModel.isDeployToAllNodes());
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                Objects.requireNonNull(stashContext);
                ActionListener<String> runBefore = ActionListener.runBefore(actionListener, stashContext::restore);
                this.modelCacheHelper.setIsModelEnabled(str, mLModel.getIsEnabled());
                deployRemoteOrBuiltInModel(mLModel, 1, runBefore);
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to deploy model to local node" + str, e);
            actionListener.onFailure(e);
        }
    }

    private void deployRemoteOrBuiltInModel(MLModel mLModel, Integer num, ActionListener<String> actionListener) {
        String modelId = mLModel.getModelId();
        setupRateLimiter(modelId, num, mLModel.getRateLimiter());
        setupMLGuard(modelId, mLModel.getGuardrails());
        setupModelInterface(modelId, mLModel.getModelInterface());
        if (mLModel.getConnector() == null && FunctionName.REMOTE == mLModel.getAlgorithm()) {
            log.info("Set connector {} for the model: {}", mLModel.getConnectorId(), modelId);
            String connectorId = mLModel.getConnectorId();
            CheckedConsumer checkedConsumer = connector -> {
                mLModel.setConnector(connector);
                setupParamsAndPredictable(modelId, mLModel);
                this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
                this.modelCacheHelper.setModelState(modelId, MLModelState.DEPLOYED);
                this.modelCacheHelper.refreshLastAccessTime(modelId);
                actionListener.onResponse("successful");
                log.info("Completed setting connector {} in the model {}", mLModel.getConnectorId(), modelId);
            };
            Objects.requireNonNull(actionListener);
            getConnector(connectorId, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
            return;
        }
        setupParamsAndPredictable(modelId, mLModel);
        this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).increment();
        this.modelCacheHelper.setModelState(modelId, MLModelState.DEPLOYED);
        this.modelCacheHelper.refreshLastAccessTime(modelId);
        actionListener.onResponse("successful");
    }

    private void setupParamsAndPredictable(String str, MLModel mLModel) {
        this.modelCacheHelper.setPredictor(str, this.mlEngine.deploy(mLModel, setUpParameterMap(str)));
    }

    private Map<String, Object> setUpParameterMap(String str) {
        TokenBucket rateLimiter = getRateLimiter(str);
        Map<String, TokenBucket> userRateLimiterMap = getUserRateLimiterMap(str);
        MLGuard mLGuard = getMLGuard(str);
        HashMap hashMap = new HashMap();
        hashMap.put("ml_engine", this.mlEngine);
        hashMap.put("script_service", this.scriptService);
        hashMap.put("client", this.client);
        hashMap.put("xcontent_registry", this.xContentRegistry);
        hashMap.put("cluster_service", this.clusterService);
        if (rateLimiter == null && userRateLimiterMap == null) {
            log.info("Setting up basic ML predictor parameters.");
        } else if (rateLimiter != null && userRateLimiterMap == null) {
            hashMap.put("rate_limiter", rateLimiter);
            log.info("Setting up basic ML predictor parameters with model level throttling.");
        } else if (rateLimiter == null) {
            hashMap.put("user_rate_limiter_map", userRateLimiterMap);
            log.info("Setting up basic ML predictor parameters with user level throttling.");
        } else {
            hashMap.put("rate_limiter", rateLimiter);
            hashMap.put("user_rate_limiter_map", userRateLimiterMap);
            log.info("Setting up basic ML predictor parameters with both model and user level throttling.");
        }
        if (mLGuard != null) {
            hashMap.put("guardrails", mLGuard);
            log.info("Setting up ML guard parameter for ML predictor.");
        }
        hashMap.put("connectorPrivateIpEnabled", this.mlFeatureEnabledSetting.isConnectorPrivateIpEnabled());
        return Collections.unmodifiableMap(hashMap);
    }

    private void handleDeployModelException(String str, FunctionName functionName, ActionListener<String> actionListener, Exception exc) {
        if (!(exc instanceof MLLimitExceededException) && !(exc instanceof MLResourceNotFoundException) && !(exc instanceof IllegalArgumentException)) {
            this.mlStats.createCounterStatIfAbsent(functionName, ActionName.DEPLOY, MLActionLevelStat.ML_ACTION_FAILURE_COUNT).increment();
            this.mlStats.getStat(MLNodeLevelStat.ML_FAILURE_COUNT).increment();
        }
        removeModel(str);
        actionListener.onFailure(exc);
    }

    public synchronized void updateModelCache(String str, ActionListener<String> actionListener) {
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                Objects.requireNonNull(stashContext);
                ActionListener runBefore = ActionListener.runBefore(actionListener, stashContext::restore);
                CheckedConsumer checkedConsumer = mLModel -> {
                    int length = getWorkerNodes(str, mLModel.getAlgorithm()).length;
                    this.modelCacheHelper.setIsModelEnabled(str, mLModel.getIsEnabled());
                    setupRateLimiter(str, Integer.valueOf(length), mLModel.getRateLimiter());
                    setupMLGuard(str, mLModel.getGuardrails());
                    setupModelInterface(str, mLModel.getModelInterface());
                    if (mLModel.getAlgorithm() == FunctionName.REMOTE) {
                        if (mLModel.getConnector() != null) {
                            setupParamsAndPredictable(str, mLModel);
                            runBefore.onResponse("Successfully updated model cache for the remote model " + str);
                            log.info("Completed the model cache update for the remote model {}", str);
                        } else {
                            String connectorId = mLModel.getConnectorId();
                            CheckedConsumer checkedConsumer2 = connector -> {
                                mLModel.setConnector(connector);
                                setupParamsAndPredictable(str, mLModel);
                                runBefore.onResponse("Successfully updated model cache for the remote model " + str);
                                log.info("Completed the model cache update for the remote model {}", str);
                            };
                            Objects.requireNonNull(runBefore);
                            getConnector(connectorId, ActionListener.wrap(checkedConsumer2, runBefore::onFailure));
                        }
                    }
                    runBefore.onResponse("Successfully updated model cache for the model " + str);
                    log.info("Completed the model cache update for the model {}", str);
                };
                Objects.requireNonNull(runBefore);
                getModel(str, ActionListener.wrap(checkedConsumer, runBefore::onFailure));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to updated model cache for the model " + str, e);
            actionListener.onFailure(e);
        }
    }

    public synchronized void deployControllerWithDeployedModel(String str, ActionListener<String> actionListener) {
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                if (!this.mlFeatureEnabledSetting.isControllerEnabled().booleanValue()) {
                    throw new IllegalStateException(MLExceptionUtils.CONTROLLER_DISABLED_ERR_MSG);
                }
                if (!this.modelCacheHelper.isModelDeployed(str)) {
                    throw new OpenSearchStatusException("The model of this model controller has not deployed yet, please deploy the model first.", RestStatus.CONFLICT, new Object[0]);
                }
                Objects.requireNonNull(stashContext);
                ActionListener runBefore = ActionListener.runBefore(actionListener, stashContext::restore);
                CheckedConsumer checkedConsumer = mLModel -> {
                    CheckedConsumer checkedConsumer2 = mLController -> {
                        setupUserRateLimiterMap(str, Integer.valueOf(getWorkerNodes(str, mLModel.getAlgorithm()).length), mLController.getUserRateLimiter());
                        if (mLModel.getAlgorithm() != FunctionName.REMOTE) {
                            runBefore.onResponse("Successfully deployed model controller for the model " + str);
                            log.info("Deployed model controller for the model {}", str);
                        } else if (mLModel.getConnector() != null) {
                            setupParamsAndPredictable(str, mLModel);
                            runBefore.onResponse("Successfully deployed model controller for the remote model " + str);
                            log.info("Deployed model controller for the remote model {}", str);
                        } else {
                            String connectorId = mLModel.getConnectorId();
                            CheckedConsumer checkedConsumer3 = connector -> {
                                mLModel.setConnector(connector);
                                setupParamsAndPredictable(str, mLModel);
                                runBefore.onResponse("Successfully deployed model controller for the remote model " + str);
                                log.info("Deployed model controller for the remote model {}", str);
                            };
                            Objects.requireNonNull(runBefore);
                            getConnector(connectorId, ActionListener.wrap(checkedConsumer3, runBefore::onFailure));
                        }
                    };
                    Objects.requireNonNull(runBefore);
                    getController(str, ActionListener.wrap(checkedConsumer2, runBefore::onFailure));
                };
                Objects.requireNonNull(runBefore);
                getModel(str, ActionListener.wrap(checkedConsumer, runBefore::onFailure));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to deploy model controller for the model " + str, e);
            actionListener.onFailure(e);
        }
    }

    public synchronized void undeployController(String str, ActionListener<String> actionListener) {
        if (!this.modelCacheHelper.isModelDeployed(str)) {
            if (isModelRunningOnNode(str)) {
                log.error("Failed to undeploy model controller because model is in ML cache but with a state other than deployed. Please check model: " + str, new RuntimeException());
                actionListener.onFailure(new RuntimeException("Failed to undeploy model controller because model is in ML cache but with a state other than deployed. Please check model: " + str));
                return;
            } else {
                log.info("Successfully deployed model controller from cache due to model not exist in cache. Model ID: " + str);
                actionListener.onResponse("Successfully deployed model controller from cache due to model not exist in cache. Model ID: " + str);
                return;
            }
        }
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                Objects.requireNonNull(stashContext);
                ActionListener runBefore = ActionListener.runBefore(actionListener, stashContext::restore);
                CheckedConsumer checkedConsumer = mLModel -> {
                    removeUserRateLimiterMap(str);
                    if (mLModel.getAlgorithm() != FunctionName.REMOTE) {
                        runBefore.onResponse("Successfully undeployed model controller for the model " + str);
                        log.info("Undeployed model controller for the model {}", str);
                    } else if (mLModel.getConnector() != null) {
                        setupParamsAndPredictable(str, mLModel);
                        runBefore.onResponse("Successfully undeployed model controller for the remote model " + str);
                        log.info("Undeployed model controller for the remote model {}", str);
                    } else {
                        String connectorId = mLModel.getConnectorId();
                        CheckedConsumer checkedConsumer2 = connector -> {
                            mLModel.setConnector(connector);
                            setupParamsAndPredictable(str, mLModel);
                            runBefore.onResponse("Successfully undeployed model controller for the remote model " + str);
                            log.info("Undeployed model controller for the remote model {}", str);
                        };
                        Objects.requireNonNull(runBefore);
                        getConnector(connectorId, ActionListener.wrap(checkedConsumer2, runBefore::onFailure));
                    }
                };
                Objects.requireNonNull(runBefore);
                getModel(str, ActionListener.wrap(checkedConsumer, runBefore::onFailure));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to undeploy model controller for the model " + str, e);
            actionListener.onFailure(e);
        }
    }

    private synchronized void deployControllerWithDeployingModel(MLModel mLModel, Integer num, ActionListener<String> actionListener) {
        String modelId = mLModel.getModelId();
        this.client.get(new GetRequest(".plugins-ml-controller").id(modelId).fetchSourceContext(new FetchSourceContext(true)), ActionListener.wrap(getResponse -> {
            if (getResponse == null || !getResponse.isExists()) {
                if (BooleanUtils.isTrue(mLModel.getIsControllerEnabled())) {
                    actionListener.onFailure(new OpenSearchStatusException("Failed to find model controller", RestStatus.NOT_FOUND, new Object[0]));
                    return;
                } else {
                    actionListener.onResponse("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller. Please use the create controller api to create one if this is unexpected.");
                    log.debug("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller.");
                    return;
                }
            }
            try {
                XContentParser createXContentParserFromRegistry = MLNodeUtils.createXContentParserFromRegistry(this.xContentRegistry, getResponse.getSourceAsBytesRef());
                try {
                    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, createXContentParserFromRegistry.nextToken(), createXContentParserFromRegistry);
                    setupUserRateLimiterMap(modelId, num, MLController.parse(createXContentParserFromRegistry).getUserRateLimiter());
                    log.info("Successfully redeployed model controller for model " + modelId);
                    actionListener.onResponse("Successfully redeployed model controller for model " + modelId);
                    if (createXContentParserFromRegistry != null) {
                        createXContentParserFromRegistry.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                log.error("Failed to parse ml task" + getResponse.getId(), e);
                actionListener.onFailure(e);
            }
        }, exc -> {
            if (!(exc instanceof IndexNotFoundException)) {
                log.error("Failed to re-deploy the model controller for model: " + modelId, exc);
                actionListener.onFailure(exc);
            } else if (BooleanUtils.isTrue(mLModel.getIsControllerEnabled())) {
                actionListener.onFailure(new OpenSearchStatusException("Failed to find model controller", RestStatus.NOT_FOUND, new Object[0]));
            } else {
                actionListener.onResponse("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller. Please use the create model controller api to create one if this is unexpected.");
                log.debug("No controller is deployed because the model " + modelId + " is expected not having an enabled model controller.");
            }
        }));
    }

    public void deployControllerWithDeployingModel(MLModel mLModel, Integer num) {
        if (!this.mlFeatureEnabledSetting.isControllerEnabled().booleanValue()) {
            throw new IllegalStateException(MLExceptionUtils.CONTROLLER_DISABLED_ERR_MSG);
        }
        if (mLModel.getModelState() != MLModelState.DEPLOYING) {
            throw new OpenSearchStatusException("This method should only be called when model is in DEPLOYING state, but the model is in state: " + String.valueOf(mLModel.getModelState()), RestStatus.CONFLICT, new Object[0]);
        }
        deployControllerWithDeployingModel(mLModel, num, ActionListener.wrap(str -> {
            if (str.startsWith("Successfully")) {
                log.debug(str, mLModel.getModelId());
            } else if (str.endsWith("is expected not having a model controller. Please use the create model controller api to create one if this is unexpected.")) {
                log.warn(str);
            } else {
                log.error(str);
            }
        }, exc -> {
            log.error("Failed to re-deploy the model controller for model: " + mLModel.getModelId(), exc);
        }));
    }

    private void setupRateLimiter(String str, Integer num, MLRateLimiter mLRateLimiter) {
        if (mLRateLimiter != null) {
            this.modelCacheHelper.setRateLimiter(str, createTokenBucket(num, mLRateLimiter));
        } else {
            this.modelCacheHelper.removeRateLimiter(str);
        }
    }

    private void setupUserRateLimiterMap(String str, Integer num, Map<String, MLRateLimiter> map) {
        if (map == null || map.isEmpty()) {
            this.modelCacheHelper.removeUserRateLimiterMap(str);
            return;
        }
        HashMap hashMap = new HashMap();
        map.forEach((str2, mLRateLimiter) -> {
            hashMap.put(str2, createTokenBucket(num, mLRateLimiter));
        });
        this.modelCacheHelper.setUserRateLimiterMap(str, hashMap);
    }

    private void removeUserRateLimiterMap(String str) {
        this.modelCacheHelper.removeUserRateLimiterMap(str);
    }

    private TokenBucket createTokenBucket(Integer num, MLRateLimiter mLRateLimiter) {
        if (!mLRateLimiter.isValid()) {
            return null;
        }
        double parseDouble = Double.parseDouble(mLRateLimiter.getLimit());
        log.info("Initializing the rate limiter with setting {} per {} (TPS limit {}), evenly distributed on {} nodes", Double.valueOf(parseDouble), mLRateLimiter.getUnit(), Double.valueOf(parseDouble / r0.toSeconds(1L)), num);
        return new TokenBucket(System::nanoTime, (parseDouble / r0.toNanos(1L)) / num.intValue(), Math.max(parseDouble / num.intValue(), 1.0d), Math.max(parseDouble / num.intValue(), 1.0d));
    }

    public TokenBucket getRateLimiter(String str) {
        return this.modelCacheHelper.getRateLimiter(str);
    }

    public Map<String, TokenBucket> getUserRateLimiterMap(String str) {
        return this.modelCacheHelper.getUserRateLimiterMap(str);
    }

    private void setupModelInterface(String str, Map<String, String> map) {
        log.debug("Model interface for model: {} loaded into cache.", str);
        if (map != null) {
            this.modelCacheHelper.setModelInterface(str, map);
        } else {
            this.modelCacheHelper.removeModelInterface(str);
        }
    }

    public Map<String, String> getModelInterface(String str) {
        return this.modelCacheHelper.getModelInterface(str);
    }

    private void setupMLGuard(String str, Guardrails guardrails) {
        if (guardrails != null) {
            this.modelCacheHelper.setMLGuard(str, createMLGuard(guardrails, this.xContentRegistry, this.client));
        } else {
            this.modelCacheHelper.removeMLGuard(str);
        }
    }

    private MLGuard createMLGuard(Guardrails guardrails, NamedXContentRegistry namedXContentRegistry, Client client) {
        return new MLGuard(guardrails, namedXContentRegistry, client);
    }

    public MLGuard getMLGuard(String str) {
        return this.modelCacheHelper.getMLGuard(str);
    }

    public void getModel(String str, ActionListener<MLModel> actionListener) {
        getModel(str, null, null, actionListener);
    }

    public void getModel(String str, String[] strArr, String[] strArr2, ActionListener<MLModel> actionListener) {
        GetRequest getRequest = new GetRequest();
        getRequest.index(".plugins-ml-model").id(str).fetchSourceContext(new FetchSourceContext(true, strArr, strArr2));
        Client client = this.client;
        CheckedConsumer checkedConsumer = getResponse -> {
            if (getResponse == null || !getResponse.isExists()) {
                actionListener.onFailure(new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND, new Object[0]));
                return;
            }
            try {
                XContentParser createXContentParserFromRegistry = MLNodeUtils.createXContentParserFromRegistry(this.xContentRegistry, getResponse.getSourceAsBytesRef());
                try {
                    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, createXContentParserFromRegistry.nextToken(), createXContentParserFromRegistry);
                    MLModel parse = MLModel.parse(createXContentParserFromRegistry, getResponse.getSource().get(RestActionUtils.PARAMETER_ALGORITHM).toString());
                    parse.setModelId(str);
                    actionListener.onResponse(parse);
                    if (createXContentParserFromRegistry != null) {
                        createXContentParserFromRegistry.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                log.error("Failed to parse ml task" + getResponse.getId(), e);
                actionListener.onFailure(e);
            }
        };
        Objects.requireNonNull(actionListener);
        client.get(getRequest, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
    }

    public void getController(String str, ActionListener<MLController> actionListener) {
        GetRequest fetchSourceContext = new GetRequest(".plugins-ml-controller").id(str).fetchSourceContext(new FetchSourceContext(true));
        Client client = this.client;
        CheckedConsumer checkedConsumer = getResponse -> {
            if (getResponse == null || !getResponse.isExists()) {
                actionListener.onFailure(new OpenSearchStatusException("Failed to find model controller", RestStatus.NOT_FOUND, new Object[0]));
                return;
            }
            try {
                XContentParser createXContentParserFromRegistry = MLNodeUtils.createXContentParserFromRegistry(this.xContentRegistry, getResponse.getSourceAsBytesRef());
                try {
                    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, createXContentParserFromRegistry.nextToken(), createXContentParserFromRegistry);
                    actionListener.onResponse(MLController.parse(createXContentParserFromRegistry));
                    if (createXContentParserFromRegistry != null) {
                        createXContentParserFromRegistry.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                log.error("Failed to parse ml task" + getResponse.getId(), e);
                actionListener.onFailure(e);
            }
        };
        Objects.requireNonNull(actionListener);
        client.get(fetchSourceContext, ActionListener.wrap(checkedConsumer, actionListener::onFailure));
    }

    public void getConnector(String str, ActionListener<Connector> actionListener) {
        GetRequest id = new GetRequest().index(".plugins-ml-connector").id(str);
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                Objects.requireNonNull(stashContext);
                ActionListener runBefore = ActionListener.runBefore(actionListener, stashContext::restore);
                this.client.get(id, ActionListener.wrap(getResponse -> {
                    if (getResponse == null || !getResponse.isExists()) {
                        runBefore.onFailure(new OpenSearchStatusException("Failed to find connector:" + str, RestStatus.NOT_FOUND, new Object[0]));
                        return;
                    }
                    try {
                        XContentParser createXContentParserFromRegistry = MLNodeUtils.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, getResponse.getSourceAsBytesRef());
                        try {
                            XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, createXContentParserFromRegistry.nextToken(), createXContentParserFromRegistry);
                            runBefore.onResponse(Connector.createConnector(createXContentParserFromRegistry));
                            if (createXContentParserFromRegistry != null) {
                                createXContentParserFromRegistry.close();
                            }
                        } finally {
                        }
                    } catch (Exception e) {
                        log.error("Failed to parse connector:" + str);
                        runBefore.onFailure(e);
                    }
                }, exc -> {
                    log.error("Failed to get connector", exc);
                    runBefore.onFailure(new OpenSearchStatusException("Failed to get connector:" + str, RestStatus.NOT_FOUND, new Object[0]));
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to get connector", e);
            actionListener.onFailure(e);
        }
    }

    private void retrieveModelChunks(MLModel mLModel, ActionListener<File> actionListener) throws InterruptedException {
        String modelId = mLModel.getModelId();
        String name = mLModel.getName();
        Integer totalChunks = mLModel.getTotalChunks();
        GetRequest getRequest = new GetRequest();
        getRequest.index(".plugins-ml-model");
        getRequest.id();
        Semaphore semaphore = new Semaphore(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        String deployModelZipPath = this.mlEngine.getDeployModelZipPath(modelId, name);
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i < totalChunks.intValue(); i++) {
            semaphore.tryAcquire(10L, TimeUnit.SECONDS);
            if (atomicBoolean.get()) {
                throw new MLException("Failed to deploy model");
            }
            String modelChunkId = getModelChunkId(modelId, Integer.valueOf(i));
            int i2 = i;
            getModel(modelChunkId, threadedActionListener(MachineLearningPlugin.DEPLOY_THREAD_POOL, ActionListener.wrap(mLModel2 -> {
                Path deployModelChunkPath = this.mlEngine.getDeployModelChunkPath(modelId, Integer.valueOf(i2));
                FileUtils.write(Base64.getDecoder().decode(mLModel2.getContent()), deployModelChunkPath.toString());
                concurrentLinkedDeque.add(new File(deployModelChunkPath.toUri()));
                atomicInteger.getAndIncrement();
                if (atomicInteger.get() == totalChunks.intValue()) {
                    File file = new File(deployModelZipPath);
                    FileUtils.mergeFiles(concurrentLinkedDeque, file);
                    actionListener.onResponse(file);
                }
                semaphore.release();
            }, exc -> {
                atomicBoolean.set(true);
                semaphore.release();
                log.error("Failed to retrieve model chunk " + modelChunkId, exc);
                if (atomicInteger.get() == totalChunks.intValue() - 1) {
                    actionListener.onFailure(new MLResourceNotFoundException("Fail to find model chunk " + modelChunkId));
                }
            })));
        }
    }

    public void updateModel(String str, Boolean bool, Map<String, Object> map) {
        updateModel(str, map, ActionListener.wrap(updateResponse -> {
            if (updateResponse.status() == RestStatus.OK) {
                log.debug(StringUtils.getErrorMessage("Updated ML model successfully: {}", str, bool), updateResponse.status());
            } else {
                log.error(StringUtils.getErrorMessage("Failed to update provided ML model, status: {}", str, bool), updateResponse.status());
            }
        }, exc -> {
            log.error(StringUtils.getErrorMessage("Failed to update the provided ML model", str, bool), exc);
        }));
    }

    public void updateModel(String str, Map<String, Object> map, ActionListener<UpdateResponse> actionListener) {
        if (map == null || map.size() == 0) {
            actionListener.onFailure(new IllegalArgumentException("Updated fields is null or empty"));
            return;
        }
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        hashMap.put("last_updated_time", Long.valueOf(Instant.now().toEpochMilli()));
        UpdateRequest updateRequest = new UpdateRequest(".plugins-ml-model", str);
        updateRequest.doc(hashMap);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        if (hashMap.containsKey("model_state") && MODEL_DONE_STATES.contains(hashMap.get("model_state"))) {
            updateRequest.retryOnConflict(3);
        }
        try {
            ThreadContext.StoredContext stashContext = this.client.threadPool().getThreadContext().stashContext();
            try {
                this.client.update(updateRequest, ActionListener.runBefore(actionListener, () -> {
                    stashContext.restore();
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            actionListener.onFailure(e);
        }
    }

    public String getModelChunkId(String str, Integer num) {
        return str + "_" + num;
    }

    public void addModelWorkerNode(String str, String... strArr) {
        if (strArr != null) {
            for (String str2 : strArr) {
                this.modelCacheHelper.addWorkerNode(str, str2);
            }
        }
    }

    public void addModelWorkerNodes(List<String> list) {
        if (list != null) {
            String[] allModelIds = getAllModelIds();
            for (String str : list) {
                Arrays.stream(allModelIds).forEach(str2 -> {
                    this.modelCacheHelper.addWorkerNode(str2, str);
                });
            }
        }
    }

    public void removeModelWorkerNode(String str, boolean z, String... strArr) {
        if (strArr != null) {
            for (String str2 : strArr) {
                this.modelCacheHelper.removeWorkerNode(str, str2, z);
            }
        }
    }

    public void removeWorkerNodes(Set<String> set, boolean z) {
        this.modelCacheHelper.removeWorkerNodes(set, z);
    }

    public synchronized Map<String, String> undeployModel(String[] strArr) {
        HashMap hashMap = new HashMap();
        if (strArr == null || strArr.length <= 0) {
            log.debug("undeploy all models {}", Arrays.toString(getLocalDeployedModels()));
            for (String str : getLocalDeployedModels()) {
                hashMap.put(str, "undeployed");
                this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).decrement();
                this.mlStats.createCounterStatIfAbsent(getModelFunctionName(str), ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                this.mlStats.createModelCounterStatIfAbsent(str, ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                removeModel(str);
            }
        } else {
            log.debug("undeploy models {}", Arrays.toString(strArr));
            for (String str2 : strArr) {
                if (this.modelCacheHelper.isModelDeployed(str2)) {
                    hashMap.put(str2, "undeployed");
                    this.mlStats.getStat(MLNodeLevelStat.ML_DEPLOYED_MODEL_COUNT).decrement();
                    this.mlStats.getStat(MLNodeLevelStat.ML_REQUEST_COUNT).increment();
                    this.mlStats.createCounterStatIfAbsent(getModelFunctionName(str2), ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                    this.mlStats.createModelCounterStatIfAbsent(str2, ActionName.UNDEPLOY, MLActionLevelStat.ML_ACTION_REQUEST_COUNT).increment();
                } else {
                    hashMap.put(str2, "not_found");
                }
                removeModel(str2);
            }
        }
        return hashMap;
    }

    private void removeModel(String str) {
        this.modelCacheHelper.removeModel(str);
        this.modelHelper.deleteFileCache(str);
    }

    public String[] getWorkerNodes(String str, FunctionName functionName, boolean z) {
        String[] workerNodes = this.modelCacheHelper.getWorkerNodes(str);
        if (!z) {
            return workerNodes;
        }
        if (workerNodes == null || workerNodes.length == 0) {
            return workerNodes;
        }
        String[] filterEligibleNodes = this.nodeHelper.filterEligibleNodes(functionName, workerNodes);
        if (filterEligibleNodes == null || filterEligibleNodes.length == 0) {
            throw new IllegalArgumentException("No eligible worker node found");
        }
        return filterEligibleNodes;
    }

    public int getWorkerNodesSize(String str, FunctionName functionName, boolean z) {
        return getWorkerNodes(str, functionName, z).length;
    }

    public String[] getWorkerNodes(String str, FunctionName functionName) {
        return getWorkerNodes(str, functionName, false);
    }

    public int getWorkerNodesSize(String str, FunctionName functionName) {
        return getWorkerNodes(str, functionName, false).length;
    }

    public Predictable getPredictor(String str) {
        return this.modelCacheHelper.getPredictor(str);
    }

    public String[] getAllModelIds() {
        return this.modelCacheHelper.getAllModels();
    }

    public String[] getLocalDeployedModels() {
        return this.modelCacheHelper.getDeployedModels();
    }

    public String[] getExpiredModels() {
        return this.modelCacheHelper.getExpiredModels();
    }

    public synchronized void syncModelWorkerNodes(Map<String, Set<String>> map) {
        this.modelCacheHelper.syncWorkerNodes(map);
    }

    public void clearRoutingTable() {
        this.modelCacheHelper.clearWorkerNodes();
    }

    public MLModelProfile getModelProfile(String str) {
        return this.modelCacheHelper.getModelProfile(str);
    }

    public <T> T trackPredictDuration(String str, Supplier<T> supplier) {
        long nanoTime = System.nanoTime();
        T t = supplier.get();
        this.modelCacheHelper.addModelInferenceDuration(str, (System.nanoTime() - nanoTime) / 1000000.0d);
        return t;
    }

    public void trackPredictDuration(String str, long j) {
        this.modelCacheHelper.addModelInferenceDuration(str, (System.nanoTime() - j) / 1000000.0d);
    }

    public FunctionName getModelFunctionName(String str) {
        return this.modelCacheHelper.getFunctionName(str);
    }

    public Optional<FunctionName> getOptionalModelFunctionName(String str) {
        return this.modelCacheHelper.getOptionalFunctionName(str);
    }

    public boolean isModelRunningOnNode(String str) {
        return this.modelCacheHelper.isModelRunningOnNode(str);
    }

    public boolean isModelDeployed(String str) {
        return this.modelCacheHelper.isModelDeployed(str);
    }

    public boolean isNodeEligible(String str, FunctionName functionName) {
        return ((Set) Arrays.stream(this.nodeHelper.getEligibleNodes(functionName)).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet())).contains(str);
    }

    public MLModel addModelToAutoDeployCache(String str, MLModel mLModel) {
        return this.modelCacheHelper.addModelToAutoDeployCache(str, mLModel);
    }

    public void removeAutoDeployModel(String str) {
        this.modelCacheHelper.removeAutoDeployModel(str);
    }
}
