package org.opensearch.ml.task;

import java.time.Instant;
import java.util.Base64;
import java.util.Map;
import java.util.UUID;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.dataframe.DataFrame;
import org.opensearch.ml.common.dataset.DataFrameInputDataset;
import org.opensearch.ml.common.dataset.MLInputDataType;
import org.opensearch.ml.common.parameter.MLInput;
import org.opensearch.ml.common.parameter.MLPredictionOutput;
import org.opensearch.ml.common.parameter.MLTask;
import org.opensearch.ml.common.parameter.MLTaskState;
import org.opensearch.ml.common.parameter.MLTaskType;
import org.opensearch.ml.common.parameter.Model;
import org.opensearch.ml.common.transport.MLTaskResponse;
import org.opensearch.ml.common.transport.prediction.MLPredictionTaskRequest;
import org.opensearch.ml.engine.MLEngine;
import org.opensearch.ml.indices.MLIndicesHandler;
import org.opensearch.ml.indices.MLInputDatasetHandler;
import org.opensearch.ml.permission.AccessController;
import org.opensearch.ml.plugin.MachineLearningPlugin;
import org.opensearch.ml.stats.ActionName;
import org.opensearch.ml.stats.MLStats;
import org.opensearch.ml.stats.StatNames;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/* loaded from: input_file:org/opensearch/ml/task/MLPredictTaskRunner.class */
public class MLPredictTaskRunner extends MLTaskRunner<MLPredictionTaskRequest, MLTaskResponse> {

    @Generated
    private static final Logger log = LogManager.getLogger(MLPredictTaskRunner.class);
    private final ThreadPool threadPool;
    private final ClusterService clusterService;
    private final Client client;
    private final MLInputDatasetHandler mlInputDatasetHandler;

    public MLPredictTaskRunner(ThreadPool threadPool, ClusterService clusterService, Client client, MLTaskManager mLTaskManager, MLStats mLStats, MLInputDatasetHandler mLInputDatasetHandler, MLTaskDispatcher mLTaskDispatcher, MLCircuitBreakerService mLCircuitBreakerService) {
        super(mLTaskManager, mLStats, mLTaskDispatcher, mLCircuitBreakerService);
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.client = client;
        this.mlInputDatasetHandler = mLInputDatasetHandler;
    }

    @Override // org.opensearch.ml.task.MLTaskRunner
    public void executeTask(MLPredictionTaskRequest mLPredictionTaskRequest, TransportService transportService, ActionListener<MLTaskResponse> actionListener) {
        this.mlTaskDispatcher.dispatchTask(ActionListener.wrap(discoveryNode -> {
            if (this.clusterService.localNode().getId().equals(discoveryNode.getId())) {
                log.info("execute ML prediction request {} locally on node {}", mLPredictionTaskRequest.toString(), discoveryNode.getId());
                startPredictionTask(mLPredictionTaskRequest, actionListener);
            } else {
                log.info("execute ML prediction request {} remotely on node {}", mLPredictionTaskRequest.toString(), discoveryNode.getId());
                transportService.sendRequest(discoveryNode, "cluster:admin/opensearch/ml/predict", mLPredictionTaskRequest, new ActionListenerResponseHandler(actionListener, MLTaskResponse::new));
            }
        }, exc -> {
            actionListener.onFailure(exc);
        }));
    }

    public void startPredictionTask(MLPredictionTaskRequest mLPredictionTaskRequest, ActionListener<MLTaskResponse> actionListener) {
        MLInputDataType inputDataType = mLPredictionTaskRequest.getMlInput().getInputDataset().getInputDataType();
        Instant now = Instant.now();
        MLTask build = MLTask.builder().taskId(UUID.randomUUID().toString()).modelId(mLPredictionTaskRequest.getModelId()).taskType(MLTaskType.PREDICTION).inputType(inputDataType).functionName(mLPredictionTaskRequest.getMlInput().getFunctionName()).state(MLTaskState.CREATED).workerNode(this.clusterService.localNode().getId()).createTime(now).lastUpdateTime(now).async(false).build();
        MLInput mlInput = mLPredictionTaskRequest.getMlInput();
        if (mlInput.getInputDataset().getInputDataType().equals(MLInputDataType.SEARCH_QUERY)) {
            this.mlInputDatasetHandler.parseSearchQueryInput(mlInput.getInputDataset(), new ThreadedActionListener(log, this.threadPool, MachineLearningPlugin.TASK_THREAD_POOL, ActionListener.wrap(dataFrame -> {
                predict(build, dataFrame, mLPredictionTaskRequest, actionListener);
            }, exc -> {
                log.error("Failed to generate DataFrame from search query", exc);
                handleAsyncMLTaskFailure(build, exc);
                actionListener.onFailure(exc);
            }), false));
        } else {
            DataFrame parseDataFrameInput = this.mlInputDatasetHandler.parseDataFrameInput(mlInput.getInputDataset());
            this.threadPool.executor(MachineLearningPlugin.TASK_THREAD_POOL).execute(() -> {
                predict(build, parseDataFrameInput, mLPredictionTaskRequest, actionListener);
            });
        }
    }

