package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.timeline.DataSegment;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.class */
/**
 * A {@link BaseAppenderatorDriver} that additionally registers pushed segments with a
 * {@link SegmentHandoffNotifier} and drops them from the appenderator once they are handed off.
 *
 * <p>All reads and writes of the inherited {@code segments} map in this class are performed while
 * holding that map's monitor.
 */
public class StreamAppenderatorDriver extends BaseAppenderatorDriver {
    private static final Logger log = new Logger(StreamAppenderatorDriver.class);
    private final SegmentHandoffNotifier handoffNotifier;
    private final FireDepartmentMetrics metrics;
    private final ObjectMapper objectMapper;

    /**
     * Accumulates the {@link SegmentWithState}s restored for one sequence and groups them per interval
     * so that a {@link BaseAppenderatorDriver.SegmentsForSequence} can be built from them.
     */
    private static class SegmentsForSequenceBuilder {
        // Keys are compared by interval only, so every segment id of the same interval shares one entry.
        // Value: lhs = the segment still in APPENDING state (may be null), rhs = all other segments.
        private final NavigableMap<SegmentIdWithShardSpec, Pair<SegmentWithState, List<SegmentWithState>>> intervalToSegments =
                new TreeMap<>(Comparator.comparing(SegmentIdWithShardSpec::getInterval, Comparators.intervalsByStartThenEnd()));
        private final String lastSegmentId;

        SegmentsForSequenceBuilder(String str) {
            this.lastSegmentId = str;
        }

        /**
         * Registers a segment under its interval.
         *
         * @param segmentWithState the segment to register
         * @throws ISE if the segment is in APPENDING state and its interval already has an appending segment
         */
        void add(SegmentWithState segmentWithState) {
            final SegmentIdWithShardSpec identifier = segmentWithState.getSegmentIdentifier();
            final Pair<SegmentWithState, List<SegmentWithState>> existing = this.intervalToSegments.get(identifier);
            final List<SegmentWithState> finishedSegments;
            if (existing == null || existing.rhs == null) {
                finishedSegments = new ArrayList<>();
            } else {
                finishedSegments = existing.rhs;
            }

            if (segmentWithState.getState() == SegmentWithState.SegmentState.APPENDING) {
                // At most one appending segment is allowed per interval.
                if (existing != null && existing.lhs != null) {
                    throw new ISE("appendingSegment[%s] existed before adding an appendingSegment[%s]", existing.lhs, segmentWithState);
                }
                this.intervalToSegments.put(identifier, Pair.of(segmentWithState, finishedSegments));
            } else {
                final SegmentWithState appendingSegment = existing == null ? null : existing.lhs;
                finishedSegments.add(segmentWithState);
                this.intervalToSegments.put(identifier, Pair.of(appendingSegment, finishedSegments));
            }
        }

        /**
         * Builds the per-sequence view, keyed by the start millis of each interval.
         *
         * @return the segments collected so far together with the last segment id given at construction
         */
        BaseAppenderatorDriver.SegmentsForSequence build() {
            final TreeMap<Long, BaseAppenderatorDriver.SegmentsOfInterval> byStartMillis = new TreeMap<>();
            for (Map.Entry<SegmentIdWithShardSpec, Pair<SegmentWithState, List<SegmentWithState>>> entry : this.intervalToSegments.entrySet()) {
                final SegmentIdWithShardSpec identifier = entry.getKey();
                final Pair<SegmentWithState, List<SegmentWithState>> segmentsOfKey = entry.getValue();
                byStartMillis.put(
                        identifier.getInterval().getStartMillis(),
                        new BaseAppenderatorDriver.SegmentsOfInterval(identifier.getInterval(), segmentsOfKey.lhs, segmentsOfKey.rhs));
            }
            return new BaseAppenderatorDriver.SegmentsForSequence(byStartMillis, this.lastSegmentId);
        }
    }

