package org.apache.zeppelin.interpreter.launcher;

import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.class */
public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
    public static final String SPARK_MASTER_KEY = "spark.master";
    private static final String DEFAULT_MASTER = "local[*]";
    Optional<String> sparkMaster;

    /**
     * Creates a launcher for the Spark interpreter.
     *
     * <p>The cached spark master starts out empty; it is resolved on the first call to
     * {@code getSparkMaster()}.
     *
     * @param zeppelinConfiguration configuration handed to the parent launcher
     * @param recoveryStorage recovery storage handed to the parent launcher
     */
    public SparkInterpreterLauncher(ZeppelinConfiguration zeppelinConfiguration, RecoveryStorage recoveryStorage) {
        super(zeppelinConfiguration, recoveryStorage);
        // Not resolved yet: getSparkMaster() fills this in lazily.
        sparkMaster = Optional.empty();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v179 */
    /* JADX WARN: Type inference failed for: r0v180, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r18v3, types: [java.lang.Throwable, java.nio.file.DirectoryStream] */
    /* JADX WARN: Type inference failed for: r18v4 */
    /* JADX WARN: Type inference failed for: r18v5 */
    /* JADX WARN: Type inference failed for: r19v7, types: [java.lang.Throwable] */
    @Override // org.apache.zeppelin.interpreter.launcher.StandardInterpreterLauncher
    public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext interpreterLaunchContext) throws IOException {
        Map<String, String> buildEnvFromProperties = super.buildEnvFromProperties(interpreterLaunchContext);
        Properties properties = new Properties();
        String sparkMaster = getSparkMaster();
        if (sparkMaster != null) {
            properties.put(SPARK_MASTER_KEY, sparkMaster);
        }
        for (String str : this.properties.stringPropertyNames()) {
            String property = this.properties.getProperty(str);
            if (RemoteInterpreterUtils.isEnvString(str) && !StringUtils.isBlank(property)) {
                buildEnvFromProperties.put(str, property);
            }
            if (isSparkConf(str, property)) {
                properties.setProperty(str, property);
            }
        }
        if (!properties.containsKey("spark.app.name") || StringUtils.isBlank(properties.getProperty("spark.app.name"))) {
            properties.setProperty("spark.app.name", interpreterLaunchContext.getInterpreterGroupId());
        }
        setupPropertiesForPySpark(properties);
        setupPropertiesForSparkR(properties);
        String property2 = interpreterLaunchContext.getProperties().getProperty("zeppelin.interpreter.conda.env.name");
        if (StringUtils.isNotBlank(property2)) {
            if (!isYarnCluster()) {
                throw new IOException("zeppelin.interpreter.conda.env.name only works for yarn-cluster mode");
            }
            properties.setProperty("spark.pyspark.python", property2 + "/bin/python");
        }
        if (isYarnMode() && getDeployMode().equals("cluster")) {
            buildEnvFromProperties.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
            properties.setProperty("spark.yarn.submit.waitAppCompletion", "false");
        } else if (this.zConf.isOnlyYarnCluster()) {
            throw new IOException("Only yarn-cluster mode is allowed, please set " + ZeppelinConfiguration.ConfVars.ZEPPELIN_SPARK_ONLY_YARN_CLUSTER.getVarName() + " to false if you want to use other modes.");
        }
        if (isYarnMode() && getDeployMode().equals("cluster")) {
            if (properties.containsKey("spark.files")) {
                properties.put("spark.files", properties.getProperty("spark.files") + "," + this.zConf.getConfDir() + "/log4j_yarn_cluster.properties");
            } else {
                properties.put("spark.files", this.zConf.getConfDir() + "/log4j_yarn_cluster.properties");
            }
            properties.put("spark.yarn.maxAppAttempts", "1");
        }
        try {
            String detectSparkScalaVersion = detectSparkScalaVersion(getEnv("SPARK_HOME"), buildEnvFromProperties);
            LOGGER.info("Scala version: {}", detectSparkScalaVersion);
            interpreterLaunchContext.getProperties().put("zeppelin.spark.scala.version", detectSparkScalaVersion);
            if (isYarnMode() && getDeployMode().equals("cluster")) {
                try {
                    ArrayList arrayList = new ArrayList();
                    Path path = Paths.get(this.zConf.getInterpreterLocalRepoPath(), interpreterLaunchContext.getInterpreterSettingId());
                    if (Files.exists(path, new LinkOption[0]) && Files.isDirectory(path, new LinkOption[0])) {
                        DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(path, (DirectoryStream.Filter<? super Path>) path2 -> {
                            return Files.isRegularFile(path2, new LinkOption[0]);
                        });
                        Throwable th = null;
                        try {
                            arrayList.addAll((List) StreamSupport.stream(newDirectoryStream.spliterator(), false).map(path3 -> {
                                return path3.toAbsolutePath().toString();
                            }).collect(Collectors.toList()));
                            if (newDirectoryStream != null) {
                                if (0 != 0) {
                                    try {
                                        newDirectoryStream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    newDirectoryStream.close();
                                }
                            }
                        } finally {
                        }
                    }
                    Path path4 = Paths.get(this.zConf.getZeppelinHome(), "/interpreter/spark/scala-" + detectSparkScalaVersion);
                    if (!path4.toFile().exists()) {
                        throw new IOException("spark scala folder " + path4.toFile() + " doesn't exist");
                    }
                    DirectoryStream<Path> newDirectoryStream2 = Files.newDirectoryStream(path4, (DirectoryStream.Filter<? super Path>) path5 -> {
                        return Files.isRegularFile(path5, new LinkOption[0]);
                    });
                    ?? r18 = 0;
                    try {
                        try {
                            List list = (List) StreamSupport.stream(newDirectoryStream2.spliterator(), false).map(path6 -> {
                                return path6.toAbsolutePath().toString();
                            }).collect(Collectors.toList());
                            arrayList.addAll(list);
                            List list2 = list;
                            if (newDirectoryStream2 != null) {
                                if (0 != 0) {
                                    try {
                                        newDirectoryStream2.close();
                                        list2 = list;
                                    } catch (Throwable th3) {
                                        r18.addSuppressed(th3);
                                        list2 = th3;
                                    }
                                } else {
                                    newDirectoryStream2.close();
                                    list2 = list;
                                }
                            }
                            try {
                                DirectoryStream<Path> newDirectoryStream3 = Files.newDirectoryStream(Paths.get(this.zConf.getZeppelinHome(), "/interpreter"), (DirectoryStream.Filter<? super Path>) path7 -> {
                                    return Files.isRegularFile(path7, new LinkOption[0]);
                                });
                                Throwable th4 = null;
                                List list3 = (List) StreamSupport.stream(newDirectoryStream3.spliterator(), false).filter(path8 -> {
                                    return path8.toFile().getName().startsWith("zeppelin-interpreter-shaded") && path8.toFile().getName().endsWith(".jar");
                                }).map(path9 -> {
                                    return path9.toAbsolutePath().toString();
                                }).collect(Collectors.toList());
                                if (list3.isEmpty()) {
                                    throw new IOException("zeppelin-interpreter-shaded jar is not found");
                                }
                                if (list3.size() > 1) {
                                    throw new IOException("more than 1 zeppelin-interpreter-shaded jars are found: " + StringUtils.join(list3, ","));
                                }
                                arrayList.addAll(list3);
                                if (newDirectoryStream3 != null) {
                                    if (0 != 0) {
                                        try {
                                            newDirectoryStream3.close();
                                        } catch (Throwable th5) {
                                            th4.addSuppressed(th5);
                                        }
                                    } else {
                                        newDirectoryStream3.close();
                                    }
                                }
                                if (properties.containsKey("spark.jars")) {
                                    properties.put("spark.jars", properties.getProperty("spark.jars") + "," + StringUtils.join(arrayList, ","));
                                } else {
                                    properties.put("spark.jars", StringUtils.join(arrayList, ","));
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (Exception e) {
                    throw new IOException("Fail to set additional jars for spark interpreter", e);
                }
            }
            StringJoiner stringJoiner = new StringJoiner("|");
            if (interpreterLaunchContext.getOption().isUserImpersonate() && this.zConf.getZeppelinImpersonateSparkProxyUser()) {
                stringJoiner.add("--proxy-user");
                stringJoiner.add(interpreterLaunchContext.getUserName());
                properties.remove("spark.yarn.keytab");
                properties.remove("spark.yarn.principal");
            }
            for (String str2 : properties.stringPropertyNames()) {
                stringJoiner.add("--conf");
                stringJoiner.add(str2 + "=" + properties.getProperty(str2) + "");
            }
            buildEnvFromProperties.put("ZEPPELIN_SPARK_CONF", stringJoiner.toString());
            for (String str3 : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
                String env = getEnv(str3);
                if (!StringUtils.isBlank(env)) {
                    buildEnvFromProperties.put(str3, env);
                }
            }
            String property3 = this.properties.getProperty("spark.yarn.keytab", this.zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB));
            String property4 = this.properties.getProperty("spark.yarn.principal", this.zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL));
            if (StringUtils.isBlank(property3) || StringUtils.isBlank(property4)) {
                LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified");
            } else {
                buildEnvFromProperties.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", property3);
                buildEnvFromProperties.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", property4);
                LOGGER.info("Run Spark under secure mode with keytab: {}, principal: {}", property3, property4);
            }
            buildEnvFromProperties.put("PYSPARK_PIN_THREAD", "true");
            String env2 = getEnv("SPARK_CONF_DIR");
            if (StringUtils.isBlank(env2)) {
                env2 = getEnv("SPARK_HOME") + "/conf";
            }
            Properties properties2 = new Properties();
            File file = new File(env2, "spark-defaults.conf");
            if (file.exists()) {
                properties2.load(new FileInputStream(file));
                String property5 = properties2.getProperty("spark.driver.extraClassPath");
                if (!StringUtils.isBlank(property5)) {
                    buildEnvFromProperties.put("ZEPPELIN_INTP_CLASSPATH", property5);
                }
            } else {
                LOGGER.warn("spark-defaults.conf doesn't exist: {}", file.getAbsolutePath());
            }
            if (isYarnMode()) {
                boolean parseBoolean = Boolean.parseBoolean(interpreterLaunchContext.getProperties().getProperty("zeppelin.spark.run.asLoginUser", "true"));
                String userName = interpreterLaunchContext.getUserName();
                if (parseBoolean && !"anonymous".equals(userName)) {
                    buildEnvFromProperties.put("HADOOP_USER_NAME", userName);
                }
            }
            LOGGER.info("buildEnvFromProperties: {}", buildEnvFromProperties);
            return buildEnvFromProperties;
        } catch (Exception e2) {
            throw new IOException("Fail to detect scala version, the reason is:" + e2.getMessage());
        }
    }

    /**
     * Detects the Scala version of the Spark distribution under the given SPARK_HOME.
     *
     * <p>Runs {@code <SPARK_HOME>/bin/spark-submit --version} with the supplied environment,
     * captures its standard error into a temporary file and looks for the
     * {@code Using Scala version ...} line. When that line is absent, detection falls back to
     * {@link #detectSparkScalaVersionByReplClass(String)}.
     *
     * @param str the SPARK_HOME directory
     * @param map environment variables added to the spark-submit process environment
     * @return {@code "2.10"}, {@code "2.11"} or {@code "2.12"}
     * @throws Exception if the process cannot be run, its output cannot be read, or the
     *     reported Scala version is not one of the supported ones
     */
    private String detectSparkScalaVersion(String str, Map<String, String> map) throws Exception {
        LOGGER.info("Detect scala version from SPARK_HOME: {}", str);
        ProcessBuilder processBuilder = new ProcessBuilder(str + "/bin/spark-submit", "--version");
        processBuilder.environment().putAll(map);
        File outputFile = File.createTempFile("zeppelin-spark", ".out");
        String processOutput;
        try {
            // The version banner is read from the process' stderr.
            processBuilder.redirectError(outputFile);
            processBuilder.start().waitFor();
            processOutput = new String(Files.readAllBytes(outputFile.toPath()), StandardCharsets.UTF_8);
        } finally {
            // The temp file is only needed to capture the output; don't leave it behind.
            if (!outputFile.delete()) {
                LOGGER.warn("Fail to delete temporary file: {}", outputFile.getAbsolutePath());
            }
        }
        Matcher matcher = Pattern.compile(".*Using Scala version (.*),.*").matcher(processOutput);
        if (!matcher.find()) {
            return detectSparkScalaVersionByReplClass(str);
        }
        String reportedVersion = matcher.group(1);
        for (String supported : new String[]{"2.10", "2.11", "2.12"}) {
            if (reportedVersion.startsWith(supported)) {
                return supported;
            }
        }
        throw new Exception("Unsupported scala version: " + reportedVersion);
    }

    /**
     * Detects the Scala version by inspecting the jars of the Spark distribution.
     *
     * <p>If {@code <SPARK_HOME>/lib} does not exist, the {@code jars} folder is scanned: a jar
     * whose name contains {@code spark-repl_2.11} means {@code "2.11"}, otherwise
     * {@code "2.10"}. If {@code lib} exists, exactly one {@code spark-assembly} jar is expected
     * there; the version is {@code "2.10"} when the class
     * {@code org.apache.spark.repl.SparkCommandLine} can be loaded from it and {@code "2.11"}
     * otherwise.
     *
     * @param str the SPARK_HOME directory
     * @return {@code "2.10"} or {@code "2.11"}
     * @throws Exception if the jars folder cannot be listed, or if zero or several
     *     spark-assembly jars are found
     */
    private String detectSparkScalaVersionByReplClass(String str) throws Exception {
        File libFolder = new File(str + "/lib");
        if (!libFolder.exists()) {
            File[] jars = new File(str + "/jars").listFiles();
            if (jars == null) {
                // listFiles() returns null when the folder is missing or unreadable.
                throw new Exception("Fail to list jars folder in SPARK_HOME: " + str);
            }
            boolean hasRepl211 = Stream.of(jars).anyMatch(jar -> jar.getName().contains("spark-repl_2.11"));
            return hasRepl211 ? "2.11" : "2.10";
        }
        File[] assemblyJars = libFolder.listFiles((dir, name) -> name.contains("spark-assembly"));
        if (assemblyJars == null || assemblyJars.length == 0) {
            throw new Exception("No spark assembly file found in SPARK_HOME: " + str);
        }
        if (assemblyJars.length > 1) {
            throw new Exception("Multiple spark assembly file found in SPARK_HOME: " + str);
        }
        // try-with-resources closes the class loader on both the success and the not-found path.
        try (URLClassLoader assemblyLoader = new URLClassLoader(new URL[]{assemblyJars[0].toURI().toURL()})) {
            assemblyLoader.loadClass("org.apache.spark.repl.SparkCommandLine");
            return "2.10";
        } catch (ClassNotFoundException expected) {
            // Absence of the class is the signal for 2.11, not an error.
            return "2.11";
        }
    }

    /**
     * Looks up a setting by name, preferring the interpreter properties over the process
     * environment.
     *
     * @param str the name to look up
     * @return the interpreter property value if set, otherwise the value of the environment
     *     variable with that name (which may be {@code null})
     */
    private String getEnv(String str) {
        String fromProperties = this.properties.getProperty(str);
        return fromProperties != null ? fromProperties : System.getenv(str);
    }

    /**
     * Tells whether a property is a spark configuration entry.
     *
     * @param str the property name
     * @param str2 the property value
     * @return {@code true} when the name is non-empty and starts with {@code spark.} and the
     *     value is non-empty
     */
    private boolean isSparkConf(String str, String str2) {
        return !StringUtils.isEmpty(str)
                && str.startsWith("spark.")
                && !StringUtils.isEmpty(str2);
    }

    /**
     * Sets {@code spark.yarn.isPython} to {@code true} when running in yarn mode; does nothing
     * otherwise.
     *
     * @param properties the spark properties to update
     */
    private void setupPropertiesForPySpark(Properties properties) {
        if (!isYarnMode()) {
            return;
        }
        properties.setProperty("spark.yarn.isPython", "true");
    }

    /**
     * Sets a comma-separated spark property, appending to the existing value if the key is
     * already present.
     *
     * @param properties the spark properties to update
     * @param str the property name
     * @param str2 the value to set or append
     */
    private void mergeSparkProperty(Properties properties, String str, String str2) {
        String merged = properties.containsKey(str)
                ? properties.getProperty(str) + "," + str2
                : str2;
        properties.setProperty(str, merged);
    }

    /**
     * In yarn mode, adds {@code sparkr.zip} to {@code spark.yarn.dist.archives} (aliased as
     * {@code sparkr}) when the archive can be found; logs a warning when it cannot. Does
     * nothing outside yarn mode.
     *
     * <p>The archive is looked up in {@code <SPARK_HOME>/R/lib}. Without SPARK_HOME, a
     * non-local master is rejected; otherwise the {@code interpreter/spark/R} folder under the
     * Zeppelin home is used.
     *
     * @param properties the spark properties to update
     * @throws RuntimeException if SPARK_HOME is not set and the master is not a local one
     */
    private void setupPropertiesForSparkR(Properties properties) {
        if (!isYarnMode()) {
            return;
        }
        String sparkHome = getEnv("SPARK_HOME");
        File sparkRBase;
        if (sparkHome == null) {
            if (!getSparkMaster().startsWith("local")) {
                throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting for non-local mode, if you specify it in zeppelin-env.sh, please move that into  interpreter setting");
            }
            String zeppelinHome = this.zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
            sparkRBase = new File(zeppelinHome, "interpreter" + File.separator + "spark" + File.separator + "R");
        } else {
            sparkRBase = new File(sparkHome, "R" + File.separator + "lib");
        }
        File sparkRArchive = new File(sparkRBase, "sparkr.zip");
        boolean archiveAvailable = sparkRArchive.exists() && sparkRArchive.isFile();
        if (!archiveAvailable) {
            LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
            return;
        }
        mergeSparkProperty(properties, "spark.yarn.dist.archives", sparkRArchive.getAbsolutePath() + "#sparkr");
    }

    /**
     * Returns the spark master, resolving and caching it on first use.
     *
     * <p>Resolution order: the {@code spark.master} property, the {@code master} property, the
     * {@code SPARK_MASTER} environment variable, then {@code local[*]}. When
     * {@code spark.master} was not set, the resolved value is written back into the
     * interpreter properties under that key.
     *
     * @return the resolved spark master, never {@code null}
     */
    private String getSparkMaster() {
        if (this.sparkMaster.isPresent()) {
            return this.sparkMaster.get();
        }
        String master = this.properties.getProperty(SPARK_MASTER_KEY);
        if (master == null) {
            master = this.properties.getProperty("master");
            if (master == null) {
                String fromEnvironment = System.getenv("SPARK_MASTER");
                master = fromEnvironment != null ? fromEnvironment : DEFAULT_MASTER;
            }
            // Record the resolved master so later readers of the properties see it too.
            this.properties.put(SPARK_MASTER_KEY, master);
        }
        this.sparkMaster = Optional.of(master);
        return master;
    }

    /**
     * Derives the deploy mode from the spark master and, when the master does not imply one,
     * from the {@code spark.submit.deployMode} property.
     *
     * @return {@code "client"} for {@code yarn-client} and local masters, {@code "cluster"} for
     *     {@code yarn-cluster}, otherwise the value of {@code spark.submit.deployMode}
     * @throws RuntimeException if {@code spark.submit.deployMode} is needed but missing, or is
     *     neither {@code client} nor {@code cluster}
     */
    private String getDeployMode() {
        String master = getSparkMaster();
        if (master.equals("yarn-cluster")) {
            return "cluster";
        }
        if (master.equals("yarn-client") || master.startsWith("local")) {
            return "client";
        }
        String deployMode = this.properties.getProperty("spark.submit.deployMode");
        if (deployMode == null) {
            throw new RuntimeException("master is set as yarn, but spark.submit.deployMode is not specified");
        }
        boolean recognized = deployMode.equals("client") || deployMode.equals("cluster");
        if (!recognized) {
            throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
        }
        return deployMode;
    }

    /**
     * @return {@code true} when the resolved spark master starts with {@code yarn}
     */
    private boolean isYarnMode() {
        String master = getSparkMaster();
        return master.startsWith("yarn");
    }

    /**
     * @return {@code true} when running on yarn with deploy mode {@code cluster}
     *     (compared case-insensitively)
     */
    private boolean isYarnCluster() {
        if (!isYarnMode()) {
            return false;
        }
        return "cluster".equalsIgnoreCase(getDeployMode());
    }

    /**
     * @return {@code true} when running on yarn with deploy mode {@code client}
     *     (compared case-insensitively)
     */
    private boolean isYarnClient() {
        if (!isYarnMode()) {
            return false;
        }
        return "client".equalsIgnoreCase(getDeployMode());
    }
}
