package org.opensearch.ml.action.forward;

import com.google.common.collect.ImmutableMap;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.output.MLOutput;
import org.opensearch.ml.common.transport.forward.MLForwardInput;
import org.opensearch.ml.common.transport.forward.MLForwardRequest;
import org.opensearch.ml.common.transport.forward.MLForwardRequestType;
import org.opensearch.ml.common.transport.forward.MLForwardResponse;
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
import org.opensearch.ml.common.transport.sync.MLSyncUpAction;
import org.opensearch.ml.common.transport.sync.MLSyncUpInput;
import org.opensearch.ml.common.transport.sync.MLSyncUpNodesRequest;
import org.opensearch.ml.model.MLModelManager;
import org.opensearch.ml.settings.MLCommonsSettings;
import org.opensearch.ml.task.MLTaskCache;
import org.opensearch.ml.task.MLTaskManager;
import org.opensearch.ml.utils.MLExceptionUtils;
import org.opensearch.ml.utils.RestActionUtils;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/* loaded from: input_file:org/opensearch/ml/action/forward/TransportForwardAction.class */
public class TransportForwardAction extends HandledTransportAction<ActionRequest, MLForwardResponse> {

    @Generated
    private static final Logger log = LogManager.getLogger(TransportForwardAction.class);
    private final ClusterService clusterService;
    private MLTaskManager mlTaskManager;
    private Client client;
    private MLModelManager mlModelManager;
    private DiscoveryNodeHelper nodeHelper;
    private final Settings settings;
    private volatile float modelAutoRedeploySuccessRatio;
    private boolean enableAutoReDeployModel;
    private final MLModelAutoReDeployer mlModelAutoReDeployer;

