package org.apache.zeppelin.socket;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.common.Message;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.notebook.AuthorizationService;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.NotebookImportDeserializer;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.util.WatcherSecurityKey;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/socket/ConnectionManager.class */
public class ConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);

    /** Gson instance used to turn outgoing {@link Message}s into JSON text. */
    private static final Gson gson = new GsonBuilder()
            .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
            .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
            .setPrettyPrinting()
            .registerTypeAdapterFactory(Input.TypeAdapterFactory)
            .create();

    /** Every socket registered through {@code addConnection}. */
    final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();

    /** Note id -> sockets attached to that note; accessed while synchronized on the map itself. */
    final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();

    /** User name -> sockets registered for that user. */
    final Map<String, Queue<NotebookSocket>> userSocketMap = new HashMap<>();

    /** Sockets that were switched to watcher mode and receive a copy of note broadcasts. */
    final Queue<NotebookSocket> watcherSockets = new ConcurrentLinkedQueue<>();

    /** Ids of notes that currently have more than one attached socket. */
    private final HashSet<String> collaborativeModeList = new HashSet<>();

    /** Whether collaborative-mode status tracking is switched on in the Zeppelin configuration. */
    private final Boolean collaborativeModeEnable = ZeppelinConfiguration.create().isZeppelinNotebookCollaborativeModeEnable();

    /** Source of the role sets handed to {@link UserIterator} callbacks. */
    private final AuthorizationService authorizationService;

    /* loaded from: input_file:org/apache/zeppelin/socket/ConnectionManager$UserIterator.class */
    /**
     * Callback invoked once per known user by {@code forAllUsers}.
     *
     * <p>Has exactly one abstract method, so it can be supplied as a lambda.
     */
    @FunctionalInterface
    public interface UserIterator {
        /**
         * Handles a single user.
         *
         * @param str the user name
         * @param set the roles reported for the user, with the user name itself added
         */
        void handleUser(String str, Set<String> set);
    }

    /**
     * Creates a connection manager that asks the given service for user roles.
     *
     * @param authorizationService service queried for the roles of connected users
     */
    @Inject
    public ConnectionManager(final AuthorizationService authorizationService) {
        this.authorizationService = authorizationService;
    }

    /**
     * Registers a socket in the global set of connected sockets.
     *
     * @param notebookSocket the socket to register
     */
    public void addConnection(final NotebookSocket notebookSocket) {
        connectedSockets.add(notebookSocket);
    }

    /**
     * Drops a socket from the global set of connected sockets.
     *
     * @param notebookSocket the socket to unregister
     */
    public void removeConnection(final NotebookSocket notebookSocket) {
        connectedSockets.remove(notebookSocket);
    }

    /**
     * Attaches a socket to a note, detaching it from every other note first.
     *
     * <p>After the socket is attached the collaborative status of the note is re-evaluated.
     *
     * @param str            id of the note to attach to
     * @param notebookSocket the socket to attach
     */
    public void addNoteConnection(String str, NotebookSocket notebookSocket) {
        LOGGER.debug("Add connection {} to note: {}", notebookSocket, str);
        synchronized (noteSocketMap) {
            // A socket is attached to at most one note, so clear older attachments first.
            removeConnectionFromAllNote(notebookSocket);

            final List<NotebookSocket> noteSockets =
                    noteSocketMap.computeIfAbsent(str, noteId -> new LinkedList<>());
            final boolean alreadyAttached = noteSockets.contains(notebookSocket);
            if (!alreadyAttached) {
                noteSockets.add(notebookSocket);
            }
            checkCollaborativeStatus(str, noteSockets);
        }
    }

    /**
     * Forgets every socket attached to the given note.
     *
     * @param str id of the note whose socket list is discarded
     */
    public void removeNoteConnection(final String str) {
        synchronized (noteSocketMap) {
            noteSocketMap.remove(str);
        }
    }

    /**
     * Detaches a socket from a note and re-evaluates the note's collaborative status.
     *
     * <p>If no socket list exists for the note there is nothing to detach and nothing has
     * changed, so the collaborative status is left untouched. Previously a {@code null} list
     * was handed to the status check, which dereferences it.
     *
     * @param str            id of the note
     * @param notebookSocket the socket to detach
     */
    public void removeNoteConnection(String str, NotebookSocket notebookSocket) {
        LOGGER.debug("Remove connection {} from note: {}", notebookSocket, str);
        synchronized (noteSocketMap) {
            final List<NotebookSocket> noteSockets = noteSocketMap.get(str);
            if (noteSockets == null) {
                return;
            }
            noteSockets.remove(notebookSocket);
            checkCollaborativeStatus(str, noteSockets);
        }
    }

    /**
     * Tags the socket with the user name and records it under that user.
     *
     * <p>The per-user queue is created on first use.
     *
     * @param str            the user name
     * @param notebookSocket the socket belonging to that user
     */
    public void addUserConnection(String str, NotebookSocket notebookSocket) {
        LOGGER.debug("Add user connection {} for user: {}", notebookSocket, str);
        notebookSocket.setUser(str);
        // Single lookup instead of containsKey/get/put; also avoids the raw queue type.
        userSocketMap
                .computeIfAbsent(str, user -> new ConcurrentLinkedQueue<>())
                .add(notebookSocket);
    }

    /**
     * Removes a socket from the queue kept for the given user.
     *
     * <p>Logs a warning when the user has no entry in the user map.
     *
     * @param str            the user name
     * @param notebookSocket the socket to remove
     */
    public void removeUserConnection(String str, NotebookSocket notebookSocket) {
        LOGGER.debug("Remove user connection {} for user: {}", notebookSocket, str);
        if (!userSocketMap.containsKey(str)) {
            LOGGER.warn("Closing connection that is absent in user connections");
            return;
        }
        userSocketMap.get(str).remove(notebookSocket);
    }

    /**
     * Finds the note a socket is attached to.
     *
     * <p>All notes are scanned; should the socket appear under several notes, the one visited
     * last in map iteration order is returned.
     *
     * @param notebookSocket the socket to look up
     * @return the id of the note holding the socket, or {@code null} if none does
     */
    public String getAssociatedNoteId(NotebookSocket notebookSocket) {
        String associatedNoteId = null;
        synchronized (noteSocketMap) {
            for (Map.Entry<String, List<NotebookSocket>> entry : noteSocketMap.entrySet()) {
                if (entry.getValue().contains(notebookSocket)) {
                    associatedNoteId = entry.getKey();
                }
            }
        }
        return associatedNoteId;
    }

    /**
     * Detaches a socket from every note currently present in the note map.
     *
     * @param notebookSocket the socket to detach
     */
    public void removeConnectionFromAllNote(NotebookSocket notebookSocket) {
        synchronized (noteSocketMap) {
            for (String noteId : noteSocketMap.keySet()) {
                removeConnectionFromNote(noteId, notebookSocket);
            }
        }
    }

    /**
     * Detaches a socket from one note and re-evaluates that note's collaborative status.
     *
     * <p>When the note has no socket list nothing is detached and the status check is skipped,
     * rather than passing {@code null} on to a method that dereferences the list.
     *
     * @param str            id of the note
     * @param notebookSocket the socket to detach
     */
    private void removeConnectionFromNote(String str, NotebookSocket notebookSocket) {
        LOGGER.debug("Remove connection {} from note: {}", notebookSocket, str);
        synchronized (noteSocketMap) {
            final List<NotebookSocket> noteSockets = noteSocketMap.get(str);
            if (noteSockets == null) {
                return;
            }
            noteSockets.remove(notebookSocket);
            checkCollaborativeStatus(str, noteSockets);
        }
    }

    /**
     * Recomputes whether a note is in collaborative mode and broadcasts the result to the note.
     *
     * <p>A note is collaborative when more than one socket is attached to it. Does nothing
     * when collaborative mode is disabled in the configuration.
     *
     * @param str  id of the note
     * @param list sockets currently attached to the note; {@code null} is treated as "none"
     *             (it used to cause a {@link NullPointerException})
     */
    private void checkCollaborativeStatus(String str, List<NotebookSocket> list) {
        if (!collaborativeModeEnable.booleanValue()) {
            return;
        }

        final List<NotebookSocket> attached =
                (list != null) ? list : Collections.<NotebookSocket>emptyList();
        final boolean collaborative = attached.size() > 1;
        if (collaborative) {
            collaborativeModeList.add(str);
        } else {
            collaborativeModeList.remove(str);
        }

        final Message statusMessage = new Message(Message.OP.COLLABORATIVE_MODE_STATUS);
        statusMessage.put("status", Boolean.valueOf(collaborative));
        if (collaborative) {
            // The user list is only included while the note is actually shared.
            final Set<String> users = new HashSet<>();
            for (NotebookSocket socket : attached) {
                users.add(socket.getUser());
            }
            statusMessage.put("users", users);
        }
        broadcast(str, statusMessage);
    }

    /**
     * Converts a message to its JSON text using the shared Gson instance.
     *
     * @param message the message to serialize
     * @return the JSON representation of {@code message}
     */
    protected String serializeMessage(final Message message) {
        final String json = gson.toJson(message);
        return json;
    }

    /**
     * Sends a message to every connected socket.
     *
     * <p>A failure on one socket is logged and does not stop delivery to the others.
     *
     * @param message the message to send
     */
    public void broadcast(Message message) {
        synchronized (connectedSockets) {
            for (NotebookSocket socket : connectedSockets) {
                try {
                    socket.send(serializeMessage(message));
                } catch (IOException | WebSocketException e) {
                    LOGGER.error("Send error: " + message, e);
                }
            }
        }
    }

    /**
     * Sends a message to all watchers and to every socket attached to the given note.
     *
     * <p>Watchers are notified first, even when no socket is attached to the note. The note's
     * socket list is copied while holding the {@code noteSocketMap} lock; the sends themselves
     * run on that copy after the lock is released, so socket I/O is not performed inside this
     * method's own synchronized block. The message is serialized once and the same text is
     * sent to every recipient. A failure on one socket is logged and does not stop delivery
     * to the others.
     *
     * @param str     id of the note whose sockets receive the message
     * @param message the message to send
     */
    public void broadcast(String str, Message message) {
        final List<NotebookSocket> recipients;
        synchronized (noteSocketMap) {
            broadcastToWatchers(str, "", message);
            final List<NotebookSocket> noteSockets = noteSocketMap.get(str);
            if (noteSockets == null || noteSockets.isEmpty()) {
                return;
            }
            // Snapshot so the sends below do not depend on the live list.
            recipients = new ArrayList<>(noteSockets);
        }

        // Parameterized form: the message is only rendered when debug logging is enabled.
        LOGGER.debug("SEND >> {}", message);
        final String payload = serializeMessage(message);
        for (NotebookSocket socket : recipients) {
            try {
                socket.send(payload);
            } catch (IOException | WebSocketException e) {
                LOGGER.error("socket error", e);
            }
        }
    }

    /**
     * Wraps a message in a {@link WatcherMessage} and sends it to every watcher socket.
     *
     * <p>A failure on one watcher is logged and does not stop delivery to the others.
     *
     * @param str     note id passed to the watcher message builder
     * @param str2    subject set on the watcher message
     * @param message the message whose JSON form becomes the watcher message body
     */
    private void broadcastToWatchers(String str, String str2, Message message) {
        synchronized (watcherSockets) {
            for (NotebookSocket watcher : watcherSockets) {
                try {
                    final String wrapped = WatcherMessage.builder(str)
                            .subject(str2)
                            .message(serializeMessage(message))
                            .build()
                            .toJson();
                    watcher.send(wrapped);
                } catch (IOException | WebSocketException e) {
                    LOGGER.error("Cannot broadcast message to watcher", e);
                }
            }
        }
    }

    /**
     * Sends a message to all watchers and to every socket attached to the note except one.
     *
     * <p>Watchers are notified first, even when no socket is attached to the note. The note's
     * socket list is copied while holding the {@code noteSocketMap} lock; the sends run on that
     * copy after the lock is released. The message is serialized once. A failure on one socket
     * is logged and does not stop delivery to the others.
     *
     * @param str            id of the note whose sockets receive the message
     * @param message        the message to send
     * @param notebookSocket the socket to skip; {@code null} means nobody is skipped
     *                       (previously {@code null} caused a {@link NullPointerException})
     */
    public void broadcastExcept(String str, Message message, NotebookSocket notebookSocket) {
        final List<NotebookSocket> recipients;
        synchronized (noteSocketMap) {
            broadcastToWatchers(str, "", message);
            final List<NotebookSocket> noteSockets = noteSocketMap.get(str);
            if (noteSockets == null || noteSockets.isEmpty()) {
                return;
            }
            // Snapshot so the sends below do not depend on the live list.
            recipients = new ArrayList<>(noteSockets);
        }

        // Parameterized form: the message is only rendered when debug logging is enabled.
        LOGGER.debug("SEND >> {}", message);
        final String payload = serializeMessage(message);
        for (NotebookSocket socket : recipients) {
            if (notebookSocket != null && notebookSocket.equals(socket)) {
                continue;
            }
            try {
                socket.send(payload);
            } catch (IOException | WebSocketException e) {
                LOGGER.error("socket error", e);
            }
        }
    }

    /**
     * Sends an already-serialized text to every connected socket.
     *
     * @param str the text to send
     */
    public void broadcastToAllConnections(final String str) {
        final NotebookSocket nobodyExcluded = null;
        broadcastToAllConnectionsExcept(nobodyExcluded, str);
    }

    /**
     * Sends an already-serialized text to every connected socket except one.
     *
     * <p>A failure on one socket is logged and does not stop delivery to the others.
     *
     * @param notebookSocket the socket to skip, or {@code null} to skip none
     * @param str            the text to send
     */
    public void broadcastToAllConnectionsExcept(NotebookSocket notebookSocket, String str) {
        synchronized (connectedSockets) {
            for (NotebookSocket socket : connectedSockets) {
                final boolean excluded = notebookSocket != null && notebookSocket.equals(socket);
                if (excluded) {
                    continue;
                }
                try {
                    socket.send(str);
                } catch (IOException | WebSocketException e) {
                    LOGGER.error("Cannot broadcast message to conn", e);
                }
            }
        }
    }

    /**
     * Collects the distinct user names carried by the connected sockets.
     *
     * @return a new mutable set of the user values reported by the sockets
     */
    public Set<String> getConnectedUsers() {
        final Set<String> users = new HashSet<>();
        for (NotebookSocket socket : connectedSockets) {
            users.add(socket.getUser());
        }
        return users;
    }

    /**
     * Sends a message to every socket registered for a user.
     *
     * <p>Logs a warning and sends nothing when the user has no entry in the user map.
     *
     * @param str     the user name
     * @param message the message to send
     */
    public void multicastToUser(String str, Message message) {
        if (!userSocketMap.containsKey(str)) {
            LOGGER.warn("Multicasting to user {} that is not in connections map", str);
            return;
        }
        for (NotebookSocket socket : userSocketMap.get(str)) {
            unicast(message, socket);
        }
    }

    /**
     * Sends a message to one socket and then forwards it to all watchers.
     *
     * <p>A send failure is logged; the watchers are notified regardless. Both the note id and
     * the subject passed to the watchers are empty strings.
     *
     * @param message        the message to send
     * @param notebookSocket the receiving socket
     */
    public void unicast(Message message, NotebookSocket notebookSocket) {
        try {
            final String payload = serializeMessage(message);
            notebookSocket.send(payload);
        } catch (IOException | WebSocketException e) {
            LOGGER.error("socket error", e);
        }
        broadcastToWatchers("", "", message);
    }

    /**
     * Sends a paragraph to every socket of one user, for notes in personalized mode only.
     *
     * <p>Nothing is sent when the note is not personalized, or when the paragraph or the user
     * is {@code null}. A warning is logged when the user has no entry in the user map.
     *
     * @param note      the note the paragraph belongs to
     * @param paragraph the paragraph to send
     * @param str       the user name
     * @param str2      message id attached to each outgoing message
     */
    public void unicastParagraph(Note note, Paragraph paragraph, String str, String str2) {
        final boolean applicable = note.isPersonalizedMode() && paragraph != null && str != null;
        if (!applicable) {
            return;
        }
        if (!userSocketMap.containsKey(str)) {
            LOGGER.warn("Failed to send unicast. user {} that is not in connections map", str);
            return;
        }
        for (NotebookSocket socket : userSocketMap.get(str)) {
            // A fresh message is built per recipient.
            final Message paragraphMessage =
                    new Message(Message.OP.PARAGRAPH).withMsgId(str2).put("paragraph", paragraph);
            unicast(paragraphMessage, socket);
        }
    }

    /**
     * Invokes the callback once for every user present in the user map.
     *
     * <p>The callback receives the user's roles plus the user name itself. The roles are copied
     * into a fresh set before the name is added, so the set returned by the authorization
     * service is never modified and may be shared or unmodifiable. A {@code null} role set is
     * treated as empty.
     *
     * @param userIterator callback to invoke per user
     */
    public void forAllUsers(UserIterator userIterator) {
        for (String user : userSocketMap.keySet()) {
            final Set<String> roles = authorizationService.getRoles(user);
            final Set<String> userAndRoles = new HashSet<>();
            if (roles != null) {
                userAndRoles.addAll(roles);
            }
            userAndRoles.add(user);
            userIterator.handleUser(user, userAndRoles);
        }
    }

    /**
     * Sends the given note list to every user in the user map except the authenticated one.
     *
     * @param list               note infos placed under the {@code "notes"} key of the message
     * @param authenticationInfo identifies the user that is skipped
     */
    public void broadcastNoteListExcept(List<NoteInfo> list, AuthenticationInfo authenticationInfo) {
        for (String user : userSocketMap.keySet()) {
            if (authenticationInfo.getUser().equals(user)) {
                continue;
            }
            // NOTE(review): the result of this lookup is not used here; its only visible effect
            // is adding the user name to the set returned by the service. Confirm whether it
            // can be dropped.
            authorizationService.getRoles(user).add(user);
            final Message notesMessage = new Message(Message.OP.NOTES_INFO).put("notes", list);
            multicastToUser(user, notesMessage);
        }
    }

    /**
     * Broadcasts the whole note to the sockets attached to it.
     *
     * @param note the note to send; its id selects the recipients
     */
    public void broadcastNote(final Note note) {
        final Message noteMessage = new Message(Message.OP.NOTE).put("note", note);
        broadcast(note.getId(), noteMessage);
    }

    /**
     * Broadcasts the note forms and then the given paragraph.
     *
     * <p>In personalized mode each user receives the paragraph from the paragraph's
     * per-user map; otherwise the paragraph goes to every socket attached to the note.
     *
     * @param note      the note the paragraph belongs to
     * @param paragraph the paragraph to send
     */
    public void broadcastParagraph(Note note, Paragraph paragraph) {
        broadcastNoteForms(note);
        if (!note.isPersonalizedMode()) {
            broadcast(note.getId(), new Message(Message.OP.PARAGRAPH).put("paragraph", paragraph));
            return;
        }
        broadcastParagraphs(paragraph.getUserParagraphMap());
    }

    /**
     * Sends each user in the map the paragraph stored under that user's name.
     *
     * @param map user name -> paragraph; {@code null} results in no messages
     */
    public void broadcastParagraphs(Map<String, Paragraph> map) {
        if (map == null) {
            return;
        }
        for (Map.Entry<String, Paragraph> entry : map.entrySet()) {
            final Message paragraphMessage =
                    new Message(Message.OP.PARAGRAPH).put("paragraph", entry.getValue());
            multicastToUser(entry.getKey(), paragraphMessage);
        }
    }

    /**
     * Announces a newly added paragraph, together with its index in the note, to the note's sockets.
     *
     * @param note      the note that contains the paragraph
     * @param paragraph the added paragraph
     */
    private void broadcastNewParagraph(Note note, Paragraph paragraph) {
        LOGGER.info("Broadcasting paragraph on run call instead of note.");
        final String noteId = note.getId();
        final Message addedMessage = new Message(Message.OP.PARAGRAPH_ADDED)
                .put("paragraph", paragraph)
                .put("index", Integer.valueOf(note.getParagraphs().indexOf(paragraph)));
        broadcast(noteId, addedMessage);
    }

    /**
     * Broadcasts the note-level forms and parameters to the note's sockets.
     *
     * @param note the note whose forms and params are sent
     */
    private void broadcastNoteForms(final Note note) {
        final GUI formsData = new GUI();
        formsData.setForms(note.getNoteForms());
        formsData.setParams(note.getNoteParams());

        final Message formsMessage = new Message(Message.OP.SAVE_NOTE_FORMS).put("formsData", formsData);
        broadcast(note.getId(), formsMessage);
    }

    /**
     * Turns a regular connection into a watcher connection.
     *
     * <p>The request must pass the watcher key check. On success the socket is added to the
     * watcher set and removed from the connected-socket set, from every note and from its
     * user's queue. A socket that is already a watcher is left untouched.
     *
     * @param notebookSocket the socket asking to become a watcher
     */
    public void switchConnectionToWatcher(NotebookSocket notebookSocket) {
        final boolean allowed = isSessionAllowedToSwitchToWatcher(notebookSocket);
        if (!allowed) {
            LOGGER.error("Cannot switch this client to watcher, invalid security key");
            return;
        }

        LOGGER.info("Going to add {} to watcher socket", notebookSocket);
        if (watcherSockets.contains(notebookSocket)) {
            LOGGER.info("connection already present in the watcher");
            return;
        }

        watcherSockets.add(notebookSocket);
        // From here on the socket only receives watcher traffic.
        removeConnection(notebookSocket);
        removeConnectionFromAllNote(notebookSocket);
        removeUserConnection(notebookSocket.getUser(), notebookSocket);
    }

    /**
     * Checks whether the socket's request carries the correct watcher security key.
     *
     * <p>The {@code X-Watcher-Key} header comes from the client, so it is compared with the
     * expected key via {@link MessageDigest#isEqual(byte[], byte[])} rather than
     * {@link String#equals(Object)}, to avoid exposing through response timing how much of a
     * guessed key matched.
     *
     * @param notebookSocket the socket whose request is inspected
     * @return {@code true} only if the header is non-blank and equal to the expected key
     */
    private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket notebookSocket) {
        final String presentedKey = notebookSocket.getRequest().getHeader("X-Watcher-Key");
        if (StringUtils.isBlank(presentedKey)) {
            return false;
        }
        final String expectedKey = WatcherSecurityKey.getKey();
        if (expectedKey == null) {
            // Matches the previous behavior: nothing equals a missing key.
            return false;
        }
        return MessageDigest.isEqual(
                presentedKey.getBytes(StandardCharsets.UTF_8),
                expectedKey.getBytes(StandardCharsets.UTF_8));
    }
}