    /**
     * Creates a driver.
     *
     * @param appenderator                  appenderator passed to the base driver; its data source is used to
     *                                      create the handoff notifier
     * @param segmentAllocator              passed to the base driver
     * @param segmentHandoffNotifierFactory factory for the handoff notifier; must not be null
     * @param usedSegmentChecker            passed to the base driver
     * @param dataSegmentKiller             passed to the base driver
     * @param objectMapper                  used to convert the appenderator's start-job result; must not be null
     * @param fireDepartmentMetrics         metrics on which hand-offs are counted; must not be null
     */
    public StreamAppenderatorDriver(Appenderator appenderator, SegmentAllocator segmentAllocator, SegmentHandoffNotifierFactory segmentHandoffNotifierFactory, UsedSegmentChecker usedSegmentChecker, DataSegmentKiller dataSegmentKiller, ObjectMapper objectMapper, FireDepartmentMetrics fireDepartmentMetrics) {
        super(appenderator, segmentAllocator, usedSegmentChecker, dataSegmentKiller);
        this.handoffNotifier = Preconditions.checkNotNull(segmentHandoffNotifierFactory, "handoffNotifierFactory")
                .createSegmentHandoffNotifier(appenderator.getDataSource());
        this.metrics = Preconditions.checkNotNull(fireDepartmentMetrics, "metrics");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
    }

    /**
     * Starts the handoff notifier and the appenderator, and restores the driver's segment bookkeeping
     * from the metadata returned by the appenderator.
     *
     * @param appenderatorDriverSegmentLockHelper if non-null, used to lock every restored segment that is
     *                                            not in PUSHED_AND_DROPPED state
     * @return the caller metadata stored in the restored driver metadata, or null if the appenderator
     *         returned no metadata
     * @throws ISE if a restored segment cannot be locked
     */
    @Override // org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver
    @Nullable
    public Object startJob(AppenderatorDriverSegmentLockHelper appenderatorDriverSegmentLockHelper) {
        this.handoffNotifier.start();
        final AppenderatorDriverMetadata metadata =
                this.objectMapper.convertValue(this.appenderator.startJob(), AppenderatorDriverMetadata.class);
        if (metadata == null) {
            return null;
        }
        synchronized (this.segments) {
            final Map<String, String> lastSegmentIds = metadata.getLastSegmentIds();
            Preconditions.checkState(
                    metadata.getSegments().keySet().equals(lastSegmentIds.keySet()),
                    "Sequences for segment states and last segment IDs are not same");

            final TreeMap<String, SegmentsForSequenceBuilder> builders = new TreeMap<>();
            for (Map.Entry<String, List<SegmentWithState>> entry : metadata.getSegments().entrySet()) {
                final String sequenceName = entry.getKey();
                final SegmentsForSequenceBuilder builder = new SegmentsForSequenceBuilder(lastSegmentIds.get(sequenceName));
                builders.put(sequenceName, builder);
                entry.getValue().forEach(builder::add);

                if (appenderatorDriverSegmentLockHelper != null) {
                    for (SegmentWithState segmentWithState : entry.getValue()) {
                        if (segmentWithState.getState() != SegmentWithState.SegmentState.PUSHED_AND_DROPPED
                                && !appenderatorDriverSegmentLockHelper.lock(segmentWithState.getSegmentIdentifier())) {
                            throw new ISE("Failed to lock segment[%s]", segmentWithState.getSegmentIdentifier());
                        }
                    }
                }
            }
            // Only publish into the shared map after every sequence was restored (and locked) successfully.
            builders.forEach((sequenceName, builder) -> this.segments.put(sequenceName, builder.build()));
        }
        return metadata.getCallerMetadata();
    }

    /**
     * Adds a row by delegating to {@code append(inputRow, str, supplier, false, true)}.
     *
     * @throws IOException propagated from {@code append}
     */
    public AppenderatorDriverAddResult add(InputRow inputRow, String str, Supplier<Committer> supplier) throws IOException {
        return append(inputRow, str, supplier, false, true);
    }

