package org.apache.zeppelin.submarine.job.thread;

import com.google.common.io.Resources;
import com.hubspot.jinjava.Jinjava;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecuteResultHandler;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.LogOutputStream;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.submarine.commons.SubmarineConstants;
import org.apache.zeppelin.submarine.commons.SubmarineUI;
import org.apache.zeppelin.submarine.commons.SubmarineUtils;
import org.apache.zeppelin.submarine.hadoop.HdfsClient;
import org.apache.zeppelin.submarine.job.SubmarineJob;
import org.apache.zeppelin.submarine.job.SubmarineJobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/submarine/job/thread/JobRunThread.class */
public class JobRunThread extends Thread {
    private SubmarineJob submarineJob;
    private Logger LOGGER = LoggerFactory.getLogger(JobRunThread.class);
    private AtomicBoolean running = new AtomicBoolean(false);
    private Lock lockRunning = new ReentrantLock();

    public JobRunThread(SubmarineJob submarineJob) {
        this.submarineJob = submarineJob;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        if (false == this.lockRunning.tryLock()) {
            this.LOGGER.warn("Can not get JobRunThread lockRunning!");
            return;
        }
        final SubmarineUI submarineUI = this.submarineJob.getSubmarineUI();
        try {
            try {
                InterpreterContext intpContext = this.submarineJob.getIntpContext();
                String noteId = intpContext.getNoteId();
                String user = intpContext.getAuthenticationInfo().getUser();
                final String jobName = SubmarineUtils.getJobName(user, noteId);
                if (true == this.running.get()) {
                    String format = String.format("Job %s already running.", jobName);
                    submarineUI.outputLog("WARN", format);
                    this.LOGGER.warn(format);
                    this.running.set(false);
                    this.lockRunning.unlock();
                    return;
                }
                this.running.set(true);
                Properties properties = this.submarineJob.getProperties();
                HdfsClient hdfsClient = this.submarineJob.getHdfsClient();
                File pythonWorkDir = this.submarineJob.getPythonWorkDir();
                this.submarineJob.setCurrentJobState(SubmarineJobStatus.EXECUTE_SUBMARINE);
                String property = properties.getProperty(SubmarineConstants.SUBMARINE_ALGORITHM_HDFS_PATH, "");
                if (!property.startsWith("hdfs://")) {
                    submarineUI.outputLog("Configuration error", "Algorithm file upload HDFS path, Must be `hdfs://` prefix. now setting " + property);
                    this.running.set(false);
                    this.lockRunning.unlock();
                    return;
                }
                String saveParagraphToFiles = hdfsClient.saveParagraphToFiles(noteId, intpContext.getIntpEventClient().getParagraphList(user, noteId), pythonWorkDir == null ? "" : pythonWorkDir.getAbsolutePath(), properties);
                if (!StringUtils.isEmpty(saveParagraphToFiles)) {
                    submarineUI.outputLog("Save algorithm file", saveParagraphToFiles);
                }
                HashMap propertiesToJinjaParams = SubmarineUtils.propertiesToJinjaParams(properties, this.submarineJob, true);
                String render = new Jinjava().render(Resources.toString(Resources.getResource(SubmarineJob.SUBMARINE_JOBRUN_TF_JINJA), Charsets.UTF_8), propertiesToJinjaParams);
                if (render.indexOf("\n") == 0) {
                    render = render.replaceFirst("\n", "");
                }
                submarineUI.outputLog("Submarine submit command", new StringBuffer(render).toString());
                long longValue = Long.valueOf(properties.getProperty(SubmarineJob.TIMEOUT_PROPERTY, SubmarineJob.defaultTimeout)).longValue();
                CommandLine parse = CommandLine.parse(SubmarineJob.shell);
                parse.addArgument(render, false);
                DefaultExecutor defaultExecutor = new DefaultExecutor();
                ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(longValue);
                defaultExecutor.setWatchdog(executeWatchdog);
                final StringBuffer stringBuffer = new StringBuffer();
                defaultExecutor.setStreamHandler(new PumpStreamHandler(new LogOutputStream() { // from class: org.apache.zeppelin.submarine.job.thread.JobRunThread.1
                    protected void processLine(String str, int i) {
                        String trim = str.trim();
                        if (StringUtils.isEmpty(trim)) {
                            return;
                        }
                        stringBuffer.append(trim + "\n");
                    }
                }));
                if (Boolean.valueOf(properties.getProperty(SubmarineJob.DIRECTORY_USER_HOME)).booleanValue()) {
                    defaultExecutor.setWorkingDirectory(new File(System.getProperty("user.home")));
                }
                HashMap hashMap = new HashMap();
                if (StringUtils.equals((String) propertiesToJinjaParams.get(SubmarineConstants.INTERPRETER_LAUNCH_MODE), "yarn")) {
                    String str = (String) propertiesToJinjaParams.get(SubmarineConstants.DOCKER_JAVA_HOME);
                    String str2 = (String) propertiesToJinjaParams.get(SubmarineConstants.DOCKER_HADOOP_HDFS_HOME);
                    String str3 = (String) propertiesToJinjaParams.get(SubmarineConstants.SUBMARINE_HADOOP_CONF_DIR);
                    hashMap.put("JAVA_HOME", str);
                    hashMap.put("HADOOP_HOME", str2);
                    hashMap.put("HADOOP_HDFS_HOME", str2);
                    hashMap.put("HADOOP_CONF_DIR", str3);
                    hashMap.put("YARN_CONF_DIR", str3);
                    hashMap.put("CLASSPATH", "`$HADOOP_HDFS_HOME/bin/hadoop classpath --glob`");
                    hashMap.put("ZEPPELIN_FORCE_STOP", "true");
                }
                this.LOGGER.info("Execute EVN: {}, Command: {} ", hashMap.toString(), render);
                final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                defaultExecutor.execute(parse, hashMap, new DefaultExecuteResultHandler() { // from class: org.apache.zeppelin.submarine.job.thread.JobRunThread.2
                    public void onProcessComplete(int i) {
                        String format2 = String.format("jobName %s ProcessComplete exit value is : %d", jobName, Integer.valueOf(i));
                        JobRunThread.this.LOGGER.info(format2);
                        submarineUI.outputLog("JOR RUN COMPLETE", format2);
                        atomicBoolean.set(false);
                        JobRunThread.this.submarineJob.setCurrentJobState(SubmarineJobStatus.EXECUTE_SUBMARINE_FINISHED);
                    }

                    public void onProcessFailed(ExecuteException executeException) {
                        String format2 = String.format("jobName %s ProcessFailed exit value is : %d, exception is : %s", jobName, Integer.valueOf(executeException.getExitValue()), executeException.getMessage());
                        JobRunThread.this.LOGGER.error(format2);
                        submarineUI.outputLog("JOR RUN FAILED", format2);
                        atomicBoolean.set(false);
                        JobRunThread.this.submarineJob.setCurrentJobState(SubmarineJobStatus.EXECUTE_SUBMARINE_ERROR);
                    }
                });
                int i = 100;
                while (true) {
                    int i2 = i;
                    i--;
                    if (i2 <= 0 || !atomicBoolean.get() || !this.running.get()) {
                        break;
                    } else {
                        Thread.sleep(1000L);
                    }
                }
                if (executeWatchdog.isWatching()) {
                    executeWatchdog.destroyProcess();
                    Thread.sleep(1000L);
                }
                if (executeWatchdog.isWatching()) {
                    executeWatchdog.killedProcess();
                }
                Map<String, Object> jobStateByYarn = this.submarineJob.getJobStateByYarn(jobName);
                int i3 = 50;
                while (true) {
                    int i4 = i3;
                    i3--;
                    if (i4 <= 0 || jobStateByYarn.containsKey(SubmarineConstants.YARN_APP_STATE_NAME) || !this.running.get()) {
                        break;
                    }
                    Thread.sleep(3000L);
                    jobStateByYarn = this.submarineJob.getJobStateByYarn(jobName);
                }
                if (!jobStateByYarn.containsKey(SubmarineConstants.YARN_APP_STATE_NAME)) {
                    String format2 = String.format("JOB %s was not submitted to YARN!", jobName);
                    this.LOGGER.error(format2);
                    submarineUI.outputLog("JOR RUN FAILED", format2);
                    this.submarineJob.setCurrentJobState(SubmarineJobStatus.EXECUTE_SUBMARINE_ERROR);
                }
                this.running.set(false);
                this.lockRunning.unlock();
            } catch (Exception e) {
                this.LOGGER.error(e.getMessage(), e);
                this.submarineJob.setCurrentJobState(SubmarineJobStatus.EXECUTE_SUBMARINE_ERROR);
                submarineUI.outputLog("Exception", e.getMessage());
                this.running.set(false);
                this.lockRunning.unlock();
            }
        } catch (Throwable th) {
            this.running.set(false);
            this.lockRunning.unlock();
            throw th;
        }
    }

    public void stopRunning() {
        try {
            this.running.set(false);
            boolean tryLock = this.lockRunning.tryLock();
            int i = 0;
            while (false == tryLock) {
                int i2 = i;
                i++;
                if (i2 >= 100) {
                    break;
                }
                this.LOGGER.warn("Can not get the JobRunThread lockRunning [{}] !", Integer.valueOf(i));
                Thread.sleep(500L);
                tryLock = this.lockRunning.tryLock();
            }
        } catch (Exception e) {
            this.LOGGER.error(e.getMessage(), e);
        } finally {
            this.lockRunning.unlock();
        }
    }
}
