package org.apache.zeppelin.ksql;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.ksql.BasicKSQLHttpClient;

/* loaded from: input_file:org/apache/zeppelin/ksql/KSQLRestService.class */
public class KSQLRestService {
    private static final String KSQL_ENDPOINT = "%s/ksql";
    private static final String QUERY_ENDPOINT = "%s/query";
    private static final String KSQL_V1_CONTENT_TYPE = "application/vnd.ksql.v1+json; charset=utf-8";
    private static final String KSQL_URL = "ksql.url";
    private final String ksqlUrl;
    private final String queryUrl;
    private final String baseUrl;
    private final Map<String, String> streamsProperties;
    private final Map<String, BasicKSQLHttpClient> clientCache = new ConcurrentHashMap();
    private static final List<String> KSQL_COMMON_FIELDS = Arrays.asList("statementText", "warnings", "@type");
    private static final ObjectMapper json = new ObjectMapper();

    public KSQLRestService(Map<String, String> map) {
        this.baseUrl = ((String) Objects.requireNonNull(map.get(KSQL_URL), KSQL_URL)).toString();
        this.ksqlUrl = String.format(KSQL_ENDPOINT, this.baseUrl);
        this.queryUrl = String.format(QUERY_ENDPOINT, this.baseUrl);
        this.streamsProperties = (Map) map.entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).startsWith("ksql.") && !((String) entry.getKey()).equals(KSQL_URL);
        }).collect(Collectors.toMap(entry2 -> {
            return (String) entry2.getKey();
        }, entry3 -> {
            return (String) entry3.getValue();
        }));
    }

    public void executeQuery(String str, String str2, Consumer<KSQLResponse> consumer) throws IOException {
        KSQLRequest kSQLRequest = new KSQLRequest(str2, this.streamsProperties);
        if (isSelect(kSQLRequest)) {
            executeSelect(str, consumer, kSQLRequest);
        } else if (isPrint(kSQLRequest)) {
            executePrint(str, consumer, kSQLRequest);
        } else {
            executeKSQL(str, consumer, kSQLRequest);
        }
    }

    private void executeKSQL(String str, Consumer<KSQLResponse> consumer, KSQLRequest kSQLRequest) throws IOException {
        BasicKSQLHttpClient createNewClient = createNewClient(str, kSQLRequest, this.ksqlUrl);
        Throwable th = null;
        try {
            try {
                List list = (List) json.readValue(createNewClient.connect(), List.class);
                Stream map = list.stream().map(map2 -> {
                    return excludeKSQLCommonFields(map2);
                }).flatMap(map3 -> {
                    return map3.entrySet().stream().filter(entry -> {
                        return entry.getValue() instanceof List;
                    }).flatMap(entry2 -> {
                        return ((List) entry2.getValue()).stream();
                    });
                }).map(KSQLResponse::new);
                consumer.getClass();
                map.forEach((v1) -> {
                    r1.accept(v1);
                });
                Stream map4 = list.stream().map(map5 -> {
                    return excludeKSQLCommonFields(map5);
                }).flatMap(map6 -> {
                    return map6.entrySet().stream().filter(entry -> {
                        return entry.getValue() instanceof Map;
                    }).map(entry2 -> {
                        return (Map) entry2.getValue();
                    });
                }).map(KSQLResponse::new);
                consumer.getClass();
                map4.forEach((v1) -> {
                    r1.accept(v1);
                });
                if (createNewClient != null) {
                    if (0 == 0) {
                        createNewClient.close();
                        return;
                    }
                    try {
                        createNewClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createNewClient != null) {
                if (th != null) {
                    try {
                        createNewClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createNewClient.close();
                }
            }
            throw th4;
        }
    }

    private Map<String, Object> excludeKSQLCommonFields(Map<String, Object> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return !KSQL_COMMON_FIELDS.contains(entry.getKey());
        }).collect(Collectors.toMap(entry2 -> {
            return (String) entry2.getKey();
        }, entry3 -> {
            return entry3.getValue();
        }));
    }

    private BasicKSQLHttpClient createNewClient(String str, KSQLRequest kSQLRequest, String str2) throws IOException {
        BasicKSQLHttpClient build = new BasicKSQLHttpClient.Builder().withUrl(str2).withJson(json.writeValueAsString(kSQLRequest)).withType("POST").withHeader("Content-type", KSQL_V1_CONTENT_TYPE).build();
        BasicKSQLHttpClient put = this.clientCache.put(str, build);
        if (put != null) {
            put.close();
        }
        return build;
    }

    private void executeSelect(String str, final Consumer<KSQLResponse> consumer, KSQLRequest kSQLRequest) throws IOException {
        final List<String> fields = getFields(kSQLRequest);
        if (fields.isEmpty()) {
            throw new RuntimeException("Field are empty");
        }
        final BasicKSQLHttpClient createNewClient = createNewClient(str, kSQLRequest, this.queryUrl);
        Throwable th = null;
        try {
            try {
                createNewClient.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() { // from class: org.apache.zeppelin.ksql.KSQLRestService.1
                    @Override // org.apache.zeppelin.ksql.BasicKSQLHttpClient.BasicHTTPClientResponse
                    public void onMessage(int i, String str2) {
                        try {
                            KSQLResponse kSQLResponse = new KSQLResponse(fields, (Map) KSQLRestService.json.readValue(str2, LinkedHashMap.class));
                            consumer.accept(kSQLResponse);
                            if (kSQLResponse.isTerminal() || StringUtils.isNotBlank(kSQLResponse.getErrorMessage()) || StringUtils.isNotBlank(kSQLResponse.getFinalMessage())) {
                                createNewClient.close();
                            }
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }

                    @Override // org.apache.zeppelin.ksql.BasicKSQLHttpClient.BasicHTTPClientResponse
                    public void onError(int i, String str2) {
                        try {
                            consumer.accept(new KSQLResponse(Collections.singletonMap("error", str2)));
                            createNewClient.close();
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
                if (createNewClient != null) {
                    if (0 == 0) {
                        createNewClient.close();
                        return;
                    }
                    try {
                        createNewClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createNewClient != null) {
                if (th != null) {
                    try {
                        createNewClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createNewClient.close();
                }
            }
            throw th4;
        }
    }

    private void executePrint(String str, final Consumer<KSQLResponse> consumer, KSQLRequest kSQLRequest) throws IOException {
        final BasicKSQLHttpClient createNewClient = createNewClient(str, kSQLRequest, this.queryUrl);
        Throwable th = null;
        try {
            try {
                createNewClient.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() { // from class: org.apache.zeppelin.ksql.KSQLRestService.2
                    @Override // org.apache.zeppelin.ksql.BasicKSQLHttpClient.BasicHTTPClientResponse
                    public void onMessage(int i, String str2) {
                        if (str2.toUpperCase().startsWith("FORMAT:")) {
                            return;
                        }
                        List asList = Arrays.asList(str2.split(","));
                        LinkedHashMap linkedHashMap = new LinkedHashMap();
                        linkedHashMap.put("timestamp", asList.get(0));
                        linkedHashMap.put("offset", asList.get(1));
                        linkedHashMap.put("record", String.join("", asList.subList(2, asList.size())));
                        consumer.accept(new KSQLResponse(linkedHashMap));
                    }

                    @Override // org.apache.zeppelin.ksql.BasicKSQLHttpClient.BasicHTTPClientResponse
                    public void onError(int i, String str2) {
                        try {
                            consumer.accept(new KSQLResponse(Collections.singletonMap("error", str2)));
                            createNewClient.close();
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
                if (createNewClient != null) {
                    if (0 == 0) {
                        createNewClient.close();
                        return;
                    }
                    try {
                        createNewClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createNewClient != null) {
                if (th != null) {
                    try {
                        createNewClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createNewClient.close();
                }
            }
            throw th4;
        }
    }

    private boolean isSelect(KSQLRequest kSQLRequest) {
        return kSQLRequest.getKsql().toUpperCase().startsWith("SELECT");
    }

    private boolean isPrint(KSQLRequest kSQLRequest) {
        return kSQLRequest.getKsql().toUpperCase().startsWith("PRINT");
    }

    public void closeClient(String str) throws IOException {
        BasicKSQLHttpClient remove = this.clientCache.remove(str);
        if (remove != null) {
            remove.close();
        }
    }

    private List<String> getFields(KSQLRequest kSQLRequest) throws IOException {
        return getFields(kSQLRequest, false);
    }

    private List<String> getFields(KSQLRequest kSQLRequest, boolean z) throws IOException {
        if (z) {
            kSQLRequest = new KSQLRequest(kSQLRequest.getKsql().substring(0, kSQLRequest.getKsql().toUpperCase().indexOf("WHERE")), kSQLRequest.getStreamsProperties());
        }
        try {
            BasicKSQLHttpClient build = new BasicKSQLHttpClient.Builder().withUrl(this.ksqlUrl).withJson(json.writeValueAsString(kSQLRequest.toExplainRequest())).withType("POST").withHeader("Content-type", KSQL_V1_CONTENT_TYPE).build();
            Throwable th = null;
            try {
                try {
                    List<String> list = (List) ((List) ((Map) ((Map) ((List) json.readValue(build.connect(), List.class)).get(0)).getOrDefault("queryDescription", Collections.emptyMap())).getOrDefault("fields", Collections.emptyList())).stream().map(map -> {
                        return map.getOrDefault("name", "").toString();
                    }).filter(str -> {
                        return !str.isEmpty();
                    }).collect(Collectors.toList());
                    if (build != null) {
                        if (0 != 0) {
                            try {
                                build.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            build.close();
                        }
                    }
                    return list;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            if (z) {
                throw e;
            }
            return getFields(kSQLRequest, true);
        }
    }

    public void close() {
        this.clientCache.keySet().forEach(str -> {
            try {
                closeClient(str);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }
}