    /**
     * Adds a row by delegating to {@code append} with the two given flags passed through unchanged.
     *
     * @throws IOException propagated from {@code append}
     */
    public AppenderatorDriverAddResult add(InputRow inputRow, String str, Supplier<Committer> supplier, boolean z, boolean z2) throws IOException {
        return append(inputRow, str, supplier, z, z2);
    }

    /**
     * Finishes appending to each of the given segments of a sequence.
     *
     * @param str  the sequence name
     * @param list segment ids; each one must be the current appending segment of its interval
     * @throws ISE if the sequence is unknown, or a segment is not the appending segment of its interval
     */
    public void moveSegmentOut(String str, List<SegmentIdWithShardSpec> list) {
        synchronized (this.segments) {
            final BaseAppenderatorDriver.SegmentsForSequence activeSegments = this.segments.get(str);
            if (activeSegments == null) {
                throw new ISE("Asked to remove segments for sequenceName[%s], which doesn't exist", str);
            }
            for (SegmentIdWithShardSpec identifier : list) {
                log.info("Moving segment[%s] out of active list.", identifier);
                final BaseAppenderatorDriver.SegmentsOfInterval ofInterval =
                        activeSegments.get(identifier.getInterval().getStartMillis());
                final boolean isAppendingSegment = ofInterval != null
                        && ofInterval.getAppendingSegment() != null
                        && ofInterval.getAppendingSegment().getSegmentIdentifier().equals(identifier);
                if (!isAppendingSegment) {
                    throw new ISE("Asked to remove segment[%s], which doesn't exist", identifier);
                }
                ofInterval.finishAppendingToCurrentActiveSegment(SegmentWithState::finishAppending);
            }
        }
    }