    /* renamed from: org.opensearch.ml.action.forward.TransportForwardAction$1, reason: invalid class name */
    /* loaded from: input_file:org/opensearch/ml/action/forward/TransportForwardAction$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$opensearch$ml$common$transport$forward$MLForwardRequestType = new int[MLForwardRequestType.values().length];

        static {
            try {
                $SwitchMap$org$opensearch$ml$common$transport$forward$MLForwardRequestType[MLForwardRequestType.DEPLOY_MODEL_DONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$opensearch$ml$common$transport$forward$MLForwardRequestType[MLForwardRequestType.REGISTER_MODEL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    @Inject
    public TransportForwardAction(TransportService transportService, ActionFilters actionFilters, MLTaskManager mLTaskManager, Client client, MLModelManager mLModelManager, DiscoveryNodeHelper discoveryNodeHelper, Settings settings, ClusterService clusterService, MLModelAutoReDeployer mLModelAutoReDeployer) {
        super("cluster:admin/opensearch/mlinternal/forward", transportService, actionFilters, MLForwardRequest::new);
        this.mlTaskManager = mLTaskManager;
        this.client = client;
        this.mlModelManager = mLModelManager;
        this.nodeHelper = discoveryNodeHelper;
        this.settings = settings;
        this.clusterService = clusterService;
        this.mlModelAutoReDeployer = mLModelAutoReDeployer;
        this.modelAutoRedeploySuccessRatio = ((Float) MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_SUCCESS_RATIO.get(settings)).floatValue();
        this.enableAutoReDeployModel = ((Boolean) MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE.get(settings)).booleanValue();
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE, bool -> {
            this.enableAutoReDeployModel = bool.booleanValue();
        });
    }

    protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<MLForwardResponse> actionListener) {
        MLModelState mLModelState;
        MLForwardInput forwardInput = MLForwardRequest.fromActionRequest(actionRequest).getForwardInput();
        String modelId = forwardInput.getModelId();
        String taskId = forwardInput.getTaskId();
        MLRegisterModelInput registerModelInput = forwardInput.getRegisterModelInput();
        MLTask mlTask = forwardInput.getMlTask();
        String workerNodeId = forwardInput.getWorkerNodeId();
        MLForwardRequestType requestType = forwardInput.getRequestType();
        String error = forwardInput.getError();
        log.debug("receive forward request: {}", forwardInput.getRequestType());
        try {
            switch (AnonymousClass1.$SwitchMap$org$opensearch$ml$common$transport$forward$MLForwardRequestType[requestType.ordinal()]) {
                case 1:
                    Set<String> workNodes = this.mlTaskManager.getWorkNodes(taskId);
                    MLTaskCache mLTaskCache = this.mlTaskManager.getMLTaskCache(taskId);
                    FunctionName functionName = mLTaskCache.getMlTask().getFunctionName();
                    if (workNodes != null) {
                        workNodes.remove(workerNodeId);
                    }
                    if (error != null) {
                        this.mlTaskManager.addNodeError(taskId, workerNodeId, error);
                    } else {
                        this.mlModelManager.addModelWorkerNode(modelId, workerNodeId);
                        syncModelWorkerNodes(modelId, functionName);
                    }
                    Set<String> hashSet = new HashSet();
                    if (workNodes != null && !workNodes.isEmpty()) {
                        HashSet hashSet2 = new HashSet(List.of((Object[]) RestActionUtils.getAllNodes(this.clusterService)));
                        hashSet = (Set) workNodes.stream().filter(str -> {
                            return !hashSet2.contains(str);
                        }).collect(Collectors.toSet());
                        if (!hashSet.isEmpty()) {
                            workNodes.removeAll(hashSet);
                        }
                    }
                    if (workNodes == null || workNodes.isEmpty()) {
                        if (!hashSet.isEmpty()) {
                            mLTaskCache.updateWorkerNode(hashSet);
                            this.mlModelManager.removeModelWorkerNode(modelId, false, (String[]) hashSet.toArray(new String[0]));
                        }
                        int intValue = mLTaskCache.getWorkerNodeSize().intValue();
                        MLTaskState mLTaskState = mLTaskCache.hasError() ? MLTaskState.COMPLETED_WITH_ERROR : MLTaskState.COMPLETED;
                        if (mLTaskCache.allNodeFailed() || mLTaskCache.getWorkerNodeSize().intValue() == 0) {
                            mLTaskState = MLTaskState.FAILED;
                            intValue = 0;
                        } else {
                            syncModelWorkerNodes(modelId, functionName);
                        }
                        ImmutableMap.Builder builder = ImmutableMap.builder();
                        builder.put("state", mLTaskState);
                        if (mLTaskCache.hasError()) {
                            intValue = mLTaskCache.getWorkerNodeSize().intValue() - mLTaskCache.getErrors().size();
                            builder.put("error", MLExceptionUtils.toJsonString(mLTaskCache.getErrors()));
                        }
                        boolean triggerNextModelDeployAndCheckIfRestRetryTimes = triggerNextModelDeployAndCheckIfRestRetryTimes(workNodes, taskId);
                        this.mlTaskManager.updateMLTask(taskId, builder.build(), MLTaskManager.TASK_SEMAPHORE_TIMEOUT, true);
                        if (mLTaskCache.allNodeFailed() || mLTaskCache.getWorkerNodeSize().intValue() == 0) {
                            mLModelState = MLModelState.DEPLOY_FAILED;
                            log.error("deploy model failed on all nodes, model id: {}", modelId);
                        } else {
                            mLModelState = mLTaskCache.hasError() ? MLModelState.PARTIALLY_DEPLOYED : MLModelState.DEPLOYED;
                        }
                        HashMap hashMap = new HashMap();
                        hashMap.put("model_state", mLModelState);
                        hashMap.put("last_deployed_time", Long.valueOf(Instant.now().toEpochMilli()));
                        hashMap.put("current_worker_node_count", Integer.valueOf(intValue));
                        if (triggerNextModelDeployAndCheckIfRestRetryTimes) {
                            log.debug("Model successfully deployed in cluster, setting the auto retry times to 0");
                            hashMap.put("auto_redeploy_retry_times", 0);
                        }
                        log.info("deploy model done with state: {}, model id: {}", mLModelState, modelId);
                        this.mlModelManager.updateModel(modelId, hashMap, ActionListener.runBefore(ActionListener.wrap(updateResponse -> {
                            if (updateResponse.status() == RestStatus.OK) {
                                log.debug("Updated ML model successfully: {}, model id: {}", updateResponse.status(), modelId);
                            } else {
                                log.error("Failed to update ML model {}, status: {}", modelId, updateResponse.status());
                            }
                        }, exc -> {
                            log.error("Failed to update ML model: " + modelId, exc);
                        }), () -> {
                            this.mlModelManager.removeAutoDeployModel(modelId);
                        }));
                    }
                    actionListener.onResponse(new MLForwardResponse("ok", (MLOutput) null));
                    break;
                case 2:
                    this.mlModelManager.registerMLModel(registerModelInput, mlTask);
                    actionListener.onResponse(new MLForwardResponse("ok", (MLOutput) null));
                    break;
                default:
                    throw new IllegalArgumentException("unsupported request type");
            }
        } catch (Exception e) {
            MLExceptionUtils.logException("Failed to execute forward action " + String.valueOf(forwardInput.getRequestType()), e, log);
            actionListener.onFailure(e);
        }
    }

    private boolean triggerNextModelDeployAndCheckIfRestRetryTimes(Set<String> set, String str) {
        if (!this.enableAutoReDeployModel || set == null || this.mlTaskManager.getMLTaskCache(str) == null) {
            return false;
        }
        if (((r0 - set.size()) - r0.errorNodesCount()) / this.mlTaskManager.getMLTaskCache(str).getWorkerNodeSize().intValue() < this.modelAutoRedeploySuccessRatio) {
            return false;
        }
        this.mlModelAutoReDeployer.redeployAModel();
        return true;
    }

    private void syncModelWorkerNodes(String str, FunctionName functionName) {
        DiscoveryNode[] allNodes = this.nodeHelper.getAllNodes();
        String[] workerNodes = this.mlModelManager.getWorkerNodes(str, functionName);
        if (allNodes.length <= 1 || workerNodes == null || workerNodes.length <= 0) {
            return;
        }
        log.debug("Sync to other nodes about worker nodes of model {}: {}", str, Arrays.toString(workerNodes));
        this.client.execute(MLSyncUpAction.INSTANCE, new MLSyncUpNodesRequest(allNodes, MLSyncUpInput.builder().addedWorkerNodes(ImmutableMap.of(str, workerNodes)).build()), ActionListener.wrap(mLSyncUpNodesResponse -> {
            log.debug("Sync up successfully");
        }, exc -> {
            log.error("Failed to sync up", exc);
        }));
    }
}