    private void predict(MLTask mLTask, DataFrame dataFrame, MLPredictionTaskRequest mLPredictionTaskRequest, ActionListener<MLTaskResponse> actionListener) {
        ActionListener<MLTaskResponse> wrappedCleanupListener = wrappedCleanupListener(actionListener, mLTask.getTaskId());
        this.mlStats.getStat(StatNames.ML_EXECUTING_TASK_COUNT).increment();
        this.mlStats.getStat(StatNames.ML_TOTAL_REQUEST_COUNT).increment();
        this.mlStats.createCounterStatIfAbsent(StatNames.requestCountStat(mLTask.getFunctionName(), ActionName.PREDICT)).increment();
        this.mlTaskManager.add(mLTask);
        if (mLPredictionTaskRequest.getModelId() == null) {
            IllegalArgumentException illegalArgumentException = new IllegalArgumentException("ModelId is invalid");
            log.error("ModelId is invalid", illegalArgumentException);
            handlePredictFailure(mLTask, wrappedCleanupListener, illegalArgumentException, false);
            return;
        }
        try {
            ThreadContext.StoredContext stashContext = this.threadPool.getThreadContext().stashContext();
            try {
                MLInput mlInput = mLPredictionTaskRequest.getMlInput();
                this.client.get(new GetRequest(MLIndicesHandler.ML_MODEL_INDEX, mLTask.getModelId()), ActionListener.runBefore(ActionListener.wrap(getResponse -> {
                    if (getResponse == null || !getResponse.isExists()) {
                        wrappedCleanupListener.onFailure(new ResourceNotFoundException("No model found, please check the modelId.", new Object[0]));
                        return;
                    }
                    Map sourceAsMap = getResponse.getSourceAsMap();
                    User userContext = AccessController.getUserContext(this.client);
                    if (!AccessController.checkUserPermissions(userContext, User.parse((String) sourceAsMap.get("user")), mLPredictionTaskRequest.getModelId())) {
                        handlePredictFailure(mLTask, wrappedCleanupListener, new OpenSearchException("User: " + userContext.getName() + " does not have permissions to run predict by model: " + mLPredictionTaskRequest.getModelId(), new Object[0]), false);
                        return;
                    }
                    Model model = new Model();
                    model.setName((String) sourceAsMap.get("name"));
                    model.setVersion(((Integer) sourceAsMap.get("version")).intValue());
                    model.setContent(Base64.getDecoder().decode((String) sourceAsMap.get("content")));
                    this.mlTaskManager.updateTaskState(mLTask.getTaskId(), MLTaskState.RUNNING, mLTask.isAsync());
                    MLPredictionOutput predict = MLEngine.predict(mlInput.toBuilder().inputDataset(new DataFrameInputDataset(dataFrame)).build(), model);
                    if (predict instanceof MLPredictionOutput) {
                        predict.setStatus(MLTaskState.COMPLETED.name());
                    }
                    handleAsyncMLTaskComplete(mLTask);
                    wrappedCleanupListener.onResponse(MLTaskResponse.builder().output(predict).build());
                }, exc -> {
                    log.error("Failed to predict " + mlInput.getAlgorithm() + ", modelId: " + mLTask.getModelId(), exc);
                    handlePredictFailure(mLTask, wrappedCleanupListener, exc, true);
                }), () -> {
                    stashContext.restore();
                }));
                if (stashContext != null) {
                    stashContext.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Failed to get model " + mLTask.getModelId(), e);
            handlePredictFailure(mLTask, wrappedCleanupListener, e, true);
        }
    }

    private void handlePredictFailure(MLTask mLTask, ActionListener<MLTaskResponse> actionListener, Exception exc, boolean z) {
        if (z) {
            this.mlStats.createCounterStatIfAbsent(StatNames.failureCountStat(mLTask.getFunctionName(), ActionName.PREDICT)).increment();
            this.mlStats.getStat(StatNames.ML_TOTAL_FAILURE_COUNT).increment();
        }
        handleAsyncMLTaskFailure(mLTask, exc);
        actionListener.onFailure(exc);
    }
}
