package org.apache.hadoop.hive.llap.tezplugins;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.class */
public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
    // Per-query ByteBuffer cache keyed by query identifier. Presumably serialized credentials
    // (going by the name) -- the code that fills it is outside this excerpt; TODO confirm.
    private final ConcurrentMap<LlapDaemonProtocolProtos.QueryIdentifierProto, ByteBuffer> credentialMap;
    // Records which node each container and task attempt has been registered on.
    private final EntityTracker entityTracker;
    // Receives vertex state updates and supplies the fragment runtime info used when submitting work.
    private final SourceStateTracker sourceStateTracker;
    // Nodes that were handed a task attempt since the last dagComplete(); dagComplete() notifies
    // each of them and then clears the set. Created as a plain HashSet in the constructor.
    private final Set<LlapNodeId> nodesForQuery;
    // Assigned through setScheduler(); notified when a submitted task attempt is accepted.
    private LlapTaskSchedulerService scheduler;
    // Proxy used for all requests sent to LLAP daemons; created in initialize().
    private LlapProtocolClientProxy communicator;
    // Read in initialize() from LLAP_FILE_CLEANUP_DELAY_SECONDS (in seconds) and sent as the
    // delete delay of query-complete requests.
    private long deleteDelayOnDagComplete;
    // Umbilical implementation served by the RPC server built in startRpcServer().
    private final LlapTaskUmbilicalProtocol umbilical;
    // Value of the USER environment variable at construction time; sent with register-DAG requests.
    private final String user;
    // Host name derived from the RPC server address in startRpcServer().
    private String amHost;
    // Computed in initialize() from the configured AHS web app URL and HTTP scheme prefix.
    private String timelineServerUri;
    // Maintained outside this excerpt (see registerKnownNode / nodePinged) -- TODO confirm semantics.
    private final ConcurrentMap<LlapNodeId, Long> knownNodeMap;
    // Maintained outside this excerpt -- presumably nodes that pinged without being known; TODO confirm.
    private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap;
    // Obtained from LlapRegistryService.getClient(conf) in the constructor.
    private final LlapRegistryService serviceRegistry;
    // Identifier of the query currently being tracked; compared against the DAG index of incoming
    // task attempts. Written outside this excerpt (presumably by resetCurrentDag -- TODO confirm).
    private volatile LlapDaemonProtocolProtos.QueryIdentifierProto currentQueryIdentifierProto;
    // Hive query id passed along when building submit-work requests.
    private volatile String currentHiveQueryId;
    // Initialised to 0; used outside this excerpt -- presumably to rate-limit a log message; TODO confirm.
    private final AtomicLong nodeNotFoundLogTime;
    private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
    private static final Joiner JOINER = Joiner.on("");
    private static final Joiner PATH_JOINER = Joiner.on("/");
    // Monitor held by the constructor while it wires this communicator to LlapTaskSchedulerService.instance.
    static final Object pluginInitLock = new Object();
    // Set to the new communicator by the constructor when no scheduler instance exists yet.
    static LlapTaskCommunicator instance = null;
    // String form of the LLAP token kind; matched against RemoteException messages in isInvalidTokenError().
    private static final String LLAP_TOKEN_NAME = LlapTokenIdentifier.KIND_NAME.toString();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Bookkeeping for the associations between containers, task attempts and LLAP nodes.
     *
     * <p>Three views are kept: attempt -> node, container -> node, and per node a
     * {@link BiMap} of container <-> attempt. The per-node {@code BiMap} is not a
     * concurrent structure, so every access to it in this class happens while holding
     * the monitor of that {@code BiMap} instance. {@code attemptToNodeMap} doubles as the
     * monitor that serialises {@link #registerTaskSubmittedToNode} against
     * {@link #unregisterTaskAttempt}.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator$EntityTracker.class */
    public static final class EntityTracker {

        /** Task attempt -> node the attempt was registered on. */
        @VisibleForTesting
        final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();

        /** Container -> node the container was registered on. */
        @VisibleForTesting
        final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();

        /** Node -> (container <-> attempt) pairs registered on that node. */
        @VisibleForTesting
        final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();

        /** Task attempt -> unique node id reported by the daemon that accepted the attempt. */
        private final ConcurrentMap<TezTaskAttemptID, String> uniqueNodeMap = new ConcurrentHashMap<>();

        EntityTracker() {
        }

        /**
         * Records that {@code tezTaskAttemptID} runs in {@code containerId} on the node
         * {@code str:i}.
         *
         * @param containerId container hosting the attempt
         * @param tezTaskAttemptID attempt being registered
         * @param str node host name
         * @param i node port
         */
        void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID tezTaskAttemptID, String str, int i) {
            if (LlapTaskCommunicator.LOG.isDebugEnabled()) {
                LlapTaskCommunicator.LOG.debug("Registering " + containerId + ", " + tezTaskAttemptID + " for node: " + str + ":" + i);
            }
            LlapNodeId llapNodeId = LlapNodeId.getInstance(str, i);
            this.attemptToNodeMap.putIfAbsent(tezTaskAttemptID, llapNodeId);
            registerContainer(containerId, str, i);
            BiMap<ContainerId, TezTaskAttemptID> fresh = HashBiMap.create();
            BiMap<ContainerId, TezTaskAttemptID> existing = this.nodeMap.putIfAbsent(llapNodeId, fresh);
            BiMap<ContainerId, TezTaskAttemptID> used = existing == null ? fresh : existing;
            synchronized (used) {
                used.put(containerId, tezTaskAttemptID);
            }
            // Re-insert the instance: a concurrent unregister may have dropped the node entry
            // between the first putIfAbsent and the put above.
            this.nodeMap.putIfAbsent(llapNodeId, used);
        }

        /**
         * @param tezTaskAttemptID attempt to look up
         * @return the unique node id recorded for the attempt, or {@code null} if none
         */
        public String getUniqueNodeId(TezTaskAttemptID tezTaskAttemptID) {
            return this.uniqueNodeMap.get(tezTaskAttemptID);
        }

        /**
         * Records the unique node id for an attempt, but only while the attempt is still
         * registered (an unregister may have arrived first).
         *
         * @param tezTaskAttemptID attempt that was submitted
         * @param str unique node id reported for the submission
         */
        public void registerTaskSubmittedToNode(TezTaskAttemptID tezTaskAttemptID, String str) {
            synchronized (this.attemptToNodeMap) {
                if (this.attemptToNodeMap.containsKey(tezTaskAttemptID)) {
                    // putIfAbsent keeps an already recorded id; the warning fires in that case.
                    String previous = this.uniqueNodeMap.putIfAbsent(tezTaskAttemptID, str);
                    if (previous != null) {
                        LlapTaskCommunicator.LOG.warn("Replaced the unique node mapping for task from " + previous + " to " + str);
                    }
                }
            }
        }

        /**
         * Removes every association held for the attempt, including the container that
         * was paired with it on its node.
         *
         * @param tezTaskAttemptID attempt to forget
         */
        void unregisterTaskAttempt(TezTaskAttemptID tezTaskAttemptID) {
            this.uniqueNodeMap.remove(tezTaskAttemptID);
            synchronized (this.attemptToNodeMap) {
                LlapNodeId nodeId = this.attemptToNodeMap.remove(tezTaskAttemptID);
                if (nodeId == null) {
                    return;
                }
                BiMap<ContainerId, TezTaskAttemptID> biMap = this.nodeMap.get(nodeId);
                ContainerId containerId = null;
                if (biMap != null) {
                    boolean nowEmpty;
                    synchronized (biMap) {
                        containerId = biMap.inverse().remove(tezTaskAttemptID);
                        // Read emptiness under the same lock that guards mutations.
                        nowEmpty = biMap.isEmpty();
                    }
                    if (nowEmpty) {
                        this.nodeMap.remove(nodeId);
                    }
                }
                if (containerId != null) {
                    this.containerToNodeMap.remove(containerId);
                }
            }
        }

        /**
         * Records the node a container lives on; an existing mapping is left untouched.
         *
         * @param containerId container being registered
         * @param str node host name
         * @param i node port
         */
        void registerContainer(ContainerId containerId, String str, int i) {
            if (LlapTaskCommunicator.LOG.isDebugEnabled()) {
                LlapTaskCommunicator.LOG.debug("Registering " + containerId + " for node: " + str + ":" + i);
            }
            this.containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(str, i));
        }

        /** @return the node registered for the container, or {@code null} if unknown */
        LlapNodeId getNodeIdForContainer(ContainerId containerId) {
            return this.containerToNodeMap.get(containerId);
        }

        /** @return the node registered for the attempt, or {@code null} if unknown */
        LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID tezTaskAttemptID) {
            return this.attemptToNodeMap.get(tezTaskAttemptID);
        }

        /**
         * @param tezTaskAttemptID attempt to look up
         * @return the container paired with the attempt, or {@code null} when the attempt,
         *         its node entry, or the pairing is not (or no longer) registered
         */
        ContainerId getContainerIdForAttempt(TezTaskAttemptID tezTaskAttemptID) {
            LlapNodeId nodeId = getNodeIdForTaskAttempt(tezTaskAttemptID);
            if (nodeId == null) {
                return null;
            }
            // The node entry can disappear concurrently (it is removed once empty), so it must
            // be null-checked before use.
            BiMap<ContainerId, TezTaskAttemptID> biMap = this.nodeMap.get(nodeId);
            if (biMap == null) {
                return null;
            }
            // Lock the BiMap itself: inverse() returns a separate view object, and locking that
            // view would not exclude the writers, which all lock the BiMap.
            synchronized (biMap) {
                return biMap.inverse().get(tezTaskAttemptID);
            }
        }

        /**
         * @param containerId container to look up
         * @return the attempt paired with the container, or {@code null} if none is registered
         */
        TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
            LlapNodeId nodeId = getNodeIdForContainer(containerId);
            if (nodeId == null) {
                return null;
            }
            BiMap<ContainerId, TezTaskAttemptID> biMap = this.nodeMap.get(nodeId);
            if (biMap == null) {
                return null;
            }
            synchronized (biMap) {
                return biMap.get(containerId);
            }
        }

        /**
         * Removes every association held for the container, including the attempt that
         * was paired with it on its node.
         *
         * @param containerId container to forget
         */
        void unregisterContainer(ContainerId containerId) {
            LlapNodeId nodeId = this.containerToNodeMap.remove(containerId);
            if (nodeId == null) {
                return;
            }
            BiMap<ContainerId, TezTaskAttemptID> biMap = this.nodeMap.get(nodeId);
            TezTaskAttemptID attemptId = null;
            if (biMap != null) {
                boolean nowEmpty;
                synchronized (biMap) {
                    attemptId = biMap.remove(containerId);
                    // Read emptiness under the same lock that guards mutations.
                    nowEmpty = biMap.isEmpty();
                }
                if (nowEmpty) {
                    this.nodeMap.remove(nodeId);
                }
            }
            if (attemptId != null) {
                this.attemptToNodeMap.remove(attemptId);
                this.uniqueNodeMap.remove(attemptId);
            }
        }

        /**
         * @param llapNodeId node to look up
         * @return the live container <-> attempt map for the node (callers must synchronize on
         *         it before use), or {@code null} if the node has no entry
         */
        BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
            return this.nodeMap.get(llapNodeId);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator$LlapTaskUmbilicalProtocolImpl.class */
    /**
     * Umbilical endpoint exposed to LLAP daemons. Commit checks and task heartbeats are
     * delegated to the wrapped Tez umbilical; node heartbeats and kill notifications are
     * handled by the enclosing communicator.
     */
    protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol {
        private final TezTaskUmbilicalProtocol tezUmbilical;

        /**
         * @param tezTaskUmbilicalProtocol Tez umbilical that receives the delegated calls
         */
        public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
            tezUmbilical = tezTaskUmbilicalProtocol;
        }

        /** Delegates the commit check to the Tez umbilical. */
        public boolean canCommit(TezTaskAttemptID tezTaskAttemptID) throws IOException {
            final boolean allowed = tezUmbilical.canCommit(tezTaskAttemptID);
            return allowed;
        }

        /** Delegates the task heartbeat to the Tez umbilical and returns its response. */
        public TezHeartbeatResponse heartbeat(TezHeartbeatRequest tezHeartbeatRequest) throws IOException, TezException {
            final TezHeartbeatResponse response = tezUmbilical.heartbeat(tezHeartbeatRequest);
            return response;
        }

        /**
         * Forwards a node-level heartbeat to the enclosing communicator's {@code nodePinged}.
         * Both {@code Text} arguments are converted to strings before being passed on.
         */
        public void nodeHeartbeat(Text text, Text text2, int i, LlapTaskUmbilicalProtocol.TezAttemptArray tezAttemptArray, LlapTaskUmbilicalProtocol.BooleanArray booleanArray) throws IOException {
            if (LlapTaskCommunicator.LOG.isDebugEnabled()) {
                LlapTaskCommunicator.LOG.debug("Received heartbeat from [" + text + ":" + i + " (" + text2 + ")]");
            }
            final String first = text.toString();
            final String second = text2.toString();
            LlapTaskCommunicator.this.nodePinged(first, second, i, tezAttemptArray, booleanArray);
        }

        /**
         * Reports the attempt as killed by external preemption, then drops it from the
         * entity tracker.
         */
        public void taskKilled(TezTaskAttemptID tezTaskAttemptID) throws IOException {
            final TaskCommunicatorContext context = LlapTaskCommunicator.this.getContext();
            context.taskKilled(tezTaskAttemptID, TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");
            LlapTaskCommunicator.this.entityTracker.unregisterTaskAttempt(tezTaskAttemptID);
        }

        /** Always reports protocol version 1. */
        public long getProtocolVersion(String str, long j) throws IOException {
            return 1L;
        }

        /** Builds the protocol signature via {@link ProtocolSignature#getProtocolSignature}. */
        public ProtocolSignature getProtocolSignature(String str, long j, int i) throws IOException {
            final ProtocolSignature signature = ProtocolSignature.getProtocolSignature(this, str, j, i);
            return signature;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator$OperationCallback.class */
    /**
     * Completion callback for the asynchronous operations started by this communicator
     * (see {@code registerDag} and {@code startUpdateGuaranteed}).
     *
     * @param <ResultType> type of the value delivered on success
     * @param <CtxType> type of the caller-supplied context object handed back on completion
     */
    public interface OperationCallback<ResultType, CtxType> {
        /** Called when the operation completed; receives the caller's context and the result. */
        void setDone(CtxType ctxtype, ResultType resulttype);

        /** Called when the operation failed; receives the caller's context and the error. */
        void setError(CtxType ctxtype, Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator$PingingNodeInfo.class */
    /**
     * Per-node counters: a timestamp seeded by the constructor argument and a ping
     * counter that starts at one.
     */
    public static class PingingNodeInfo {
        final AtomicLong logTimestamp;
        final AtomicInteger pingCount;

        /**
         * @param j initial value of {@link #logTimestamp}
         */
        PingingNodeInfo(long j) {
            pingCount = new AtomicInteger(1);
            logTimestamp = new AtomicLong(j);
        }
    }

    public LlapTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        super(taskCommunicatorContext);
        this.entityTracker = new EntityTracker();
        this.nodesForQuery = new HashSet();
        this.knownNodeMap = new ConcurrentHashMap();
        this.pingedNodeMap = new ConcurrentHashMap();
        this.nodeNotFoundLogTime = new AtomicLong(0L);
        taskCommunicatorContext.getAMCredentials();
        this.serviceRegistry = LlapRegistryService.getClient(this.conf);
        this.umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
        this.user = System.getenv(ApplicationConstants.Environment.USER.name());
        this.credentialMap = new ConcurrentHashMap();
        this.sourceStateTracker = new SourceStateTracker(getContext(), this);
        synchronized (pluginInitLock) {
            LlapTaskSchedulerService llapTaskSchedulerService = LlapTaskSchedulerService.instance;
            if (llapTaskSchedulerService != null) {
                llapTaskSchedulerService.setTaskCommunicator(this);
                setScheduler(llapTaskSchedulerService);
                LlapTaskSchedulerService.instance = null;
            } else {
                instance = this;
            }
        }
    }

    /**
     * Looks up the LLAP token in the AM credentials.
     *
     * @return the token, or {@code null} when the credentials are absent or hold no LLAP token
     * @throws IllegalStateException if token presence does not match
     *         {@link UserGroupInformation#isSecurityEnabled()}
     */
    private Token<LlapTokenIdentifier> getLlapToken() {
        Token<LlapTokenIdentifier> llapToken = null;
        final Credentials amCredentials = getContext().getAMCredentials();
        if (amCredentials != null) {
            llapToken = amCredentials.getToken(LlapTokenIdentifier.KIND_NAME);
        }
        final boolean tokenPresent = llapToken != null;
        // A token must be present exactly when security is enabled.
        Preconditions.checkState(tokenPresent == UserGroupInformation.isSecurityEnabled());
        if (tokenPresent) {
            LOG.info("Task communicator with a token {}", llapToken);
        }
        return llapToken;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the scheduler that is notified when submitted task attempts start.
     *
     * @param llapTaskSchedulerService scheduler to associate with this communicator
     */
    public void setScheduler(LlapTaskSchedulerService llapTaskSchedulerService) {
        this.scheduler = llapTaskSchedulerService;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Walks the cause chain of a send failure and, at the first throwable recognised as
     * an invalid-token error, triggers token handling and stops.
     *
     * @param th failure reported by the communicator; may be {@code null}
     */
    public void processSendError(Throwable th) {
        for (Throwable current = th; current != null; current = current.getCause()) {
            if (isInvalidTokenError(current)) {
                handleInvalidToken();
                return;
            }
        }
    }

    /**
     * Decides whether a throwable denotes an invalid LLAP token.
     *
     * <p>Matches either an {@code InvalidToken} with a non-null message, or a cause-less
     * {@code RemoteException} whose message names the LLAP token kind and contains
     * {@code "InvalidToken"} or {@code "can't be found in cache"}.
     *
     * @param th throwable to inspect; must not be {@code null}
     * @return {@code true} if the throwable matches one of the two patterns
     */
    private boolean isInvalidTokenError(Throwable th) {
        LOG.debug("Checking for invalid token error, cause: {}, cause.getCause(): {}", th, th.getCause());
        if (th instanceof SecretManager.InvalidToken && th.getMessage() != null) {
            return true;
        }
        if (!(th instanceof RemoteException) || th.getCause() != null) {
            return false;
        }
        final String message = th.getMessage();
        if (message == null || !message.contains(LLAP_TOKEN_NAME)) {
            return false;
        }
        return message.contains("InvalidToken") || message.contains("can't be found in cache");
    }

    /**
     * Initialises the superclass, creates and initialises the daemon communicator from
     * configuration, reads the file cleanup delay and derives the timeline server URI.
     *
     * @throws Exception propagated from the superclass initialisation
     */
    public void initialize() throws Exception {
        super.initialize();
        final Configuration configuration = getConf();
        final int communicatorThreads = HiveConf.getIntVar(configuration, HiveConf.ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
        this.communicator = createLlapProtocolClientProxy(communicatorThreads, configuration);
        this.deleteDelayOnDagComplete = HiveConf.getTimeVar(configuration, HiveConf.ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
        LOG.info("Running LlapTaskCommunicator with fileCleanupDelay=" + this.deleteDelayOnDagComplete + ", numCommunicatorThreads=" + communicatorThreads);
        this.communicator.init(configuration);
        final String schemePrefix = WebAppUtils.getHttpSchemePrefix(configuration);
        final String ahsAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(configuration);
        this.timelineServerUri = WebAppUtils.getURLWithScheme(schemePrefix, ahsAddress);
    }

    /** Starts the superclass first, then the daemon communicator created in initialize(). */
    public void start() {
        super.start();
        this.communicator.start();
    }

    /**
     * Shuts down the superclass and then stops the daemon communicator, if one was
     * created.
     */
    public void shutdown() {
        super.shutdown();
        if (this.communicator == null) {
            // initialize() never ran (or failed early); nothing to stop.
            return;
        }
        this.communicator.stop();
    }

    /**
     * Builds and starts the RPC server that serves the LLAP umbilical protocol, then
     * records the server address and the AM host name derived from it.
     *
     * <p>A configured port of zero or less is passed to the builder as 0. When
     * {@code hadoop.security.authorization} is enabled, the service ACL is refreshed
     * with the LLAP umbilical policy provider before the server starts.
     *
     * @throws TezUncheckedException wrapping any {@link IOException} raised during setup
     */
    protected void startRpcServer() {
        final Configuration configuration = getConf();
        try {
            final JobTokenSecretManager secretManager = new JobTokenSecretManager();
            secretManager.addTokenForJob(this.tokenIdentifier, this.sessionToken);
            final int handlerCount = HiveConf.getIntVar(configuration, HiveConf.ConfVars.LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT);
            final int configuredPort = HiveConf.getIntVar(configuration, HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT);
            final int bindPort = Math.max(configuredPort, 0);
            this.server = new RPC.Builder(configuration)
                .setProtocol(LlapTaskUmbilicalProtocol.class)
                .setBindAddress("0.0.0.0")
                .setPort(bindPort)
                .setInstance(this.umbilical)
                .setNumHandlers(handlerCount)
                .setSecretManager(secretManager)
                .build();
            final boolean authorizationEnabled = configuration.getBoolean("hadoop.security.authorization", false);
            if (authorizationEnabled) {
                this.server.refreshServiceAcl(configuration, new LlapUmbilicalPolicyProvider());
            }
            this.server.start();
            this.address = NetUtils.getConnectAddress(this.server);
            this.amHost = LlapUtil.getAmHostNameFromAddress(this.address, configuration);
            LOG.info("Started LlapUmbilical: " + this.umbilical.getClass().getName() + " at address: " + this.address + " with numHandlers=" + handlerCount + " using the host name " + this.amHost);
        } catch (IOException ioe) {
            throw new TezUncheckedException(ioe);
        }
    }

    /**
     * Creates the proxy used to talk to LLAP daemons, passing it the LLAP token obtained
     * from {@code getLlapToken()}.
     *
     * @param i first constructor argument of the proxy; initialize() passes the value of
     *          LLAP_DAEMON_COMMUNICATOR_NUM_THREADS
     * @param configuration configuration handed to the proxy
     * @return a new proxy instance
     */
    @VisibleForTesting
    protected LlapProtocolClientProxy createLlapProtocolClientProxy(int i, Configuration configuration) {
        return new LlapProtocolClientProxy(i, configuration, getLlapToken());
    }

    /**
     * Registers the container with the superclass and records the node it runs on in
     * the entity tracker.
     *
     * @param containerId container that started running
     * @param str node host name
     * @param i node port
     */
    public void registerRunningContainer(ContainerId containerId, String str, int i) {
        super.registerRunningContainer(containerId, str, i);
        this.entityTracker.registerContainer(containerId, str, i);
    }

    /**
     * Handles the end of a container. For internal preemption, a terminate request is
     * sent for the attempt still paired with the container (if any). The container is
     * removed from the entity tracker in every case.
     *
     * @param containerId container that ended
     * @param containerEndReason why the container ended
     * @param str diagnostics passed through to the superclass
     */
    public void registerContainerEnd(ContainerId containerId, ContainerEndReason containerEndReason, String str) {
        super.registerContainerEnd(containerId, containerEndReason, str);
        final boolean internallyPreempted = ContainerEndReason.INTERNAL_PREEMPTION == containerEndReason;
        if (internallyPreempted) {
            LOG.info("Processing containerEnd for container {} caused by internal preemption", containerId);
            final TezTaskAttemptID pairedAttempt = this.entityTracker.getTaskAttemptIdForContainer(containerId);
            if (pairedAttempt != null) {
                sendTaskTerminated(pairedAttempt, true);
            }
        }
        this.entityTracker.unregisterContainer(containerId);
    }

    /**
     * Asynchronously registers the current query (DAG) with the given node.
     *
     * <p>The current query identifier is read exactly once. The request and both
     * callbacks use that snapshot, so a completion that arrives after the communicator
     * has moved on to another DAG still reports the query that was actually registered.
     *
     * @param nodeInfo node to register the DAG with
     * @param operationCallback receives the registered query identifier on success, or
     *        the error on failure; the context argument is always {@code null}
     * @return {@code false} if no query is currently active (nothing is sent),
     *         {@code true} once the request has been handed to the communicator
     * @throws RuntimeException wrapping an {@link IOException} raised while preparing
     *         or sending the request
     */
    public boolean registerDag(final LlapTaskSchedulerService.NodeInfo nodeInfo, final OperationCallback<LlapDaemonProtocolProtos.QueryIdentifierProto, Void> operationCallback) {
        // Single read of the volatile field; see method documentation.
        final LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifier = this.currentQueryIdentifierProto;
        if (queryIdentifier == null) {
            return false;
        }
        try {
            LlapDaemonProtocolProtos.RegisterDagRequestProto request = LlapDaemonProtocolProtos.RegisterDagRequestProto.newBuilder()
                .setQueryIdentifier(queryIdentifier)
                .setUser(this.user)
                .setCredentialsBinary(getCredentials(getContext().getCurrentDagInfo().getCredentials()))
                .build();
            this.communicator.registerDag(request, nodeInfo.getHost(), nodeInfo.getRpcPort(), new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.RegisterDagResponseProto>() { // from class: org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator.1
                public void setResponse(LlapDaemonProtocolProtos.RegisterDagResponseProto registerDagResponseProto) {
                    operationCallback.setDone(null, queryIdentifier);
                }

                public void indicateError(Throwable th) {
                    LlapTaskCommunicator.LOG.info("Error registering dag with appId=" + queryIdentifier.getApplicationIdString() + " dagId=" + queryIdentifier.getDagIndex() + " to node " + nodeInfo.getHost());
                    LlapTaskCommunicator.this.processSendError(th);
                    operationCallback.setError(null, th);
                }
            });
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asynchronously asks the node running an attempt to change the attempt's
     * guaranteed flag.
     *
     * <p>The node is taken from the entity tracker. If the attempt is not tracked, a
     * warning is logged and the node described by {@code nodeInfo} is used instead; if
     * that is also unavailable the callback is completed with {@code false} and nothing
     * is sent.
     *
     * @param tezTaskAttemptID attempt to update
     * @param nodeInfo fallback node description; may be {@code null}
     * @param z new value of the guaranteed flag
     * @param operationCallback receives the daemon's result or the send error
     * @param t caller context handed back to the callback
     * @param <T> type of the caller context
     */
    public <T> void startUpdateGuaranteed(final TezTaskAttemptID tezTaskAttemptID, LlapTaskSchedulerService.NodeInfo nodeInfo, boolean z, final OperationCallback<Boolean, T> operationCallback, final T t) {
        LlapNodeId targetNode = this.entityTracker.getNodeIdForTaskAttempt(tezTaskAttemptID);
        if (targetNode == null) {
            targetNode = nodeInfo == null ? null : LlapNodeId.getInstance(nodeInfo.getHost(), nodeInfo.getRpcPort());
            LOG.warn("Untracked node for " + tezTaskAttemptID + "; NodeInfo points to " + targetNode);
            if (targetNode == null) {
                operationCallback.setDone(t, false);
                return;
            }
        }
        final LlapDaemonProtocolProtos.UpdateFragmentRequestProto request = LlapDaemonProtocolProtos.UpdateFragmentRequestProto.newBuilder()
            .setIsGuaranteed(z)
            .setFragmentIdentifierString(tezTaskAttemptID.toString())
            .setQueryIdentifier(constructQueryIdentifierProto(tezTaskAttemptID.getDAGID().getId()))
            .build();
        final AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.UpdateFragmentResponseProto> rpcCallback = new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.UpdateFragmentResponseProto>() { // from class: org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator.2
            public void setResponse(LlapDaemonProtocolProtos.UpdateFragmentResponseProto updateFragmentResponseProto) {
                operationCallback.setDone(t, Boolean.valueOf(updateFragmentResponseProto.getResult()));
            }

            public void indicateError(Throwable th) {
                LlapTaskCommunicator.LOG.warn("Failed to send update fragment request for {}", tezTaskAttemptID.toString());
                LlapTaskCommunicator.this.processSendError(th);
                operationCallback.setError(t, th);
            }
        };
        this.communicator.sendUpdateFragment(request, targetNode.getHostname(), targetNode.getPort(), rpcCallback);
    }

    /**
     * Registers a task attempt and submits it to the LLAP daemon hosting its container.
     *
     * <p>Steps, in order: register with the superclass; switch to a new current DAG if
     * the attempt belongs to a different DAG than the one being tracked; resolve the
     * container's host and port; record the attempt in the tracking structures; build
     * the submit-work request; tell Tez the task started remotely; send the request.
     * The asynchronous response either reports the task as killed (daemon rejected it),
     * or records the daemon's unique node id and notifies the scheduler.
     *
     * @param containerId container the attempt was assigned to
     * @param taskSpec specification of the attempt
     * @param map passed through to the superclass
     * @param credentials passed through to the superclass
     * @param z passed through to the superclass
     * @param i passed through to the superclass and used when requesting the fragment runtime info
     * @throws RuntimeException if the query id cannot be determined, the container is
     *         unknown, or building/sending the request fails
     */
    public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec, Map<String, LocalResource> map, Credentials credentials, boolean z, int i) {
        String str;
        int i2;
        super.registerRunningTaskAttempt(containerId, taskSpec, map, credentials, z, i);
        int id = taskSpec.getTaskAttemptID().getDAGID().getId();
        // Read the volatile field once so the null check and the comparison see the same value.
        LlapDaemonProtocolProtos.QueryIdentifierProto trackedQuery = this.currentQueryIdentifierProto;
        if (trackedQuery == null || id != trackedQuery.getDagIndex()) {
            // Prefer the query id from the context; fall back to the task spec.
            String queryId = extractQueryIdFromContext();
            if (queryId == null) {
                try {
                    queryId = extractQueryId(taskSpec);
                } catch (IOException e) {
                    throw new RuntimeException("Failed to extract query id from task spec: " + taskSpec, e);
                }
            }
            Preconditions.checkNotNull(queryId, "Unexpected null query id");
            resetCurrentDag(id, queryId);
        }
        TezTaskCommunicatorImpl.ContainerInfo containerInfo = getContainerInfo(containerId);
        if (containerInfo == null) {
            throw new RuntimeException("ContainerInfo not found for container: " + containerId + ", while trying to launch task: " + taskSpec.getTaskAttemptID());
        }
        synchronized (containerInfo) {
            str = containerInfo.host;
            i2 = containerInfo.port;
        }
        LlapNodeId llapNodeId = LlapNodeId.getInstance(str, i2);
        registerKnownNode(llapNodeId);
        this.entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), str, i2);
        this.nodesForQuery.add(llapNodeId);
        this.sourceStateTracker.registerTaskForStateUpdates(str, i2, taskSpec.getInputs());
        try {
            try {
                LlapDaemonProtocolProtos.SubmitWorkRequestProto constructSubmitWorkRequest = constructSubmitWorkRequest(containerId, taskSpec, this.sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), i), this.currentHiveQueryId);
                // Tez is told about the start before the request goes out.
                getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
                this.communicator.sendSubmitWork(constructSubmitWorkRequest, str, i2, new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() { // from class: org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator.3
                    public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWorkResponseProto) {
                        if (!submitWorkResponseProto.hasSubmissionState()) {
                            throw new RuntimeException("SubmissionState in response is expected!");
                        }
                        if (submitWorkResponseProto.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
                            LlapTaskCommunicator.LOG.info("Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Service Busy");
                            LlapTaskCommunicator.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
                        } else {
                            if (submitWorkResponseProto.hasUniqueNodeId()) {
                                LlapTaskCommunicator.this.entityTracker.registerTaskSubmittedToNode(taskSpec.getTaskAttemptID(), submitWorkResponseProto.getUniqueNodeId());
                            }
                            LlapTaskCommunicator.LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
                            LlapTaskCommunicator.this.scheduler.notifyStarted(taskSpec.getTaskAttemptID());
                        }
                    }

                    public void indicateError(Throwable th) {
                        // Unwrap a ServiceException only when it actually carries a cause. A
                        // cause-less ServiceException would otherwise turn th into null and the
                        // final branch below would fail with a NullPointerException instead of
                        // reporting the task as failed.
                        if (th instanceof ServiceException && th.getCause() != null) {
                            th = th.getCause();
                        }
                        if (th instanceof RemoteException) {
                            // The daemon itself raised the error.
                            LlapTaskCommunicator.LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, th);
                            LlapTaskCommunicator.this.processSendError(th);
                            LlapTaskCommunicator.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, th.toString());
                        } else if (th instanceof IOException) {
                            // Other I/O failures are reported as a communication error (task killed).
                            LlapTaskCommunicator.LOG.info("Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Communication Error");
                            LlapTaskCommunicator.this.processSendError(th);
                            LlapTaskCommunicator.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
                        } else {
                            LlapTaskCommunicator.LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, th);
                            LlapTaskCommunicator.this.processSendError(th);
                            LlapTaskCommunicator.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, th.getMessage());
                        }
                    }
                });
            } catch (IOException e2) {
                throw new RuntimeException("Failed to construct request", e2);
            }
        } catch (Exception e3) {
            LOG.error("Error while trying to get runtimeFragmentInfo for fragmentId={}, containerId={}, currentQI={}, currentQueryId={}", new Object[]{taskSpec.getTaskAttemptID(), containerId, this.currentQueryIdentifierProto, this.currentHiveQueryId, e3});
            if (!(e3 instanceof RuntimeException)) {
                throw new RuntimeException(e3);
            }
            throw ((RuntimeException) e3);
        }
    }

    /**
     * Asks the daemon communicator to refresh its token. Called by processSendError()
     * when a send failure is recognised as an invalid-token error.
     */
    private void handleInvalidToken() {
        this.communicator.refreshToken();
    }

    /**
     * Handles the end of a task attempt. For internal preemption a terminate request is
     * sent to the node running the attempt. The attempt is removed from the entity
     * tracker in every case.
     *
     * @param tezTaskAttemptID attempt that ended
     * @param taskAttemptEndReason why the attempt ended
     * @param str diagnostics passed through to the superclass
     */
    public void unregisterRunningTaskAttempt(TezTaskAttemptID tezTaskAttemptID, TaskAttemptEndReason taskAttemptEndReason, String str) {
        super.unregisterRunningTaskAttempt(tezTaskAttemptID, taskAttemptEndReason, str);
        final boolean internallyPreempted = TaskAttemptEndReason.INTERNAL_PREEMPTION == taskAttemptEndReason;
        if (internallyPreempted) {
            LOG.info("Processing taskEnd for task {} caused by internal preemption", tezTaskAttemptID);
            sendTaskTerminated(tezTaskAttemptID, false);
        }
        this.entityTracker.unregisterTaskAttempt(tezTaskAttemptID);
    }

    /**
     * Sends a terminate-fragment request to the node an attempt is registered on. When
     * the attempt has no known node, only an informational message is logged.
     *
     * @param tezTaskAttemptID attempt to terminate
     * @param z {@code true} when triggered by a container end, {@code false} when
     *          triggered by a task end (used for logging only)
     */
    private void sendTaskTerminated(final TezTaskAttemptID tezTaskAttemptID, boolean z) {
        final String invokedBy = z ? "containerEnd" : "taskEnd";
        LOG.info("Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", tezTaskAttemptID.toString(), invokedBy);
        final LlapNodeId targetNode = this.entityTracker.getNodeIdForTaskAttempt(tezTaskAttemptID);
        if (targetNode == null) {
            LOG.info("Not sending terminate request for fragment {} since it's node is not known. Already unregistered", tezTaskAttemptID.toString());
            return;
        }
        final LlapDaemonProtocolProtos.TerminateFragmentRequestProto request = LlapDaemonProtocolProtos.TerminateFragmentRequestProto.newBuilder()
            .setQueryIdentifier(constructQueryIdentifierProto(tezTaskAttemptID.getDAGID().getId()))
            .setFragmentIdentifierString(tezTaskAttemptID.toString())
            .build();
        this.communicator.sendTerminateFragment(request, targetNode.getHostname(), targetNode.getPort(), new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.TerminateFragmentResponseProto>() { // from class: org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator.4
            public void setResponse(LlapDaemonProtocolProtos.TerminateFragmentResponseProto terminateFragmentResponseProto) {
                // Nothing to do on success.
            }

            public void indicateError(Throwable th) {
                LlapTaskCommunicator.LOG.warn("Failed to send terminate fragment request for {}", tezTaskAttemptID.toString());
                LlapTaskCommunicator.this.processSendError(th);
            }
        });
    }

    /**
     * Notifies every node recorded in {@code nodesForQuery} that the DAG has completed, then
     * clears that set. The same request (query identifier plus the configured delete delay)
     * is sent to each node; send failures are logged and passed to {@code processSendError}.
     *
     * @param i the DAG index used to build the query identifier
     */
    public void dagComplete(final int i) {
        LlapDaemonProtocolProtos.QueryCompleteRequestProto.Builder requestBuilder =
                LlapDaemonProtocolProtos.QueryCompleteRequestProto.newBuilder();
        requestBuilder.setQueryIdentifier(constructQueryIdentifierProto(i));
        requestBuilder.setDeleteDelay(this.deleteDelayOnDagComplete);
        LlapDaemonProtocolProtos.QueryCompleteRequestProto request = requestBuilder.build();

        for (final LlapNodeId target : this.nodesForQuery) {
            LOG.info("Sending dagComplete message for {}, to {}", Integer.valueOf(i), target);
            AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto> callback =
                    new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
                        public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto queryCompleteResponseProto) {
                            // Nothing to do on success.
                        }

                        public void indicateError(Throwable th) {
                            LlapTaskCommunicator.LOG.warn("Failed to indicate dag complete dagId={} to node {}", Integer.valueOf(i), target);
                            LlapTaskCommunicator.this.processSendError(th);
                        }
                    };
            this.communicator.sendQueryComplete(request, target.getHostname(), target.getPort(), callback);
        }

        this.nodesForQuery.clear();
    }

    /**
     * Forwards a vertex state change (vertex name and new state) to the source state tracker.
     *
     * @param vertexStateUpdate the update carrying the vertex name and its state
     */
    public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        String vertexName = vertexStateUpdate.getVertexName();
        this.sourceStateTracker.sourceStateUpdated(vertexName, vertexStateUpdate.getVertexState());
    }

    /**
     * Sends a source-state update to a node asynchronously.
     *
     * <p>If the send fails, the error is logged and handed to {@code processSendError}, and
     * every attempt the entity tracker currently maps to that node is reported to the context
     * as killed with {@code NODE_FAILED}.
     *
     * @param llapNodeId the destination node
     * @param sourceStateUpdatedRequestProto the update to deliver
     */
    public void sendStateUpdate(final LlapNodeId llapNodeId, final LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequestProto) {
        AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto> callback =
                new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto>() {
                    public void setResponse(LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdatedResponseProto) {
                        // Nothing to do on success.
                    }

                    public void indicateError(Throwable th) {
                        LlapTaskCommunicator.LOG.error("Failed to send state update to node: {}, Killing all attempts running on node. Attempted StateUpdate={}", llapNodeId, sourceStateUpdatedRequestProto, th);
                        LlapTaskCommunicator.this.processSendError(th);

                        BiMap<ContainerId, TezTaskAttemptID> attemptsOnNode = LlapTaskCommunicator.this.entityTracker.getContainerAttemptMapForNode(llapNodeId);
                        if (attemptsOnNode == null) {
                            return;
                        }
                        // Iterate under the map's own monitor, matching how other readers of this map lock it.
                        synchronized (attemptsOnNode) {
                            for (TezTaskAttemptID attemptId : attemptsOnNode.values()) {
                                LlapTaskCommunicator.LOG.info("Sending a kill for attempt {}, due to a communication failure while sending a finishable state update", attemptId);
                                LlapTaskCommunicator.this.getContext().taskKilled(attemptId, TaskAttemptEndReason.NODE_FAILED, "Failed to send finishable state update to node " + llapNodeId);
                            }
                        }
                    }
                };

        this.communicator.sendSourceStateUpdate(sourceStateUpdatedRequestProto, llapNodeId, callback);
    }

    /**
     * Builds the log URL for an attempt that is still running.
     *
     * @param tezTaskAttemptID the attempt whose logs are requested
     * @param nodeId the node the attempt runs on
     * @return the URL, or {@code null} when {@code constructLogUrl} cannot build one
     */
    public String getInProgressLogsUrl(TezTaskAttemptID tezTaskAttemptID, NodeId nodeId) {
        final boolean completed = false;
        return constructLogUrl(tezTaskAttemptID, nodeId, completed);
    }

    /**
     * Builds the log URL for an attempt that has completed.
     *
     * @param tezTaskAttemptID the attempt whose logs are requested
     * @param nodeId the node the attempt ran on
     * @return the URL, or {@code null} when {@code constructLogUrl} cannot build one
     */
    public String getCompletedLogsUrl(TezTaskAttemptID tezTaskAttemptID, NodeId nodeId) {
        final boolean completed = true;
        return constructLogUrl(tezTaskAttemptID, nodeId, completed);
    }

    /**
     * Resolves the LLAP daemon instance matching the given node (same host, RPC port equal to
     * the node's port) and builds a log URL from that instance's container id and NM address
     * properties.
     *
     * @param tezTaskAttemptID the attempt whose logs are requested
     * @param nodeId the node to look up in the service registry
     * @param z whether the attempt is done; forwarded to {@code constructLlapLogUrl}
     * @return the URL, or {@code null} when the timeline server URI or node is missing, no
     *         matching instance is registered, a required property is blank, or the registry
     *         lookup throws {@link IOException}
     */
    private String constructLogUrl(TezTaskAttemptID tezTaskAttemptID, NodeId nodeId, boolean z) {
        if (this.timelineServerUri == null || nodeId == null) {
            return null;
        }
        try {
            Set instancesOnHost = this.serviceRegistry.getInstances().getByHost(nodeId.getHost());
            if (instancesOnHost == null) {
                return null;
            }

            // Several daemons may share a host; pick the first one whose RPC port matches.
            LlapServiceInstance matched = null;
            for (Object candidate : instancesOnHost) {
                LlapServiceInstance instance = (LlapServiceInstance) candidate;
                if (instance.getRpcPort() == nodeId.getPort()) {
                    matched = instance;
                    break;
                }
            }
            if (matched == null) {
                return null;
            }

            String containerId = (String) matched.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
            String nmAddress = (String) matched.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_NM_ADDRESS.varname);
            if (StringUtils.isBlank(containerId) || StringUtils.isBlank(nmAddress)) {
                return null;
            }
            return constructLlapLogUrl(tezTaskAttemptID, containerId, z, nmAddress);
        } catch (IOException e) {
            LOG.warn("Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", nodeId, e.getMessage());
            return null;
        }
    }

    /**
     * Assembles the log URL for an attempt.
     *
     * <p>The file name is the current Hive query id, "-", the DAG id, ".log", an optional
     * ".done" suffix, and an "?nm.id=" query parameter, joined with {@code JOINER}. The path
     * segments (timeline server URI, "ws", "v1", "applicationhistory", "containers", the
     * container id, "logs", file name) are joined with {@code PATH_JOINER}.
     *
     * @param tezTaskAttemptID the attempt; only its DAG id is used
     * @param str the container id string of the daemon
     * @param z whether to append the ".done" suffix to the file name
     * @param str2 the NM address placed after "?nm.id="
     * @return the joined URL string
     */
    private String constructLlapLogUrl(TezTaskAttemptID tezTaskAttemptID, String str, boolean z, String str2) {
        String dagId = tezTaskAttemptID.getDAGID().toString();
        String doneSuffix = z ? ".done" : "";
        String fileName = JOINER.join(this.currentHiveQueryId, "-", dagId, ".log", doneSuffix, "?nm.id=", str2);
        return PATH_JOINER.join(this.timelineServerUri, "ws", "v1", "applicationhistory", "containers", str, "logs", fileName);
    }

    /**
     * Records a node as known, stamping it with the current monotonic time in milliseconds.
     * An existing entry is left untouched; only a first registration is logged.
     *
     * @param llapNodeId the node to register
     */
    public void registerKnownNode(LlapNodeId llapNodeId) {
        long nowMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        boolean firstRegistration = this.knownNodeMap.putIfAbsent(llapNodeId, Long.valueOf(nowMillis)) == null;
        if (firstRegistration && LOG.isInfoEnabled()) {
            LOG.info("Added new known node: {}", llapNodeId);
        }
    }

    /**
     * Records a ping from a node and warns, at a throttled rate, when the node is not in
     * {@code knownNodeMap}.
     *
     * <p>The first ping from a node stores a new {@code PingingNodeInfo}; later pings increment
     * the stored entry's {@code pingCount}. For an unknown node, the first ping is always
     * logged; after that a warning is emitted only when more than 5000 ms have passed since
     * the stored entry's {@code logTimestamp}, which is then advanced.
     *
     * <p>Fix: the entry is inserted with {@code putIfAbsent}. The earlier {@code put} replaced
     * the stored entry on every ping and then updated the evicted instance, so the count and
     * the last-log timestamp were lost.
     *
     * @param llapNodeId the node that pinged
     * @param str the unique id reported by the node; used only in the log line
     */
    public void registerPingingNode(LlapNodeId llapNodeId, String str) {
        long nowMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        // NOTE(review): assumes the constructor seeds logTimestamp from its argument; it is
        // defined outside this section, so confirm there.
        PingingNodeInfo fresh = new PingingNodeInfo(nowMillis);
        // Keep the first entry in the map so pingCount and logTimestamp accumulate across pings.
        PingingNodeInfo existing = this.pingedNodeMap.putIfAbsent(llapNodeId, fresh);
        if (existing != null) {
            existing.pingCount.incrementAndGet();
        } else if (LOG.isInfoEnabled()) {
            LOG.info("Added new pinging node: [{}] with uniqueId: {}", llapNodeId, str);
        }

        // A pinging node is expected to be known already; only unknown nodes are reported.
        if (this.knownNodeMap.containsKey(llapNodeId)) {
            return;
        }
        if (existing == null) {
            // First ping ever seen from this node: always log.
            LOG.warn("Received ping from unknownNode: [{}], count={}", llapNodeId, Integer.valueOf(fresh.pingCount.get()));
        } else if (nowMillis > existing.logTimestamp.get() + 5000) {
            // Seen before: log at most once per 5 seconds.
            LOG.warn("Received ping from unknownNode: [{}], count={}", llapNodeId, Integer.valueOf(existing.pingCount.get()));
            existing.logTimestamp.set(nowMillis);
        }
    }

    /**
     * Handles a heartbeat from an LLAP node that reports the attempts it is running.
     *
     * <p>The ping is first recorded via {@code registerPingingNode}. If the entity tracker has
     * no container/attempt map for the node, a warning is logged (at most once per 5000 ms) and
     * nothing else happens. Otherwise, each reported attempt that is present in the node's map
     * is checked against the unique node id the tracker holds for it:
     * <ul>
     *   <li>on a match, the context is told the task and its container are alive and the
     *       scheduler receives the attempt's guaranteed flag;</li>
     *   <li>on a mismatch (or a missing id), the attempt is collected and, after the map lock
     *       is released, reported to the context as killed with {@code NODE_FAILED}.</li>
     * </ul>
     *
     * @param str host name of the pinging node
     * @param str2 unique id reported by the pinging node
     * @param i port of the pinging node
     * @param tezAttemptArray attempts the node reports
     * @param booleanArray per-attempt guaranteed flags, indexed like {@code tezAttemptArray};
     *        may be {@code null}, in which case every flag is treated as {@code false}
     */
    void nodePinged(String str, String str2, int i, LlapTaskUmbilicalProtocol.TezAttemptArray tezAttemptArray, LlapTaskUmbilicalProtocol.BooleanArray booleanArray) {
        LlapNodeId pingingNode = LlapNodeId.getInstance(str, i);
        registerPingingNode(pingingNode, str2);

        BiMap<ContainerId, TezTaskAttemptID> attemptsOnNode = this.entityTracker.getContainerAttemptMapForNode(pingingNode);
        if (attemptsOnNode == null) {
            long nowMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
            if (nowMillis > this.nodeNotFoundLogTime.get() + 5000) {
                LOG.warn("Received ping from node without any registered tasks or containers: " + str + ":" + i + ". Could be caused by pre-emption by the AM, or a mismatched hostname. Enable debug logging for mismatched host names");
                this.nodeNotFoundLogTime.set(nowMillis);
            }
            return;
        }

        Set<TezTaskAttemptID> mismatchedAttempts = new HashSet<TezTaskAttemptID>();
        synchronized (attemptsOnNode) {
            for (int idx = 0; idx < tezAttemptArray.get().length; idx++) {
                boolean guaranteed = false;
                if (booleanArray != null) {
                    guaranteed = booleanArray.get()[idx].get();
                }
                TezTaskAttemptID attemptId = tezAttemptArray.get()[idx];
                if (!attemptsOnNode.containsValue(attemptId)) {
                    // Not an attempt tracked for this node; ignore it.
                    continue;
                }
                String registeredUniqueId = this.entityTracker.getUniqueNodeId(attemptId);
                if (registeredUniqueId != null && registeredUniqueId.equals(str2)) {
                    getContext().taskAlive(attemptId);
                    this.scheduler.taskInfoUpdated(attemptId, guaranteed);
                    getContext().containerAlive((ContainerId) attemptsOnNode.inverse().get(attemptId));
                } else {
                    mismatchedAttempts.add(attemptId);
                }
            }
        }

        if (!mismatchedAttempts.isEmpty()) {
            LOG.info("The tasks we expected to be on the node are not there: " + mismatchedAttempts);
            for (TezTaskAttemptID staleAttempt : mismatchedAttempts) {
                LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but registered with different unique ID", staleAttempt);
                getContext().taskKilled(staleAttempt, TaskAttemptEndReason.NODE_FAILED, "Node with same host and port but with new unique ID pinged");
            }
        }
    }

    /**
     * Switches the communicator's per-DAG state to a new DAG: stores the new query identifier
     * and Hive query id, resets the source state tracker for that identifier, and clears the
     * set of nodes used by the previous query.
     *
     * @param i index of the new DAG
     * @param str Hive query id associated with the new DAG
     */
    private void resetCurrentDag(int i, String str) {
        LlapDaemonProtocolProtos.QueryIdentifierProto newIdentifier = constructQueryIdentifierProto(i);
        this.currentQueryIdentifierProto = newIdentifier;
        this.currentHiveQueryId = str;
        this.sourceStateTracker.resetState(this.currentQueryIdentifierProto);
        this.nodesForQuery.clear();

        String dagName = getContext().getCurrentDagInfo().getName();
        LOG.info("CurrentDagId set to: " + i + ", name=" + dagName + ", queryId=" + str);
    }

    /**
     * Reads the Hive query id from the configuration embedded in the task's processor
     * user payload.
     *
     * @param taskSpec the task whose processor payload carries the configuration
     * @return the value of {@code HIVEQUERYID} in that configuration
     * @throws IOException declared by this method; presumably raised when the payload cannot
     *         be turned into a configuration
     */
    private String extractQueryId(TaskSpec taskSpec) throws IOException {
        Configuration payloadConf = TezUtils.createConfFromUserPayload(taskSpec.getProcessorDescriptor().getUserPayload());
        return HiveConf.getVar(payloadConf, HiveConf.ConfVars.HIVEQUERYID);
    }

    /**
     * Reads the Hive query id from the configuration of the context's current DAG.
     *
     * @return the configured query id, or {@code null} when the current DAG info is not a
     *         {@code DAG} instance (which includes it being {@code null})
     */
    private String extractQueryIdFromContext() {
        DAG dagInfo = getContext().getCurrentDagInfo();
        // The configuration is only read when the DAG info is a DAG instance.
        return dagInfo instanceof DAG
                ? dagInfo.getConf().get(HiveConf.ConfVars.HIVEQUERYID.varname)
                : null;
    }

    /**
     * Builds the submit-work request for one task attempt.
     *
     * <p>The request carries the fragment and attempt numbers, the container id, the AM host
     * and port, the serialized credentials of the current DAG, a signable vertex spec, the
     * supplied runtime info, and whether the container was initialized as guaranteed.
     *
     * @param containerId container the attempt will run in
     * @param taskSpec the task specification
     * @param fragmentRuntimeInfo runtime info attached to the request unchanged
     * @param str passed through to {@code Converters.constructSignableVertexSpec}
     * @return the built request
     * @throws IOException if the credentials cannot be obtained
     * @throws IllegalStateException if the task's DAG id differs from the current query
     *         identifier's DAG index
     */
    private LlapDaemonProtocolProtos.SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec, LlapDaemonProtocolProtos.FragmentRuntimeInfo fragmentRuntimeInfo, String str) throws IOException {
        LlapDaemonProtocolProtos.SubmitWorkRequestProto.Builder request =
                LlapDaemonProtocolProtos.SubmitWorkRequestProto.newBuilder()
                        .setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId())
                        .setAttemptNumber(taskSpec.getTaskAttemptID().getId())
                        .setContainerIdString(containerId.toString())
                        .setAmHost(getAmHostString())
                        .setAmPort(getAddress().getPort());

        // The task must belong to the DAG this communicator currently tracks.
        Preconditions.checkState(this.currentQueryIdentifierProto.getDagIndex() == taskSpec.getTaskAttemptID().getDAGID().getId());

        return request
                .setCredentialsBinary(getCredentials(getContext().getCurrentDagInfo().getCredentials()))
                .setWorkSpec(LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex(Converters.constructSignableVertexSpec(taskSpec, this.currentQueryIdentifierProto, getTokenIdentifier(), this.user, str)).build())
                .setFragmentRuntimeInfo(fragmentRuntimeInfo)
                .setIsGuaranteed(ContainerFactory.isContainerInitializedAsGuaranteed(containerId))
                .build();
    }

    /**
     * Returns the serialized credentials for the current query, using {@code credentialMap}
     * as a cache keyed by the current query identifier.
     *
     * <p>On a miss the credentials are serialized and a duplicate of the buffer is cached
     * with {@code putIfAbsent}. On a hit a duplicate of the cached buffer is copied, so the
     * cached buffer itself is never handed to the copy.
     *
     * @param credentials credentials to serialize when nothing is cached yet
     * @return the credential bytes
     * @throws IOException if serialization fails
     */
    private ByteString getCredentials(Credentials credentials) throws IOException {
        ByteBuffer cached = this.credentialMap.get(this.currentQueryIdentifierProto);
        if (cached != null) {
            return ByteString.copyFrom(cached.duplicate());
        }
        ByteBuffer serialized = LlapTezUtils.serializeCredentials(credentials);
        this.credentialMap.putIfAbsent(this.currentQueryIdentifierProto, serialized.duplicate());
        return ByteString.copyFrom(serialized);
    }

    /**
     * Builds the query identifier for a DAG of the current application: the context's
     * application identifier, the given DAG index, and the application attempt number.
     *
     * @param i the DAG index
     * @return the built identifier
     */
    private LlapDaemonProtocolProtos.QueryIdentifierProto constructQueryIdentifierProto(int i) {
        LlapDaemonProtocolProtos.QueryIdentifierProto.Builder identifier =
                LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder();
        identifier.setApplicationIdString(getContext().getCurrentAppIdentifier());
        identifier.setDagIndex(i);
        identifier.setAppAttemptNumber(getContext().getApplicationAttemptId().getAttemptId());
        return identifier.build();
    }

    /**
     * Returns the AM host string held by this communicator.
     *
     * @return the value of the {@code amHost} field
     */
    public String getAmHostString() {
        return amHost;
    }
}
