package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;

import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.NoDocumentFoundException;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/* loaded from: input_file:org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.class */
/**
 * {@link DocumentStoreReader} implementation that reads timeline documents
 * from Cosmos DB by building SQL queries out of a
 * {@link TimelineReaderContext}.
 *
 * <p>The Cosmos DB client, the executor and the scheduler are static and
 * therefore shared by every instance of this class in the JVM. Access to the
 * shared client's creation and disposal is guarded by the class lock.
 *
 * @param <TimelineDoc> concrete {@link TimelineDocument} type that query
 *                      results are converted to
 */
public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument> implements DocumentStoreReader<TimelineDoc> {
    /** Number of documents requested by {@link #readDocument}. */
    private static final int DEFAULT_DOCUMENTS_SIZE = 1;
    /** Size value that makes the query select all documents instead of a TOP n. */
    private static final long NO_LIMIT = -1L;
    /** Shared across all reader instances; created lazily, cleared by {@link #close()}. */
    private static AsyncDocumentClient client;
    private final String databaseName;
    private static final String COLLECTION_LINK = "/dbs/%s/colls/%s";
    private static final String SELECT_TOP_FROM_COLLECTION = "SELECT TOP %d * FROM %s c";
    private static final String SELECT_ALL_FROM_COLLECTION = "SELECT  * FROM %s c";
    private static final String SELECT_DISTINCT_TYPES_FROM_COLLECTION = "SELECT  distinct c.type FROM %s c";
    private static final String ENTITY_TYPE_COLUMN = "type";
    private static final String WHERE_CLAUSE = " WHERE ";
    private static final String AND_OPERATOR = " AND ";
    private static final String CONTAINS_FUNC_FOR_ID = " CONTAINS(c.id, \"%s\") ";
    private static final String CONTAINS_FUNC_FOR_TYPE = " CONTAINS(c.type, \"%s\") ";
    private static final String ORDER_BY_CLAUSE = " ORDER BY c.createdTime";
    private static final Logger LOG = LoggerFactory.getLogger(CosmosDBDocumentStoreReader.class);
    // availableProcessors() / 2 is 0 on a single-CPU host and newFixedThreadPool(0)
    // throws IllegalArgumentException, which would break class initialisation.
    private static final ExecutorService executorService =
            Executors.newFixedThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
    private static final Scheduler schedulerForBlockingWork = Schedulers.from(executorService);

    /**
     * Creates a reader for the database named in the configuration and makes
     * sure the shared Cosmos DB client exists.
     *
     * @param configuration configuration used to resolve the database name and
     *                      to create the client
     */
    public CosmosDBDocumentStoreReader(Configuration configuration) {
        LOG.info("Initializing Cosmos DB DocumentStoreReader...");
        this.databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(configuration);
        initCosmosDBClient(configuration);
    }

    /**
     * Lazily creates the shared client. Static so that the {@code synchronized}
     * lock is the class lock: the guarded field is static, and an instance
     * lock would not stop two instances from creating two clients.
     */
    private static synchronized void initCosmosDBClient(Configuration configuration) {
        if (client == null) {
            LOG.info("Creating Cosmos DB Reader Async Client...");
            client = DocumentStoreUtils.createCosmosDBAsyncClient(configuration);
            addShutdownHook();
        }
    }

    /**
     * Reads up to {@code documentsSize} documents matching the context.
     *
     * @param collectionName collection to query
     * @param context        reader context the query predicates are built from
     * @param documentClass  class the returned documents are converted to
     * @param documentsSize  maximum number of documents, or {@code -1} for all
     * @return the non-empty list of matching documents
     * @throws NoDocumentFoundException if the query returns no documents
     * @throws IllegalArgumentException if the context yields no predicate
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader
    public List<TimelineDoc> readDocumentList(String collectionName, TimelineReaderContext context, Class<TimelineDoc> documentClass, long documentsSize) throws NoDocumentFoundException {
        List<TimelineDoc> documents = queryDocuments(collectionName, context, documentClass, documentsSize);
        if (documents.isEmpty()) {
            throw new NoDocumentFoundException("No documents were found while querying Collection : " + collectionName);
        }
        return documents;
    }

    /**
     * Fetches the distinct values of the {@code type} column for documents
     * matching the context.
     *
     * @param collectionName collection to query
     * @param context        reader context the query predicates are built from
     * @return set of entity type names (each converted with {@code String.valueOf})
     * @throws IllegalArgumentException if the context yields no predicate
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader
    public Set<String> fetchEntityTypes(String collectionName, TimelineReaderContext context) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append(String.format(SELECT_DISTINCT_TYPES_FROM_COLLECTION, collectionName));
        String sqlQuery = addPredicates(context, collectionName, queryBuilder);
        LOG.debug("Querying Collection : {} , with query {}", collectionName, sqlQuery);
        String collectionLink = String.format(COLLECTION_LINK, this.databaseName, collectionName);
        List<String> entityTypes = client.queryDocuments(collectionLink, sqlQuery, new FeedOptions())
                // each emitted response is one page; flatten pages into single documents
                .map(response -> response.getResults())
                .concatMap(documents -> Observable.from(documents))
                .map(document -> String.valueOf(document.get(ENTITY_TYPE_COLUMN)))
                .toList()
                .subscribeOn(schedulerForBlockingWork)
                .toBlocking()
                .single();
        return Sets.newHashSet(entityTypes);
    }

    /**
     * Reads the first document matching the context.
     *
     * @param collectionName collection to query
     * @param context        reader context the query predicates are built from
     * @param documentClass  class the returned document is converted to
     * @return the first matching document
     * @throws NoDocumentFoundException if the query returns no documents
     * @throws IllegalArgumentException if the context yields no predicate
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader
    public TimelineDoc readDocument(String collectionName, TimelineReaderContext context, Class<TimelineDoc> documentClass) throws NoDocumentFoundException {
        List<TimelineDoc> documents = queryDocuments(collectionName, context, documentClass, DEFAULT_DOCUMENTS_SIZE);
        if (documents.isEmpty()) {
            throw new NoDocumentFoundException("No documents were found while querying Collection : " + collectionName);
        }
        return documents.get(0);
    }

    /**
     * Runs the query for the given context and blocks until all result pages
     * have been converted to {@code documentClass} instances.
     */
    private List<TimelineDoc> queryDocuments(String collectionName, TimelineReaderContext context, Class<TimelineDoc> documentClass, long documentsSize) {
        String sqlQuery = buildQueryWithPredicates(context, collectionName, documentsSize);
        LOG.debug("Querying Collection : {} , with query {}", collectionName, sqlQuery);
        String collectionLink = String.format(COLLECTION_LINK, this.databaseName, collectionName);
        return client.queryDocuments(collectionLink, sqlQuery, new FeedOptions())
                // each emitted response is one page; flatten pages into single documents
                .map(response -> response.getResults())
                .concatMap(documents -> Observable.from(documents))
                .map(document -> {
                    TimelineDoc resultDoc = document.toObject(documentClass);
                    // Fall back to the Cosmos DB document timestamp when the
                    // converted document carries no created time of its own.
                    if (resultDoc.getCreatedTime() == 0 && document.getTimestamp() != null) {
                        resultDoc.setCreatedTime(document.getTimestamp().getTime());
                    }
                    return resultDoc;
                })
                .toList()
                .subscribeOn(schedulerForBlockingWork)
                .toBlocking()
                .single();
    }

    /**
     * Builds the SELECT part ({@code TOP n} or all documents when
     * {@code documentsSize} is {@code -1}) and appends the predicates.
     */
    private String buildQueryWithPredicates(TimelineReaderContext context, String collectionName, long documentsSize) {
        StringBuilder queryBuilder = new StringBuilder();
        if (documentsSize == NO_LIMIT) {
            queryBuilder.append(String.format(SELECT_ALL_FROM_COLLECTION, collectionName));
        } else {
            queryBuilder.append(String.format(SELECT_TOP_FROM_COLLECTION, documentsSize, collectionName));
        }
        return addPredicates(context, collectionName, queryBuilder);
    }

    /**
     * Appends {@code WHERE}, one {@code CONTAINS} predicate per populated
     * context field (cluster, user, flow name, app, entity id, flow run id,
     * entity type) joined by {@code AND}, and the {@code ORDER BY} clause.
     *
     * <p>The {@code AND} operator is only emitted between predicates, so a
     * context without a cluster id no longer produces {@code WHERE AND ...}.
     *
     * @param context        reader context supplying the predicate values
     * @param collectionName collection name, used only in the error message
     * @param queryBuilder   builder already holding the SELECT part; it is
     *                       modified in place
     * @return the complete query string
     * @throws IllegalArgumentException if no context field is populated
     */
    @VisibleForTesting
    String addPredicates(TimelineReaderContext context, String collectionName, StringBuilder queryBuilder) {
        boolean hasPredicate = false;
        queryBuilder.append(WHERE_CLAUSE);

        hasPredicate = appendContains(queryBuilder, hasPredicate, CONTAINS_FUNC_FOR_ID, context.getClusterId());
        hasPredicate = appendContains(queryBuilder, hasPredicate, CONTAINS_FUNC_FOR_ID, context.getUserId());
        hasPredicate = appendContains(queryBuilder, hasPredicate, CONTAINS_FUNC_FOR_ID, context.getFlowName());
        hasPredicate = appendContains(queryBuilder, hasPredicate, CONTAINS_FUNC_FOR_ID, context.getAppId());
        hasPredicate = appendContains(queryBuilder, hasPredicate, CONTAINS_FUNC_FOR_ID, context.getEntityId());
        // The flow run id is not a String; only a null check applies to it.
        if (context.getFlowRunId() != null) {
            hasPredicate = appendContains(queryBuilder, hasPredicate, CONTAINS_FUNC_FOR_ID,
                    String.valueOf(context.getFlowRunId()));
        }
        hasPredicate = appendContains(queryBuilder, hasPredicate, CONTAINS_FUNC_FOR_TYPE, context.getEntityType());

        if (!hasPredicate) {
            throw new IllegalArgumentException("The TimelineReaderContext does not have enough information to query documents for Collection : " + collectionName);
        }
        queryBuilder.append(ORDER_BY_CLAUSE);
        LOG.debug("CosmosDB Sql Query with predicates : {}", queryBuilder);
        return queryBuilder.toString();
    }

    /**
     * Appends one {@code CONTAINS} predicate for {@code value}, preceded by
     * {@code AND} when an earlier predicate exists. Null or empty values are
     * skipped.
     *
     * @return {@code true} if the builder holds at least one predicate after
     *         this call
     */
    private static boolean appendContains(StringBuilder queryBuilder, boolean hasPredicate, String containsFormat, String value) {
        if (DocumentStoreUtils.isNullOrEmpty(value)) {
            return hasPredicate;
        }
        if (hasPredicate) {
            queryBuilder.append(AND_OPERATOR);
        }
        queryBuilder.append(String.format(containsFormat, escapeStringLiteral(value)));
        return true;
    }

    /**
     * Escapes backslashes and double quotes so the value cannot terminate the
     * double-quoted SQL string literal it is embedded in.
     */
    private static String escapeStringLiteral(String value) {
        // Backslash first, otherwise the backslashes added for quotes get doubled.
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * Closes the shared client and clears the static reference. Because the
     * client is static, this affects every reader instance, not only this one.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // Class lock, matching initCosmosDBClient: the guarded field is static.
        synchronized (CosmosDBDocumentStoreReader.class) {
            if (client != null) {
                LOG.info("Closing Cosmos DB Reader Async Client...");
                client.close();
                client = null;
            }
        }
    }

    /** Registers a JVM shutdown hook that shuts down the shared executor. */
    private static void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(executorService::shutdown));
    }
}
