package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/TimelineCollectionWriter.class */
/**
 * Buffers timeline documents in memory, keyed by document id, and writes them
 * to the underlying {@link DocumentStoreWriter} in batches.
 *
 * <p>A batch is written when the buffer reaches its configured maximum size
 * (on a background executor), on a fixed schedule (on a scheduled executor),
 * when {@link #flush()} is called directly, and once more during
 * {@link #close()}. All access to the buffer is guarded by synchronizing on
 * the buffer itself.
 *
 * @param <Document> the concrete timeline document type handled by this writer
 */
public class TimelineCollectionWriter<Document extends TimelineDocument> {
    private static final String DOCUMENT_BUFFER_SIZE_CONF = "yarn.timeline-service.document-buffer.size";
    private static final int DEFAULT_BUFFER_SIZE = 1024;
    private static final String FLUSH_INTERVAL_SECS_CONF = "yarn.timeline-service.writer.flush-interval-seconds";
    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 60;
    private static final int AWAIT_TIMEOUT_SECS = 5;
    private static final Logger LOG = LoggerFactory.getLogger(TimelineCollectionWriter.class);
    private static final PerNodeAggTimelineCollectorMetrics METRICS = PerNodeAggTimelineCollectorMetrics.getInstance();

    private final CollectionType collectionType;
    private final DocumentStoreWriter<Document> documentStoreWriter;
    private final Map<String, Document> documentsBuffer;
    private final int maxBufferSize;
    private final ScheduledExecutorService scheduledDocumentsFlusher;
    private final ExecutorService documentsBufferFullFlusher;

    /**
     * Creates the writer, its in-memory buffer and both flusher executors, and
     * starts the periodic flush.
     *
     * @param collectionType the collection every buffered document is written to
     * @param configuration  source of the flush interval and buffer size settings;
     *                       also handed to the document store factory
     * @throws YarnException declared for failures while creating the store writer
     */
    public TimelineCollectionWriter(CollectionType collectionType, Configuration configuration) throws YarnException {
        LOG.info("Initializing TimelineCollectionWriter for collection type : {}", collectionType);
        int flushIntervalSecs = configuration.getInt(FLUSH_INTERVAL_SECS_CONF, DEFAULT_FLUSH_INTERVAL_SECS);
        this.maxBufferSize = configuration.getInt(DOCUMENT_BUFFER_SIZE_CONF, DEFAULT_BUFFER_SIZE);
        this.documentsBuffer = new HashMap<>(this.maxBufferSize);
        this.collectionType = collectionType;
        // Create the store writer before any executor so that a failure here
        // does not leave a running thread behind.
        this.documentStoreWriter = DocumentStoreFactory.createDocumentStoreWriter(configuration);
        this.scheduledDocumentsFlusher = Executors.newSingleThreadScheduledExecutor();
        this.scheduledDocumentsFlusher.scheduleAtFixedRate(
                this::flushAndLogFailure, flushIntervalSecs, flushIntervalSecs, TimeUnit.SECONDS);
        this.documentsBufferFullFlusher = Executors.newSingleThreadExecutor();
    }

    /**
     * Adds a document to the buffer, merging it into an already buffered
     * document with the same id when one exists. If the buffer is full, its
     * current contents are first handed to the background flusher.
     *
     * @param document the document to buffer
     */
    // merge() is invoked through the raw TimelineDocument bound of the type parameter.
    @SuppressWarnings("unchecked")
    public void writeDocument(Document document) {
        long startTime = Time.monotonicNow();
        synchronized (this.documentsBuffer) {
            // ">=" instead of "==": identical for positive sizes (the buffer is
            // drained before it can exceed the limit), but a size configured
            // as 0 still drains on every write rather than only the first.
            if (this.documentsBuffer.size() >= this.maxBufferSize) {
                Map<String, Document> snapshot = copyToFlushBuffer();
                this.documentsBufferFullFlusher.execute(() -> flush(snapshot));
            }
            Document buffered = this.documentsBuffer.get(document.getId());
            if (buffered != null) {
                buffered.merge(document);
            } else {
                buffered = document;
            }
            this.documentsBuffer.put(buffered.getId(), buffered);
        }
        METRICS.addAsyncPutEntitiesLatency(Time.monotonicNow() - startTime, true);
    }

    /**
     * Moves every buffered document into a fresh map and empties the buffer.
     *
     * @return the drained documents; empty when nothing was buffered
     */
    private Map<String, Document> copyToFlushBuffer() {
        Map<String, Document> drained = new HashMap<>();
        synchronized (this.documentsBuffer) {
            if (!this.documentsBuffer.isEmpty()) {
                drained.putAll(this.documentsBuffer);
                this.documentsBuffer.clear();
            }
        }
        return drained;
    }

    /** Writes every document of the given batch to the document store. */
    private void flush(Map<String, Document> map) {
        for (Document document : map.values()) {
            this.documentStoreWriter.writeDocument(document, this.collectionType);
        }
    }

    /**
     * Periodic-flush entry point. A task scheduled with scheduleAtFixedRate is
     * never run again once one execution throws, so failures are logged here
     * instead of being allowed to escape.
     */
    private void flushAndLogFailure() {
        try {
            flush();
        } catch (RuntimeException e) {
            LOG.error("Scheduled flush failed for collection type : {}", this.collectionType, e);
        }
    }

    /** Drains the buffer and writes its contents to the document store on the calling thread. */
    public void flush() {
        flush(copyToFlushBuffer());
    }

    /**
     * Stops both flusher executors, flushes whatever is still buffered, waits
     * up to {@value #AWAIT_TIMEOUT_SECS} seconds for each executor to terminate
     * and closes the underlying store writer.
     *
     * @throws Exception if the final flush fails, the wait is interrupted, or
     *                   closing the store writer fails
     */
    public void close() throws Exception {
        this.scheduledDocumentsFlusher.shutdown();
        this.documentsBufferFullFlusher.shutdown();
        try {
            flush();
            this.scheduledDocumentsFlusher.awaitTermination(AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
            this.documentsBufferFullFlusher.awaitTermination(AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
        } finally {
            // Always release the store writer, even when the final flush
            // throws or the wait is interrupted.
            this.documentStoreWriter.close();
        }
    }
}