    /**
     * Persists all pending data and blocks until the persist completes.
     *
     * @param committer committer wrapped via {@code wrapCommitter} before being handed to the appenderator
     * @return the value the persist future completed with
     * @throws InterruptedException if interrupted while waiting; any other failure is rethrown wrapped in a
     *                              {@link RuntimeException}
     */
    public Object persist(Committer committer) throws InterruptedException {
        try {
            log.debug("Persisting pending data.");
            final long start = System.currentTimeMillis();
            final Object commitMetadata = this.appenderator.persistAll(wrapCommitter(committer)).get();
            log.debug("Persisted pending data in %,dms.", System.currentTimeMillis() - start);
            return commitMetadata;
        } catch (InterruptedException e) {
            // Keep interruption visible to the caller instead of wrapping it below.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts persisting all pending data without waiting for completion.
     *
     * @return the future returned by the appenderator's {@code persistAll}
     */
    public ListenableFuture<Object> persistAsync(Committer committer) {
        return this.appenderator.persistAll(wrapCommitter(committer));
    }

    /**
     * Pushes the segments of the given sequences, publishes them, and finally removes the sequences from
     * the driver's bookkeeping.
     *
     * @param transactionalSegmentPublisher publisher handed to {@code publishInBackground}
     * @param committer                     committer wrapped via {@code wrapCommitter} for the push
     * @param collection                    names of the sequences to publish
     * @return a future carrying the result of {@code publishInBackground}
     */
    public ListenableFuture<SegmentsAndCommitMetadata> publish(TransactionalSegmentPublisher transactionalSegmentPublisher, Committer committer, Collection<String> collection) {
        // The functional-interface casts are required: Futures.transform is overloaded for both Function and
        // AsyncFunction, and an implicitly typed lambda cannot disambiguate between the two.
        final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = Futures.transform(
                pushInBackground(wrapCommitter(committer), getSegmentIdsWithShardSpecs(collection), true),
                (com.google.common.util.concurrent.AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) pushed ->
                        publishInBackground(null, pushed, transactionalSegmentPublisher, Function.identity()));

        return Futures.transform(
                publishFuture,
                (com.google.common.base.Function<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) published -> {
                    synchronized (this.segments) {
                        collection.forEach(this.segments::remove);
                    }
                    return published;
                });
    }

    /**
     * Registers a handoff callback for every segment in the given result; each callback drops its segment
     * from the appenderator.
     *
     * @param segmentsAndCommitMetadata result of a publish; may be null
     * @return an already-completed null future if the argument is null; an already-completed future if it
     *         contains no segments; otherwise a future that is set once every drop has succeeded, or fails
     *         with the cause of a failed drop. A non-null result carries the same segments and the caller
     *         metadata extracted from the commit metadata.
     */
    public ListenableFuture<SegmentsAndCommitMetadata> registerHandoff(SegmentsAndCommitMetadata segmentsAndCommitMetadata) {
        if (segmentsAndCommitMetadata == null) {
            return Futures.immediateFuture(null);
        }
        final List<SegmentIdWithShardSpec> waitingSegmentIds = segmentsAndCommitMetadata
                .getSegments()
                .stream()
                .map(SegmentIdWithShardSpec::fromDataSegment)
                .collect(Collectors.toList());
        final Object commitMetadata = Preconditions.checkNotNull(segmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata");
        if (waitingSegmentIds.isEmpty()) {
            return Futures.immediateFuture(
                    new SegmentsAndCommitMetadata(
                            segmentsAndCommitMetadata.getSegments(),
                            ((AppenderatorDriverMetadata) commitMetadata).getCallerMetadata()));
        }
        log.debug("Register handoff of segments: [%s]", waitingSegmentIds);

        final SettableFuture<SegmentsAndCommitMetadata> resultFuture = SettableFuture.create();
        final AtomicInteger remaining = new AtomicInteger(waitingSegmentIds.size());
        for (final SegmentIdWithShardSpec identifier : waitingSegmentIds) {
            final SegmentDescriptor descriptor = new SegmentDescriptor(
                    identifier.getInterval(),
                    identifier.getVersion(),
                    identifier.getShardSpec().getPartitionNum());
            this.handoffNotifier.registerSegmentHandoffCallback(descriptor, Execs.directExecutor(), () -> {
                log.debug("Segment[%s] successfully handed off, dropping.", identifier);
                this.metrics.incrementHandOffCount();
                Futures.addCallback(this.appenderator.drop(identifier), new FutureCallback<Object>() {
                    @Override
                    public void onSuccess(Object result) {
                        // The last successful drop completes the overall future.
                        if (remaining.decrementAndGet() == 0) {
                            final List<DataSegment> handedOff = segmentsAndCommitMetadata.getSegments();
                            log.debug("Successfully handed off [%d] segments.", handedOff.size());
                            resultFuture.set(
                                    new SegmentsAndCommitMetadata(
                                            handedOff,
                                            ((AppenderatorDriverMetadata) commitMetadata).getCallerMetadata()));
                        }
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        log.warn(th, "Failed to drop segment[%s]?!", identifier);
                        remaining.decrementAndGet();
                        resultFuture.setException(th);
                    }
                });
            });
        }
        return resultFuture;
    }

    /**
     * Chains {@link #publish} and {@link #registerHandoff}.
     *
     * @return the future returned by {@code registerHandoff} for the published segments
     */
    public ListenableFuture<SegmentsAndCommitMetadata> publishAndRegisterHandoff(TransactionalSegmentPublisher transactionalSegmentPublisher, Committer committer, Collection<String> collection) {
        // Cast needed to select the AsyncFunction overload of Futures.transform.
        return Futures.transform(
                publish(transactionalSegmentPublisher, committer, collection),
                (com.google.common.util.concurrent.AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) this::registerHandoff);
    }

    /**
     * Closes the base driver first and then the handoff notifier.
     */
    @Override // org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        super.close();
        this.handoffNotifier.close();
    }
}
