package io.druid.indexing.kafka;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import io.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthorizationUtils;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.ForbiddenException;
import io.druid.server.security.Resource;
import io.druid.server.security.ResourceAction;
import io.druid.server.security.ResourceType;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTime;

/* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTask.class */
public class KafkaIndexTask extends AbstractTask implements ChatHandler {
    /** Pause-duration constant ({@code -1}); presumably means "pause until explicitly resumed" - TODO confirm against the pause handling code. */
    public static final long PAUSE_FOREVER = -1;
    /** Task type string returned by {@link #getType()} and used as the prefix of generated task ids. */
    private static final String TYPE = "index_kafka";
    /** Timeout value handed to {@code KafkaConsumer.poll} in the read loop. */
    private static final long POLL_TIMEOUT = 100;
    private static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
    /** Commit-metadata key whose value is a {@code KafkaPartitions} holding the next offsets to read. */
    private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
    /** Commit-metadata key whose value is a {@code KafkaPartitions} holding the end offsets of the sequence being published. */
    private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
    private final DataSchema dataSchema;
    private final InputRowParser<ByteBuffer> parser;
    private final KafkaTuningConfig tuningConfig;
    private final KafkaIOConfig ioConfig;
    private final AuthorizerMapper authorizerMapper;
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    /** Partition -> end offset; seeded in the constructor from the ioConfig end partitions. */
    private final Map<Integer, Long> endOffsets;
    /** Partition -> offset the read loop expects to see next for that partition. */
    private final Map<Integer, Long> nextOffsets;
    /** Every partition of {@link #endOffsets} mapped to {@code Long.MAX_VALUE}; used as the end offsets of a sequence that is not checkpointed yet. */
    private final Map<Integer, Long> maxEndOffsets;
    /** Copy of {@link #nextOffsets} taken each time the persist committer supplier is invoked. */
    private final Map<Integer, Long> lastPersistedOffsets;
    /** Toolbox handed to {@code run}; read by the publish thread. */
    private TaskToolbox toolbox;
    private volatile Appenderator appenderator;
    private volatile StreamAppenderatorDriver driver;
    private volatile FireDepartmentMetrics fireDepartmentMetrics;
    private volatile DateTime startTime;
    private volatile Status status;
    private volatile Thread runThread;
    /** File {@code sequences.json} inside the task persist directory; assigned in {@code run}. */
    private volatile File sequencesPersistFile;
    private final AtomicBoolean stopRequested;
    private final AtomicBoolean publishOnStop;
    // hasPaused and shouldResume are both conditions of pauseLock.
    private final Lock pauseLock;
    private final Condition hasPaused;
    private final Condition shouldResume;
    // isAwaitingRetry is a condition of pollRetryLock.
    private final Lock pollRetryLock;
    private final Condition isAwaitingRetry;
    private final Object statusLock;
    private volatile boolean pauseRequested;
    private volatile long pauseMillis;
    /** Initialised to 30000 in the constructor; replaceable through {@code setPollRetryMs} for tests. */
    private long pollRetryMs;
    /** Names of sequences currently being published; entries are removed by the publish thread after a successful publish. */
    private final Set<String> publishingSequences;
    /** Work queue drained by the publish thread; a sentinel element tells that thread to exit. */
    private final BlockingQueue<SequenceMetadata> publishQueue;
    /** Handoff futures registered by the publish thread after each successful publish. */
    private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList;
    /** Counted down when the publish thread terminates, whether via the sentinel or a failure. */
    private final CountDownLatch waitForPublishes;
    /** Holds the failure of the publish thread when it dies for a reason other than interruption. */
    private final AtomicReference<Throwable> throwableAtomicReference;
    /** Topic name taken from the ioConfig start partitions. */
    private final String topic;
    private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
    private ListeningExecutorService publishExecService;
    /** True unless the task context explicitly enables incremental handoff; when true, {@code run} delegates to {@code runLegacy}. */
    private final boolean useLegacy;
    private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
    /** Source of the random bits used when a task id has to be generated. */
    private static final Random RANDOM = new Random();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTask$SequenceMetadata.class */
    /**
     * State of one sequence handled by the task: its id and name, the per-partition
     * start and end offsets, the set of partitions that still have records to read
     * ("assignments") and whether its end offsets have been fixed ("checkpointed").
     *
     * <p>A special sentinel instance (see {@link #getSentinelSequenceMetadata()}) carries
     * no offsets at all; its maps and assignment set are {@code null}.
     */
    public static class SequenceMetadata {
        private final int sequenceId;
        private final String sequenceName;
        private final Map<Integer, Long> startOffsets;
        private final Map<Integer, Long> endOffsets;
        private final Set<Integer> assignments;
        private final boolean sentinel;
        private volatile boolean checkpointed;

        /**
         * Builds the sentinel instance: id {@code -1}, no name, no offsets, marked as
         * checkpointed.
         */
        private SequenceMetadata() {
            this.sequenceId = -1;
            this.sequenceName = null;
            this.startOffsets = null;
            this.endOffsets = null;
            this.assignments = null;
            this.checkpointed = true;
            this.sentinel = true;
        }

        /**
         * Creates a regular (non-sentinel) sequence.
         *
         * @param sequenceId    numeric id of the sequence
         * @param sequenceName  name of the sequence, must not be null
         * @param startOffsets  partition -> first offset of the sequence, must not be null; copied immutably
         * @param endOffsets    partition -> exclusive end offset, must not be null; copied into a mutable map
         * @param checkpointed  initial value of the checkpointed flag
         */
        @JsonCreator
        public SequenceMetadata(
                @JsonProperty("sequenceId") int sequenceId,
                @JsonProperty("sequenceName") String sequenceName,
                @JsonProperty("startOffsets") Map<Integer, Long> startOffsets,
                @JsonProperty("endOffsets") Map<Integer, Long> endOffsets,
                @JsonProperty("checkpointed") boolean checkpointed) {
            Preconditions.checkNotNull(sequenceName);
            Preconditions.checkNotNull(startOffsets);
            Preconditions.checkNotNull(endOffsets);
            this.sequenceId = sequenceId;
            this.sequenceName = sequenceName;
            this.startOffsets = ImmutableMap.copyOf(startOffsets);
            this.endOffsets = Maps.newHashMap(endOffsets);
            // Initially every partition with a start offset is considered assigned.
            this.assignments = Sets.newHashSet(startOffsets.keySet());
            this.checkpointed = checkpointed;
            this.sentinel = false;
        }

        /** @return a fresh sentinel instance */
        public static SequenceMetadata getSentinelSequenceMetadata() {
            return new SequenceMetadata();
        }

        @JsonProperty
        public int getSequenceId() {
            return sequenceId;
        }

        @JsonProperty
        public boolean isCheckpointed() {
            return checkpointed;
        }

        @JsonProperty
        public String getSequenceName() {
            return sequenceName;
        }

        @JsonProperty
        public Map<Integer, Long> getStartOffsets() {
            return startOffsets;
        }

        @JsonProperty
        public Map<Integer, Long> getEndOffsets() {
            return endOffsets;
        }

        @JsonProperty
        public boolean isSentinel() {
            return sentinel;
        }

        /**
         * Merges the given offsets into the end offsets of this sequence and marks the
         * sequence as checkpointed.
         *
         * @param newEndOffsets partition -> end offset entries to put into the end-offset map
         */
        public void setEndOffsets(Map<Integer, Long> newEndOffsets) {
            endOffsets.putAll(newEndOffsets);
            checkpointed = true;
        }

        /**
         * Recomputes the assignment set: a partition stays assigned while its end offset
         * is strictly greater than the offset supplied for it.
         *
         * @param currentOffsets partition -> current offset; every key must have an end offset in this sequence
         */
        public void updateAssignments(Map<Integer, Long> currentOffsets) {
            assignments.clear();
            for (Map.Entry<Integer, Long> current : currentOffsets.entrySet()) {
                final Integer partition = current.getKey();
                final long end = endOffsets.get(partition).longValue();
                final long reached = currentOffsets.get(partition).longValue();
                if (end > reached) {
                    assignments.add(partition);
                }
            }
        }

        /** @return true while at least one partition is still assigned */
        public boolean isOpen() {
            return !assignments.isEmpty();
        }

        /**
         * Tells whether the record belongs to this sequence: the sequence must be open,
         * must know the record's partition, and the record offset must lie in
         * {@code [startOffset, endOffset)} for that partition.
         */
        boolean canHandle(ConsumerRecord<byte[], byte[]> record) {
            if (!isOpen()) {
                return false;
            }
            if (endOffsets.get(Integer.valueOf(record.partition())) == null) {
                return false;
            }
            if (record.offset() < startOffsets.get(Integer.valueOf(record.partition())).longValue()) {
                return false;
            }
            return record.offset() < endOffsets.get(Integer.valueOf(record.partition())).longValue();
        }

        @Override
        public String toString() {
            return "SequenceMetadata{"
                + "sequenceName='" + sequenceName + '\''
                + ", sequenceId=" + sequenceId
                + ", startOffsets=" + startOffsets
                + ", endOffsets=" + endOffsets
                + ", assignments=" + assignments
                + ", sentinel=" + sentinel
                + ", checkpointed=" + checkpointed
                + '}';
        }

        /**
         * Returns a supplier of committers whose metadata describes this sequence.
         *
         * <p>When the metadata is requested the sequence must have no assigned partitions
         * left. The passed-in offsets map is then updated in place so that each partition
         * holds the larger of its current value (0 when absent) and this sequence's end
         * offset, and the resulting metadata map carries that map under the
         * next-partitions key and this sequence's end offsets under the publish-partitions key.
         *
         * @param topic                 topic name put into both {@code KafkaPartitions} values
         * @param lastPersistedOffsets  mutable partition -> offset map, modified by {@code getMetadata()}
         */
        public Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets) {
            return () -> new Committer() {
                public Object getMetadata() {
                    Preconditions.checkState(
                        assignments.isEmpty(),
                        "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
                        endOffsets
                    );
                    for (Map.Entry<Integer, Long> sequenceEnd : endOffsets.entrySet()) {
                        final long end = sequenceEnd.getValue().longValue();
                        final long persisted = lastPersistedOffsets.getOrDefault(sequenceEnd.getKey(), 0L).longValue();
                        lastPersistedOffsets.put(sequenceEnd.getKey(), Long.valueOf(Math.max(end, persisted)));
                    }
                    return ImmutableMap.of(
                        KafkaIndexTask.METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
                        KafkaIndexTask.METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
                    );
                }

                public void run() {
                    // Nothing to do on commit.
                }
            };
        }

        /**
         * Returns a publisher that submits a {@code SegmentTransactionalInsertAction} for
         * the given segments.
         *
         * <p>The publish-partitions entry of the commit metadata must equal this sequence's
         * end offsets, otherwise an {@code ISE} is thrown. With {@code useTransaction} the
         * action carries start and end data source metadata built from this sequence's
         * start offsets and the published partitions; without it both metadata arguments
         * are null.
         *
         * @param toolbox         supplies the object mapper and the task action client
         * @param useTransaction  whether start/end metadata is attached to the insert action
         */
        public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction) {
            return (segments, commitMetadata) -> {
                final Map<?, ?> metadataMap = (Map<?, ?>) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
                final KafkaPartitions finalPartitions = (KafkaPartitions) toolbox.getObjectMapper().convertValue(
                    metadataMap.get(KafkaIndexTask.METADATA_PUBLISH_PARTITIONS),
                    KafkaPartitions.class
                );
                if (!getEndOffsets().equals(finalPartitions.getPartitionOffsetMap())) {
                    throw new ISE(
                        "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
                        toString(),
                        commitMetadata
                    );
                }

                final SegmentTransactionalInsertAction insertAction;
                if (useTransaction) {
                    insertAction = new SegmentTransactionalInsertAction(
                        segments,
                        new KafkaDataSourceMetadata(new KafkaPartitions(finalPartitions.getTopic(), getStartOffsets())),
                        new KafkaDataSourceMetadata(finalPartitions)
                    );
                } else {
                    insertAction = new SegmentTransactionalInsertAction(
                        segments,
                        (DataSourceMetadata) null,
                        (DataSourceMetadata) null
                    );
                }

                KafkaIndexTask.log.info("Publishing with isTransaction[%s].", Boolean.valueOf(useTransaction));
                final SegmentPublishResult result =
                    (SegmentPublishResult) toolbox.getTaskActionClient().submit(insertAction);
                return result.isSuccess();
            };
        }
    }

    /* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTask$Status.class */
    /** Coarse lifecycle state of the task, kept in the {@code status} field. */
    public enum Status {
        /** Value assigned by the constructor, before {@code run} is entered. */
        NOT_STARTED,
        /** Set at the beginning of {@code run}, while sequences, appenderator and driver are set up. */
        STARTING,
        /** Set right before the read loop starts polling records. */
        READING,
        /** Presumably set while the task is paused - confirm against the pause handling code. */
        PAUSED,
        /** Set in the read loop once a stop is requested, or when the latest sequence is checkpointed and pause-after-read is off. */
        PUBLISHING
    }

    @JsonCreator
    public KafkaIndexTask(@JsonProperty("id") String str, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("tuningConfig") KafkaTuningConfig kafkaTuningConfig, @JsonProperty("ioConfig") KafkaIOConfig kafkaIOConfig, @JsonProperty("context") Map<String, Object> map, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper) {
        super(str == null ? makeTaskId(dataSchema.getDataSource(), RANDOM.nextInt()) : str, StringUtils.format("%s_%s", new Object[]{TYPE, dataSchema.getDataSource()}), taskResource, dataSchema.getDataSource(), map);
        this.endOffsets = new ConcurrentHashMap();
        this.nextOffsets = new ConcurrentHashMap();
        this.maxEndOffsets = new HashMap();
        this.lastPersistedOffsets = new ConcurrentHashMap();
        this.appenderator = null;
        this.driver = null;
        this.fireDepartmentMetrics = null;
        this.status = Status.NOT_STARTED;
        this.runThread = null;
        this.sequencesPersistFile = null;
        this.stopRequested = new AtomicBoolean(false);
        this.publishOnStop = new AtomicBoolean(false);
        this.pauseLock = new ReentrantLock();
        this.hasPaused = this.pauseLock.newCondition();
        this.shouldResume = this.pauseLock.newCondition();
        this.pollRetryLock = new ReentrantLock();
        this.isAwaitingRetry = this.pollRetryLock.newCondition();
        this.statusLock = new Object();
        this.pauseRequested = false;
        this.pauseMillis = 0L;
        this.pollRetryMs = 30000L;
        this.publishingSequences = Sets.newConcurrentHashSet();
        this.publishQueue = new LinkedBlockingQueue();
        this.handOffWaitList = new CopyOnWriteArrayList();
        this.waitForPublishes = new CountDownLatch(1);
        this.throwableAtomicReference = new AtomicReference<>();
        this.dataSchema = (DataSchema) Preconditions.checkNotNull(dataSchema, "dataSchema");
        this.parser = (InputRowParser) Preconditions.checkNotNull(dataSchema.getParser(), "parser");
        this.tuningConfig = (KafkaTuningConfig) Preconditions.checkNotNull(kafkaTuningConfig, "tuningConfig");
        this.ioConfig = (KafkaIOConfig) Preconditions.checkNotNull(kafkaIOConfig, "ioConfig");
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
        this.authorizerMapper = authorizerMapper;
        this.endOffsets.putAll(kafkaIOConfig.getEndPartitions().getPartitionOffsetMap());
        this.maxEndOffsets.putAll((Map) this.endOffsets.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Long.MAX_VALUE;
        })));
        this.topic = kafkaIOConfig.getStartPartitions().getTopic();
        this.sequences = new CopyOnWriteArrayList<>();
        if (map == null || map.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) == null || !((Boolean) map.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED)).booleanValue()) {
            this.useLegacy = true;
        } else {
            this.useLegacy = false;
        }
    }

    /**
     * Replaces the poll retry interval; exposed for tests only.
     *
     * @param retryMs new value of the {@code pollRetryMs} field
     */
    @VisibleForTesting
    void setPollRetryMs(long retryMs) {
        pollRetryMs = retryMs;
    }

    /**
     * Builds a task id of the form {@code <TYPE>_<dataSource>_<suffix>}, where the suffix
     * is eight letters in the range {@code 'a'..'p'}, one per 4-bit group of
     * {@code randomBits} starting with the lowest group.
     *
     * @param dataSource  data source name placed in the middle of the id
     * @param randomBits  32 bits that are encoded into the suffix
     * @return the joined id
     */
    private static String makeTaskId(String dataSource, int randomBits) {
        final char[] suffix = new char[8];
        for (int nibble = 0; nibble < suffix.length; nibble++) {
            final int value = (randomBits >>> (nibble * 4)) & 0x0F;
            suffix[nibble] = (char) ('a' + value);
        }
        return Joiner.on("_").join(TYPE, dataSource, new String(suffix));
    }

    /**
     * @return the {@code "priority"} context value, or 75 when the context lookup falls back to the default
     */
    public int getPriority() {
        final Integer priority = (Integer) getContextValue("priority", 75);
        return priority.intValue();
    }

    /** @return the task type constant {@code TYPE} */
    public String getType() {
        return TYPE;
    }

    /**
     * Always reports the task as ready; the action client is not consulted.
     *
     * @param taskActionClient unused
     * @return {@code true}
     */
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        return true;
    }

    /** @return the data schema this task was created with (serialized as a JSON property) */
    @JsonProperty
    public DataSchema getDataSchema() {
        return dataSchema;
    }

    /** @return the tuning configuration this task was created with (serialized as a JSON property) */
    @JsonProperty
    public KafkaTuningConfig getTuningConfig() {
        return tuningConfig;
    }

    /** @return the IO configuration this task was created with, serialized under the name {@code ioConfig} */
    @JsonProperty("ioConfig")
    public KafkaIOConfig getIOConfig() {
        return ioConfig;
    }

    /**
     * Creates the single-threaded "publish-driver" executor and submits the publish loop to it.
     *
     * <p>The loop takes sequences from {@code publishQueue} one at a time. A sentinel
     * sequence counts down {@code waitForPublishes} and ends the loop. Any other sequence
     * is published through the driver; on success it is removed from {@code sequences}
     * and {@code publishingSequences}, the sequence state is persisted, and the handoff
     * future is added to {@code handOffWaitList}.
     *
     * <p>Any throwable ends the loop: interruption is only logged as a warning, anything
     * else raises an alert and is stored in {@code throwableAtomicReference}. In both
     * cases pending handoff futures are cancelled and {@code waitForPublishes} is counted down.
     */
    private void createAndStartPublishExecutor() {
        publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver"));
        publishExecService.submit(() -> {
            try {
                for (;;) {
                    final SequenceMetadata sequence = publishQueue.take();
                    Preconditions.checkNotNull(driver);

                    if (sequence.isSentinel()) {
                        waitForPublishes.countDown();
                        return;
                    }

                    log.info("Publishing segments for sequence [%s]", sequence);
                    final SegmentsAndMetadata published = (SegmentsAndMetadata) driver.publish(
                        sequence.getPublisher(toolbox, ioConfig.isUseTransaction()),
                        sequence.getCommitterSupplier(topic, lastPersistedOffsets).get(),
                        ImmutableList.of(sequence.getSequenceName())
                    ).get();
                    if (published == null) {
                        throw new ISE("Transaction failure publishing segments for sequence [%s]", sequence);
                    }

                    final List<String> segmentIds = published.getSegments()
                        .stream()
                        .map(DataSegment::getIdentifier)
                        .collect(Collectors.toList());
                    log.info(
                        "Published segments[%s] with metadata[%s].",
                        Joiner.on(", ").join(segmentIds),
                        Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata")
                    );

                    sequences.remove(sequence);
                    publishingSequences.remove(sequence.getSequenceName());
                    try {
                        persistSequences();
                    } catch (IOException persistFailure) {
                        log.error(persistFailure, "Unable to persist state, dying");
                        Throwables.propagate(persistFailure);
                    }
                    handOffWaitList.add(driver.registerHandoff(published));
                }
            } catch (Throwable failure) {
                final boolean interrupted = failure instanceof InterruptedException
                    || (failure instanceof RejectedExecutionException
                        && failure.getCause() instanceof InterruptedException);
                if (interrupted) {
                    log.warn("Stopping publish thread as we are interrupted, probably we are shutting down");
                } else {
                    log.makeAlert(failure, "Error in publish thread, dying").emit();
                    throwableAtomicReference.set(failure);
                }
                // Whatever the reason, nobody should keep waiting on handoffs or on this thread.
                Futures.allAsList(handOffWaitList).cancel(true);
                waitForPublishes.countDown();
            }
        });
    }

    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v206, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r0v255, types: [java.util.List] */
    public TaskStatus run(TaskToolbox taskToolbox) throws Exception {
        Map.Entry entry;
        if (this.useLegacy) {
            return runLegacy(taskToolbox);
        }
        log.info("Starting up!", new Object[0]);
        this.startTime = DateTimes.nowUtc();
        this.status = Status.STARTING;
        this.toolbox = taskToolbox;
        if (getContext() == null || getContext().get("checkpoints") == null) {
            this.sequences.add(new SequenceMetadata(0, StringUtils.format("%s_%s", new Object[]{this.ioConfig.getBaseSequenceName(), 0}), this.ioConfig.getStartPartitions().getPartitionOffsetMap(), this.maxEndOffsets, false));
        } else {
            log.info("Got checkpoints [%s]", new Object[]{(String) getContext().get("checkpoints")});
            Iterator it = ((TreeMap) taskToolbox.getObjectMapper().readValue((String) getContext().get("checkpoints"), new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.1
            })).entrySet().iterator();
            Map.Entry entry2 = (Map.Entry) it.next();
            while (true) {
                entry = entry2;
                if (!it.hasNext()) {
                    break;
                }
                Map.Entry entry3 = (Map.Entry) it.next();
                this.sequences.add(new SequenceMetadata(((Integer) entry.getKey()).intValue(), StringUtils.format("%s_%s", new Object[]{this.ioConfig.getBaseSequenceName(), entry.getKey()}), (Map) entry.getValue(), (Map) entry3.getValue(), true));
                entry2 = entry3;
            }
            this.sequences.add(new SequenceMetadata(((Integer) entry.getKey()).intValue(), StringUtils.format("%s_%s", new Object[]{this.ioConfig.getBaseSequenceName(), entry.getKey()}), (Map) entry.getValue(), this.maxEndOffsets, false));
        }
        this.sequencesPersistFile = new File(taskToolbox.getPersistDir(), "sequences.json");
        restoreSequences();
        log.info("Starting with sequences:  %s", new Object[]{this.sequences});
        if (this.chatHandlerProvider.isPresent()) {
            log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider) this.chatHandlerProvider.get()).getClass().getName()});
            ((ChatHandlerProvider) this.chatHandlerProvider.get()).register(getId(), this, false);
        } else {
            log.warn("No chat handler detected", new Object[0]);
        }
        this.runThread = Thread.currentThread();
        FireDepartment fireDepartment = new FireDepartment(this.dataSchema, new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null, (FirehoseFactoryV2) null), (RealtimeTuningConfig) null);
        this.fireDepartmentMetrics = fireDepartment.getMetrics();
        taskToolbox.getMonitorScheduler().addMonitor(new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment), ImmutableMap.of("taskId", new String[]{getId()})));
        LookupNodeService lookupNodeService = getContextValue("lookupTier") == null ? taskToolbox.getLookupNodeService() : new LookupNodeService((String) getContextValue("lookupTier"));
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(taskToolbox.getDruidNode(), "peon", ImmutableMap.of(taskToolbox.getDataNodeService().getName(), taskToolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService));
        try {
            try {
                KafkaConsumer<byte[], byte[]> newConsumer = newConsumer();
                Throwable th = null;
                try {
                    taskToolbox.getDataSegmentServerAnnouncer().announce();
                    taskToolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
                    this.appenderator = newAppenderator(this.fireDepartmentMetrics, taskToolbox);
                    this.driver = newDriver(this.appenderator, taskToolbox, this.fireDepartmentMetrics);
                    createAndStartPublishExecutor();
                    String topic = this.ioConfig.getStartPartitions().getTopic();
                    Object startJob = this.driver.startJob();
                    if (startJob == null) {
                        Preconditions.checkState(this.sequences.get(0).startOffsets.entrySet().stream().allMatch(entry4 -> {
                            return Longs.compare(((Long) entry4.getValue()).longValue(), this.ioConfig.getStartPartitions().getPartitionOffsetMap().get(entry4.getKey()).longValue()) >= 0;
                        }), "Sequence offsets are not compatible with start offsets of task");
                        this.nextOffsets.putAll(this.sequences.get(0).startOffsets);
                    } else {
                        KafkaPartitions kafkaPartitions = (KafkaPartitions) taskToolbox.getObjectMapper().convertValue(((Map) startJob).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                        this.nextOffsets.putAll(kafkaPartitions.getPartitionOffsetMap());
                        if (!kafkaPartitions.getTopic().equals(this.ioConfig.getStartPartitions().getTopic())) {
                            throw new ISE("WTF?! Restored topic[%s] but expected topic[%s]", new Object[]{kafkaPartitions.getTopic(), this.ioConfig.getStartPartitions().getTopic()});
                        }
                        if (!this.nextOffsets.keySet().equals(this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
                            throw new ISE("WTF?! Restored partitions[%s] but expected partitions[%s]", new Object[]{this.nextOffsets.keySet(), this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()});
                        }
                        if (this.sequences.size() == 0 || this.sequences.get(this.sequences.size() - 1).isCheckpointed()) {
                            this.endOffsets.putAll(this.sequences.size() == 0 ? this.nextOffsets : this.sequences.get(this.sequences.size() - 1).getEndOffsets());
                            log.info("End offsets changed to [%s]", new Object[]{this.endOffsets});
                        }
                    }
                    Supplier<Committer> supplier = () -> {
                        final Map<? extends Integer, ? extends Long> copyOf = ImmutableMap.copyOf(this.nextOffsets);
                        this.lastPersistedOffsets.clear();
                        this.lastPersistedOffsets.putAll(copyOf);
                        return new Committer() { // from class: io.druid.indexing.kafka.KafkaIndexTask.2
                            public Object getMetadata() {
                                return ImmutableMap.of(KafkaIndexTask.METADATA_NEXT_PARTITIONS, new KafkaPartitions(KafkaIndexTask.this.ioConfig.getStartPartitions().getTopic(), copyOf));
                            }

                            public void run() {
                            }
                        };
                    };
                    maybePersistAndPublishSequences(supplier);
                    Set<Integer> assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
                    boolean z = !assignPartitionsAndSeekToNext.isEmpty();
                    this.status = Status.READING;
                    while (z) {
                        try {
                            if (possiblyPause(assignPartitionsAndSeekToNext)) {
                                assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
                                if (assignPartitionsAndSeekToNext.isEmpty()) {
                                    log.info("All partitions have been fully read", new Object[0]);
                                    this.publishOnStop.set(true);
                                    this.stopRequested.set(true);
                                }
                            }
                            if (this.stopRequested.get() || (this.sequences.get(this.sequences.size() - 1).isCheckpointed() && !this.ioConfig.isPauseAfterRead())) {
                                this.status = Status.PUBLISHING;
                            }
                            if (this.stopRequested.get()) {
                                break;
                            }
                            checkAndMaybeThrowException();
                            if (!this.ioConfig.isPauseAfterRead()) {
                                maybePersistAndPublishSequences(supplier);
                            }
                            ConsumerRecords empty = ConsumerRecords.empty();
                            try {
                                empty = newConsumer.poll(POLL_TIMEOUT);
                            } catch (OffsetOutOfRangeException e) {
                                log.warn("OffsetOutOfRangeException with message [%s]", new Object[]{e.getMessage()});
                                possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), newConsumer, taskToolbox);
                                z = this.ioConfig.isPauseAfterRead() || !assignPartitionsAndSeekToNext.isEmpty();
                            }
                            SequenceMetadata sequenceMetadata = null;
                            Iterator it2 = empty.iterator();
                            while (it2.hasNext()) {
                                ConsumerRecord<byte[], byte[]> consumerRecord = (ConsumerRecord) it2.next();
                                if (log.isTraceEnabled()) {
                                    log.trace("Got topic[%s] partition[%d] offset[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
                                }
                                if (consumerRecord.offset() < this.endOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                                    if (consumerRecord.offset() != this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                                        if (!this.ioConfig.isSkipOffsetGaps()) {
                                            throw new ISE("WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", new Object[]{Long.valueOf(consumerRecord.offset()), this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())), Integer.valueOf(consumerRecord.partition())});
                                        }
                                        log.warn("Skipped to offset[%,d] after offset[%,d] in partition[%d].", new Object[]{Long.valueOf(consumerRecord.offset()), this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())), Integer.valueOf(consumerRecord.partition())});
                                    }
                                    try {
                                        byte[] bArr = (byte[]) consumerRecord.value();
                                        boolean z2 = false;
                                        for (InputRow inputRow : bArr == null ? Utils.nullableListOf(new InputRow[]{(InputRow) null}) : this.parser.parseBatch(ByteBuffer.wrap(bArr))) {
                                            if (inputRow == null || !withinMinMaxRecordTime(inputRow)) {
                                                this.fireDepartmentMetrics.incrementThrownAway();
                                            } else {
                                                SequenceMetadata sequenceMetadata2 = null;
                                                Iterator<SequenceMetadata> it3 = this.sequences.iterator();
                                                while (it3.hasNext()) {
                                                    SequenceMetadata next = it3.next();
                                                    if (next.canHandle(consumerRecord)) {
                                                        sequenceMetadata2 = next;
                                                    }
                                                }
                                                if (sequenceMetadata2 == null) {
                                                    throw new ISE("WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), this.sequences});
                                                }
                                                AppenderatorDriverAddResult add = this.driver.add(inputRow, sequenceMetadata2.getSequenceName(), supplier, true, false);
                                                if (!add.isOk()) {
                                                    throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{inputRow.getTimestamp()});
                                                }
                                                if (add.getNumRowsInSegment() > this.tuningConfig.getMaxRowsPerSegment() && !sequenceMetadata2.isCheckpointed()) {
                                                    sequenceMetadata = sequenceMetadata2;
                                                }
                                                z2 |= add.isPersistRequired();
                                                this.fireDepartmentMetrics.incrementProcessed();
                                            }
                                        }
                                        if (z2) {
                                            Futures.addCallback(this.driver.persistAsync((Committer) supplier.get()), new FutureCallback<Object>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.3
                                                /** Logs the result object delivered when the asynchronous persist future completes. */
                                                @Override
                                                public void onSuccess(@Nullable Object obj) {
                                                    final Object[] formatArgs = {obj};
                                                    KafkaIndexTask.log.info("Persist completed with metadata [%s]", formatArgs);
                                                }

                                                /**
                                                 * Handles a failed asynchronous persist: logs the failure together with its cause and
                                                 * stores the throwable in {@code throwableAtomicReference}.
                                                 *
                                                 * @param th2 the failure reported by the persist future
                                                 */
                                                @Override
                                                public void onFailure(Throwable th2) {
                                                    // Hand the throwable to the logger so its stack trace is recorded at the point of
                                                    // failure instead of being dropped from the log entry.
                                                    KafkaIndexTask.log.error(th2, "Persist failed, dying");
                                                    KafkaIndexTask.this.throwableAtomicReference.set(th2);
                                                }
                                            });
                                        }
                                    } catch (ParseException e2) {
                                        if (this.tuningConfig.isReportParseExceptions()) {
                                            throw e2;
                                        }
                                        log.debug(e2, "Dropping unparseable row from partition[%d] offset[%,d].", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
                                        this.fireDepartmentMetrics.incrementUnparseable();
                                    }
                                    this.nextOffsets.put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset() + 1));
                                }
                                if (this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).equals(this.endOffsets.get(Integer.valueOf(consumerRecord.partition()))) && assignPartitionsAndSeekToNext.remove(Integer.valueOf(consumerRecord.partition()))) {
                                    log.info("Finished reading topic[%s], partition[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition())});
                                    assignPartitions(newConsumer, topic, assignPartitionsAndSeekToNext);
                                    z = this.ioConfig.isPauseAfterRead() || !assignPartitionsAndSeekToNext.isEmpty();
                                }
                            }
                            if (sequenceMetadata != null && !this.ioConfig.isPauseAfterRead()) {
                                Preconditions.checkArgument(this.sequences.get(this.sequences.size() - 1).getSequenceName().equals(sequenceMetadata.getSequenceName()), "Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s", new Object[]{sequenceMetadata, this.sequences});
                                requestPause(-1L);
                                if (!((Boolean) taskToolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(getDataSource(), this.ioConfig.getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceMetadata.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, this.nextOffsets))))).booleanValue()) {
                                    throw new ISE("Checkpoint request with offsets [%s] failed, dying", new Object[]{this.nextOffsets});
                                }
                            }
                        } catch (Throwable th2) {
                            log.info("Persisting all pending data", new Object[0]);
                            this.driver.persist((Committer) supplier.get());
                            throw th2;
                        }
                    }
                    log.info("Persisting all pending data", new Object[0]);
                    this.driver.persist((Committer) supplier.get());
                    synchronized (this.statusLock) {
                        if (this.stopRequested.get() && !this.publishOnStop.get()) {
                            throw new InterruptedException("Stopping without publishing");
                        }
                        this.status = Status.PUBLISHING;
                    }
                    Iterator<SequenceMetadata> it4 = this.sequences.iterator();
                    while (it4.hasNext()) {
                        SequenceMetadata next2 = it4.next();
                        if (!this.publishingSequences.contains(next2.getSequenceName())) {
                            next2.setEndOffsets(this.nextOffsets);
                            next2.updateAssignments(this.nextOffsets);
                            this.publishingSequences.add(next2.getSequenceName());
                            this.publishQueue.add(next2);
                        }
                    }
                    this.publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata());
                    this.waitForPublishes.await();
                    checkAndMaybeThrowException();
                    ArrayList<SegmentsAndMetadata> newArrayList = Lists.newArrayList();
                    if (this.tuningConfig.getHandoffConditionTimeout() == 0) {
                        newArrayList = (List) Futures.allAsList(this.handOffWaitList).get();
                    } else {
                        try {
                            newArrayList = (List) Futures.allAsList(this.handOffWaitList).get(this.tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
                        } catch (TimeoutException e3) {
                            log.makeAlert("Timed out after [%d] millis waiting for handoffs", new Object[]{Long.valueOf(this.tuningConfig.getHandoffConditionTimeout())}).addData("TaskId", getId()).emit();
                        }
                    }
                    for (SegmentsAndMetadata segmentsAndMetadata : newArrayList) {
                        if (segmentsAndMetadata == null) {
                            log.warn("Handoff failed for segments %s", new Object[]{segmentsAndMetadata.getSegments()});
                        } else {
                            log.info("Handoff completed for segments[%s] with metadata[%s].", new Object[]{Joiner.on(", ").join((Iterable) segmentsAndMetadata.getSegments().stream().map((v0) -> {
                                return v0.getIdentifier();
                            }).collect(Collectors.toList())), Preconditions.checkNotNull(segmentsAndMetadata.getCommitMetadata(), "commitMetadata")});
                        }
                    }
                    if (newConsumer != null) {
                        if (0 != 0) {
                            try {
                                newConsumer.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            newConsumer.close();
                        }
                    }
                    if (this.appenderator != null) {
                        if (this.throwableAtomicReference.get() != null) {
                            this.appenderator.closeNow();
                        } else {
                            this.appenderator.close();
                        }
                    }
                    if (this.driver != null) {
                        this.driver.close();
                    }
                    if (this.chatHandlerProvider.isPresent()) {
                        ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
                    }
                    if (this.publishExecService != null) {
                        this.publishExecService.shutdownNow();
                    }
                    taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                } catch (Throwable th4) {
                    if (newConsumer != null) {
                        if (0 != 0) {
                            try {
                                newConsumer.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            newConsumer.close();
                        }
                    }
                    throw th4;
                }
            } catch (InterruptedException | RejectedExecutionException e4) {
                this.appenderator.closeNow();
                if ((e4 instanceof RejectedExecutionException) && (e4.getCause() == null || !(e4.getCause() instanceof InterruptedException))) {
                    throw e4;
                }
                if (!this.stopRequested.get()) {
                    Thread.currentThread().interrupt();
                    throw e4;
                }
                log.info("The task was asked to stop before completing", new Object[0]);
                if (this.appenderator != null) {
                    if (this.throwableAtomicReference.get() != null) {
                        this.appenderator.closeNow();
                    } else {
                        this.appenderator.close();
                    }
                }
                if (this.driver != null) {
                    this.driver.close();
                }
                if (this.chatHandlerProvider.isPresent()) {
                    ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
                }
                if (this.publishExecService != null) {
                    this.publishExecService.shutdownNow();
                }
                taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                taskToolbox.getDataSegmentServerAnnouncer().unannounce();
            }
            return success();
        } catch (Throwable th6) {
            if (this.appenderator != null) {
                if (this.throwableAtomicReference.get() != null) {
                    this.appenderator.closeNow();
                } else {
                    this.appenderator.close();
                }
            }
            if (this.driver != null) {
                this.driver.close();
            }
            if (this.chatHandlerProvider.isPresent()) {
                ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
            }
            if (this.publishExecService != null) {
                this.publishExecService.shutdownNow();
            }
            taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
            taskToolbox.getDataSegmentServerAnnouncer().unannounce();
            throw th6;
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r16v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r16v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r21v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r21v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 16, insn: 0x0830: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r16 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:247:0x0830 */
    /* JADX WARN: Not initialized variable reg: 17, insn: 0x0835: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:249:0x0835 */
    /* JADX WARN: Not initialized variable reg: 18, insn: 0x07d5: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r18 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:227:0x07d5 */
    /* JADX WARN: Not initialized variable reg: 19, insn: 0x07da: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r19 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:229:0x07da */
    /* JADX WARN: Not initialized variable reg: 20, insn: 0x077e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r20 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:211:0x077e */
    /* JADX WARN: Not initialized variable reg: 21, insn: 0x0783: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r21 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:213:0x0783 */
    /* JADX WARN: Type inference failed for: r16v1, types: [io.druid.segment.realtime.appenderator.Appenderator] */
    /* JADX WARN: Type inference failed for: r17v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r18v0, types: [io.druid.segment.realtime.appenderator.StreamAppenderatorDriver] */
    /* JADX WARN: Type inference failed for: r19v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r20v0, types: [org.apache.kafka.clients.consumer.KafkaConsumer] */
    /* JADX WARN: Type inference failed for: r21v0, types: [java.lang.Throwable] */
    /**
     * Legacy ingestion path: reads records from Kafka into an appenderator driver, then publishes
     * the resulting segments and waits for handoff.
     *
     * <p>NOTE(review): this body is decompiler output and the decompiler reported failures for it
     * (see the JADX warnings directly above). The {@code ?? rNN} placeholders are never assigned and
     * the resource cleanup appears to be a flattened try-with-resources; the method is not expected
     * to compile as written and should be reconciled with the original source before being edited.
     *
     * <p>Visible flow:
     * <ol>
     *   <li>Record start time / STARTING status / toolbox, register with the chat handler if present,
     *       and add a {@code RealtimeMetricsMonitor} to the toolbox's monitor scheduler.</li>
     *   <li>Create the appenderator, driver and Kafka consumer, then announce the segment server and
     *       the Druid node.</li>
     *   <li>Initialise {@code nextOffsets} from the driver's restored metadata, or from the configured
     *       start partitions when {@code startJob()} returns null.</li>
     *   <li>Poll and add rows until a stop is requested or no partitions remain to be read.</li>
     *   <li>Persist, switch to PUBLISHING, publish, wait for handoff, close resources, unregister and
     *       unannounce, and return {@code success()}.</li>
     * </ol>
     *
     * @param taskToolbox toolbox providing the announcers, object mapper, monitor scheduler and task
     *                    action client used below
     * @return the value of {@code success()} on the normal completion path
     * @throws Exception any failure raised while reading, persisting or publishing
     */
    private TaskStatus runLegacy(TaskToolbox taskToolbox) throws Exception {
        // Decompiler placeholders for registers whose type could not be inferred; never assigned below.
        ?? r16;
        ?? r17;
        Appenderator newAppenderator;
        Throwable th;
        ?? r18;
        ?? r19;
        ?? r20;
        ?? r21;
        log.info("Starting up!", new Object[0]);
        this.startTime = DateTimes.nowUtc();
        this.status = Status.STARTING;
        this.toolbox = taskToolbox;
        // Register this task with the chat handler when one is configured.
        if (this.chatHandlerProvider.isPresent()) {
            log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider) this.chatHandlerProvider.get()).getClass().getName()});
            ((ChatHandlerProvider) this.chatHandlerProvider.get()).register(getId(), this, false);
        } else {
            log.warn("No chat handler detected", new Object[0]);
        }
        // Remembered so that stopGracefully() can interrupt the thread executing this method.
        this.runThread = Thread.currentThread();
        // The FireDepartment is built from the data schema only (all other configs null); its metrics
        // object is what the rest of this method updates.
        FireDepartment fireDepartment = new FireDepartment(this.dataSchema, new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null, (FirehoseFactoryV2) null), (RealtimeTuningConfig) null);
        this.fireDepartmentMetrics = fireDepartment.getMetrics();
        taskToolbox.getMonitorScheduler().addMonitor(new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment), ImmutableMap.of("taskId", new String[]{getId()})));
        // A "lookupTier" context value, when set, overrides the toolbox's lookup node service.
        LookupNodeService lookupNodeService = getContextValue("lookupTier") == null ? taskToolbox.getLookupNodeService() : new LookupNodeService((String) getContextValue("lookupTier"));
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(taskToolbox.getDruidNode(), "peon", ImmutableMap.of(taskToolbox.getDataNodeService().getName(), taskToolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService));
        // NOTE(review): the nesting below is a decompiler reconstruction; the handlers reference the
        // unassigned placeholders r16/r17.
        try {
            try {
                try {
                    newAppenderator = newAppenderator(this.fireDepartmentMetrics, taskToolbox);
                    th = null;
                } catch (Throwable th2) {
                    // Failure path: undo the chat handler registration and announcements, then rethrow.
                    if (this.chatHandlerProvider.isPresent()) {
                        ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
                    }
                    taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                    throw th2;
                }
            } catch (Throwable th3) {
                if (r16 != 0) {
                    if (r17 != 0) {
                        try {
                            r16.close();
                        } catch (Throwable th4) {
                            r17.addSuppressed(th4);
                        }
                    } else {
                        r16.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException | RejectedExecutionException e) {
            // A RejectedExecutionException is only treated as an interruption when its cause is an
            // InterruptedException; anything else is rethrown unchanged.
            if ((e instanceof RejectedExecutionException) && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
                throw e;
            }
            // Interrupted without a stop request: restore the interrupt flag and fail.
            if (!this.stopRequested.get()) {
                Thread.currentThread().interrupt();
                throw e;
            }
            log.info("The task was asked to stop before completing", new Object[0]);
            if (this.chatHandlerProvider.isPresent()) {
                ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
            }
            taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
            taskToolbox.getDataSegmentServerAnnouncer().unannounce();
        }
        try {
            StreamAppenderatorDriver newDriver = newDriver(newAppenderator, taskToolbox, this.fireDepartmentMetrics);
            Throwable th5 = null;
            try {
                KafkaConsumer<byte[], byte[]> newConsumer = newConsumer();
                Throwable th6 = null;
                taskToolbox.getDataSegmentServerAnnouncer().announce();
                taskToolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
                this.appenderator = newAppenderator;
                String topic = this.ioConfig.getStartPartitions().getTopic();
                // startJob() returns previously committed metadata, or null when there is none.
                Object startJob = newDriver.startJob();
                if (startJob == null) {
                    this.nextOffsets.putAll(this.ioConfig.getStartPartitions().getPartitionOffsetMap());
                } else {
                    // Restore offsets from the committed metadata and check that topic and partition set
                    // match the configured start partitions.
                    KafkaPartitions kafkaPartitions = (KafkaPartitions) taskToolbox.getObjectMapper().convertValue(((Map) startJob).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                    this.nextOffsets.putAll(kafkaPartitions.getPartitionOffsetMap());
                    if (!kafkaPartitions.getTopic().equals(this.ioConfig.getStartPartitions().getTopic())) {
                        throw new ISE("WTF?! Restored topic[%s] but expected topic[%s]", new Object[]{kafkaPartitions.getTopic(), this.ioConfig.getStartPartitions().getTopic()});
                    }
                    if (!this.nextOffsets.keySet().equals(this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
                        throw new ISE("WTF?! Restored partitions[%s] but expected partitions[%s]", new Object[]{this.nextOffsets.keySet(), this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()});
                    }
                }
                // Partition id -> sequence name ("<baseSequenceName>_<partition>").
                HashMap newHashMap = Maps.newHashMap();
                for (Integer num : this.nextOffsets.keySet()) {
                    newHashMap.put(num, StringUtils.format("%s_%s", new Object[]{this.ioConfig.getBaseSequenceName(), num}));
                }
                // Each Committer carries an immutable snapshot of nextOffsets taken when get() is called;
                // its run() is a no-op.
                Supplier<Committer> supplier = new Supplier<Committer>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.4
                    /* renamed from: get, reason: merged with bridge method [inline-methods] */
                    public Committer m1get() {
                        final ImmutableMap copyOf = ImmutableMap.copyOf(KafkaIndexTask.this.nextOffsets);
                        return new Committer() { // from class: io.druid.indexing.kafka.KafkaIndexTask.4.1
                            public Object getMetadata() {
                                return ImmutableMap.of(KafkaIndexTask.METADATA_NEXT_PARTITIONS, new KafkaPartitions(KafkaIndexTask.this.ioConfig.getStartPartitions().getTopic(), copyOf));
                            }

                            public void run() {
                            }
                        };
                    }
                };
                Set<Integer> assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
                // z: whether the read loop should keep running.
                boolean z = !assignPartitionsAndSeekToNext.isEmpty();
                this.status = Status.READING;
                loop1: while (z) {
                    try {
                        // When possiblyPause() returns true the assignment is recomputed, since the end
                        // offsets may have changed in the meantime.
                        if (possiblyPause(assignPartitionsAndSeekToNext)) {
                            assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
                            if (assignPartitionsAndSeekToNext.isEmpty()) {
                                log.info("All partitions have been fully read", new Object[0]);
                                this.publishOnStop.set(true);
                                this.stopRequested.set(true);
                            }
                        }
                        if (this.stopRequested.get()) {
                            break;
                        }
                        ConsumerRecords empty = ConsumerRecords.empty();
                        try {
                            empty = newConsumer.poll(POLL_TIMEOUT);
                        } catch (OffsetOutOfRangeException e2) {
                            log.warn("OffsetOutOfRangeException with message [%s]", new Object[]{e2.getMessage()});
                            possiblyResetOffsetsOrWait(e2.offsetOutOfRangePartitions(), newConsumer, taskToolbox);
                            z = this.ioConfig.isPauseAfterRead() || !assignPartitionsAndSeekToNext.isEmpty();
                        }
                        Iterator it = empty.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            if (log.isTraceEnabled()) {
                                log.trace("Got topic[%s] partition[%d] offset[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
                            }
                            // Only records strictly below the partition's end offset are ingested.
                            if (consumerRecord.offset() < this.endOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                                // An offset gap is fatal unless skipOffsetGaps is enabled.
                                if (consumerRecord.offset() != this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                                    if (!this.ioConfig.isSkipOffsetGaps()) {
                                        throw new ISE("WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", new Object[]{Long.valueOf(consumerRecord.offset()), this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())), Integer.valueOf(consumerRecord.partition())});
                                    }
                                    log.warn("Skipped to offset[%,d] after offset[%,d] in partition[%d].", new Object[]{Long.valueOf(consumerRecord.offset()), this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())), Integer.valueOf(consumerRecord.partition())});
                                }
                                try {
                                    byte[] bArr = (byte[]) consumerRecord.value();
                                    // A null record value is turned into a single null row, which is then
                                    // counted as thrown away by the loop below.
                                    List<InputRow> nullableListOf = bArr == null ? Utils.nullableListOf(new InputRow[]{(InputRow) null}) : this.parser.parseBatch(ByteBuffer.wrap(bArr));
                                    // z2: whether any add() in this batch asked for a persist.
                                    boolean z2 = false;
                                    // Sequence name -> segment identifiers that exceeded maxRowsPerSegment.
                                    HashMap hashMap = new HashMap();
                                    for (InputRow inputRow : nullableListOf) {
                                        if (inputRow != null && withinMinMaxRecordTime(inputRow)) {
                                            String str = (String) newHashMap.get(Integer.valueOf(consumerRecord.partition()));
                                            AppenderatorDriverAddResult add = newDriver.add(inputRow, str, supplier, false, false);
                                            if (!add.isOk()) {
                                                throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{inputRow.getTimestamp()});
                                                // NOTE(review): unreachable after the throw; decompiler artifact.
                                                break loop1;
                                            }
                                            if (add.getNumRowsInSegment() > this.tuningConfig.getMaxRowsPerSegment()) {
                                                ((Set) hashMap.computeIfAbsent(str, str2 -> {
                                                    return new HashSet();
                                                })).add(add.getSegmentIdentifier());
                                            }
                                            z2 |= add.isPersistRequired();
                                            this.fireDepartmentMetrics.incrementProcessed();
                                        } else {
                                            this.fireDepartmentMetrics.incrementThrownAway();
                                        }
                                    }
                                    if (z2) {
                                        newDriver.persist((Committer) supplier.get());
                                    }
                                    hashMap.entrySet().forEach(entry -> {
                                        newDriver.moveSegmentOut((String) entry.getKey(), (List) ((Set) entry.getValue()).stream().collect(Collectors.toList()));
                                    });
                                } catch (ParseException e3) {
                                    // Parse failures are fatal only when reportParseExceptions is set;
                                    // otherwise the row is dropped and counted.
                                    if (this.tuningConfig.isReportParseExceptions()) {
                                        throw e3;
                                    }
                                    log.debug(e3, "Dropping unparseable row from partition[%d] offset[%,d].", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
                                    this.fireDepartmentMetrics.incrementUnparseable();
                                }
                                this.nextOffsets.put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset() + 1));
                            }
                            // Once a partition reaches its end offset it is removed from the assignment
                            // and the consumer is re-assigned to the remaining partitions.
                            if (this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).equals(this.endOffsets.get(Integer.valueOf(consumerRecord.partition()))) && assignPartitionsAndSeekToNext.remove(Integer.valueOf(consumerRecord.partition()))) {
                                log.info("Finished reading topic[%s], partition[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition())});
                                assignPartitions(newConsumer, topic, assignPartitionsAndSeekToNext);
                                z = this.ioConfig.isPauseAfterRead() || !assignPartitionsAndSeekToNext.isEmpty();
                            }
                        }
                    } catch (Throwable th7) {
                        // Persist what has been read so far before propagating the failure.
                        newDriver.persist((Committer) supplier.get());
                        throw th7;
                    }
                }
                newDriver.persist((Committer) supplier.get());
                // The stop check and the transition to PUBLISHING happen under statusLock, the same
                // monitor stopGracefully() takes before inspecting the status.
                synchronized (this.statusLock) {
                    if (this.stopRequested.get() && !this.publishOnStop.get()) {
                        throw new InterruptedException("Stopping without publishing");
                    }
                    this.status = Status.PUBLISHING;
                }
                // The publish callback verifies that the committed offsets equal endOffsets, then
                // submits a SegmentTransactionalInsertAction (with start/end metadata only when
                // useTransaction is enabled).
                ListenableFuture registerHandoff = newDriver.registerHandoff((SegmentsAndMetadata) newDriver.publish((set, obj) -> {
                    KafkaPartitions kafkaPartitions2 = (KafkaPartitions) taskToolbox.getObjectMapper().convertValue(((Map) Preconditions.checkNotNull(obj, "commitMetadata")).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                    if (!this.endOffsets.equals(kafkaPartitions2.getPartitionOffsetMap())) {
                        throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", new Object[]{obj});
                    }
                    SegmentTransactionalInsertAction segmentTransactionalInsertAction = this.ioConfig.isUseTransaction() ? new SegmentTransactionalInsertAction(set, new KafkaDataSourceMetadata(this.ioConfig.getStartPartitions()), new KafkaDataSourceMetadata(kafkaPartitions2)) : new SegmentTransactionalInsertAction(set, (DataSourceMetadata) null, (DataSourceMetadata) null);
                    log.info("Publishing with isTransaction[%s].", new Object[]{Boolean.valueOf(this.ioConfig.isUseTransaction())});
                    return ((SegmentPublishResult) taskToolbox.getTaskActionClient().submit(segmentTransactionalInsertAction)).isSuccess();
                }, (Committer) supplier.get(), newHashMap.values()).get());
                // A handoffConditionTimeout of 0 means wait without a timeout.
                SegmentsAndMetadata segmentsAndMetadata = this.tuningConfig.getHandoffConditionTimeout() == 0 ? (SegmentsAndMetadata) registerHandoff.get() : (SegmentsAndMetadata) registerHandoff.get(this.tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
                if (segmentsAndMetadata == null) {
                    throw new ISE("Transaction failure publishing segments, aborting", new Object[0]);
                }
                log.info("Published segments[%s] with metadata[%s].", new Object[]{Joiner.on(", ").join(Iterables.transform(segmentsAndMetadata.getSegments(), new Function<DataSegment, String>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.5
                    public String apply(DataSegment dataSegment) {
                        return dataSegment.getIdentifier();
                    }
                })), Preconditions.checkNotNull(segmentsAndMetadata.getCommitMetadata(), "commitMetadata")});
                // Normal-path cleanup. NOTE(review): the "if (0 != 0)" branches are dead code left by
                // the decompiler; only the plain close() calls execute.
                if (newConsumer != null) {
                    if (0 != 0) {
                        try {
                            newConsumer.close();
                        } catch (Throwable th8) {
                            th6.addSuppressed(th8);
                        }
                    } else {
                        newConsumer.close();
                    }
                }
                if (newDriver != null) {
                    if (0 != 0) {
                        try {
                            newDriver.close();
                        } catch (Throwable th9) {
                            th5.addSuppressed(th9);
                        }
                    } else {
                        newDriver.close();
                    }
                }
                if (newAppenderator != null) {
                    if (0 != 0) {
                        try {
                            newAppenderator.close();
                        } catch (Throwable th10) {
                            th.addSuppressed(th10);
                        }
                    } else {
                        newAppenderator.close();
                    }
                }
                if (this.chatHandlerProvider.isPresent()) {
                    ((ChatHandlerProvider) this.chatHandlerProvider.get()).unregister(getId());
                }
                taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                return success();
            } catch (Throwable th11) {
                // NOTE(review): r20/r21 are unassigned decompiler placeholders (presumably the consumer
                // and its pending exception - confirm against the original source).
                if (r20 != 0) {
                    if (r21 != 0) {
                        try {
                            r20.close();
                        } catch (Throwable th12) {
                            r21.addSuppressed(th12);
                        }
                    } else {
                        r20.close();
                    }
                }
                throw th11;
            }
        } catch (Throwable th13) {
            // NOTE(review): r18/r19 are unassigned decompiler placeholders (presumably the driver and
            // its pending exception - confirm against the original source).
            if (r18 != 0) {
                if (r19 != 0) {
                    try {
                        r18.close();
                    } catch (Throwable th14) {
                        r19.addSuppressed(th14);
                    }
                } else {
                    r18.close();
                }
            }
            throw th13;
        }
    }

    /**
     * Rethrows the failure recorded in {@code throwableAtomicReference}, if there is one.
     *
     * <p>The reference is read exactly once, so the value that passed the null check is the same
     * value handed to {@link Throwables#propagate(Throwable)} even if another thread replaces the
     * stored throwable in between.
     */
    private void checkAndMaybeThrowException() {
        final Throwable recorded = this.throwableAtomicReference.get();
        if (recorded != null) {
            Throwables.propagate(recorded);
        }
    }

    /**
     * Updates every known sequence with the current offsets and, for each sequence that is no longer
     * open and not already being published, persists the driver and enqueues the sequence for
     * publishing.
     *
     * @param supplier source of the committer passed to the driver's persist call
     * @throws InterruptedException if the persist is interrupted; the interruption is logged and
     *                              rethrown
     */
    private void maybePersistAndPublishSequences(Supplier<Committer> supplier) throws InterruptedException {
        for (final SequenceMetadata sequence : this.sequences) {
            sequence.updateAssignments(this.nextOffsets);
            // Nothing to do while the sequence is still open or its publish is already under way.
            if (sequence.isOpen() || this.publishingSequences.contains(sequence.getSequenceName())) {
                continue;
            }
            this.publishingSequences.add(sequence.getSequenceName());
            try {
                final Object persistResult = this.driver.persist(supplier.get());
                log.info("Persist completed with results: [%s], adding sequence [%s] to publish queue", persistResult, sequence);
                this.publishQueue.add(sequence);
            } catch (InterruptedException interrupted) {
                log.warn("Interrupted while persisting sequence [%s]", sequence);
                throw interrupted;
            }
        }
    }

    /**
     * Reloads {@code sequences} from {@code sequencesPersistFile} when that file exists; otherwise
     * leaves the current value untouched.
     *
     * @throws IOException if the file cannot be read or deserialized
     * @throws NullPointerException if {@code sequencesPersistFile} is null
     */
    private void restoreSequences() throws IOException {
        Preconditions.checkNotNull(this.sequencesPersistFile);
        if (!this.sequencesPersistFile.exists()) {
            return;
        }
        final TypeReference<List<SequenceMetadata>> sequenceListType = new TypeReference<List<SequenceMetadata>>() {
        };
        final Collection<SequenceMetadata> restored = this.toolbox.getObjectMapper().readValue(this.sequencesPersistFile, sequenceListType);
        this.sequences = new CopyOnWriteArrayList<>(restored);
    }

    /**
     * Serializes the current {@code sequences} list to {@code sequencesPersistFile} using the
     * toolbox's object mapper. Synchronized on this task instance.
     *
     * @throws IOException if the file cannot be written
     */
    private synchronized void persistSequences() throws IOException {
        log.info("Persisting Sequences Metadata [%s]", this.sequences);
        final TypeReference<List<SequenceMetadata>> sequenceListType = new TypeReference<List<SequenceMetadata>>() {
        };
        this.toolbox.getObjectMapper()
                .writerWithType(sequenceListType)
                .writeValue(this.sequencesPersistFile, this.sequences);
    }

    /**
     * Reports whether this task can be restored.
     *
     * @return always {@code true}
     */
    public boolean canRestore() {
        return true;
    }

    /**
     * Authorizes the given action on this task's datasource for the supplied request.
     *
     * @param httpServletRequest the incoming request to authorize
     * @param action             the action being attempted on the datasource
     * @return the access decision, returned only when access is allowed
     * @throws ForbiddenException when the access decision is not allowed; the message is the
     *                            decision's string form
     */
    private Access authorizationCheck(HttpServletRequest httpServletRequest, Action action) {
        final Resource dataSourceResource = new Resource(this.dataSchema.getDataSource(), ResourceType.DATASOURCE);
        final ResourceAction requestedAction = new ResourceAction(dataSourceResource, action);
        final Access access = AuthorizationUtils.authorizeResourceAction(httpServletRequest, requestedAction, this.authorizerMapper);
        if (!access.isAllowed()) {
            throw new ForbiddenException(access.toString());
        }
        return access;
    }

    /**
     * Exposes the current appenderator for tests.
     *
     * @return the value of the {@code appenderator} field, which may be {@code null}
     *         (getQueryRunner() checks for that case)
     */
    @VisibleForTesting
    Appenderator getAppenderator() {
        return this.appenderator;
    }

    /**
     * Requests that the task stop.
     *
     * <p>Sets {@code stopRequested} and then, under {@code statusLock}:
     * <ul>
     *   <li>if the task is already PUBLISHING, interrupts the run thread and returns;</li>
     *   <li>otherwise clears a pending pause request and signals {@code shouldResume}, then signals
     *       {@code isAwaitingRetry}. If either lock cannot be acquired within
     *       {@code LOCK_ACQUIRE_TIMEOUT_SECONDS}, the run thread is interrupted instead.</li>
     * </ul>
     *
     * <p>Each lock is released in a {@code finally} block scoped to that lock only, so a failure while
     * acquiring or using {@code pollRetryLock} can no longer trigger a second unlock of the
     * already-released {@code pauseLock}.
     *
     * <p>Any exception raised while signalling is rethrown via {@link Throwables#propagate(Throwable)}.
     */
    public void stopGracefully() {
        log.info("Stopping gracefully (status: [%s])", new Object[]{this.status});
        this.stopRequested.set(true);
        // NOTE(review): statusLock stays held while waiting on the two tryLock calls below, exactly as
        // in the code this replaces; confirm that this scope is intended.
        synchronized (this.statusLock) {
            if (this.status == Status.PUBLISHING) {
                this.runThread.interrupt();
                return;
            }
            try {
                if (!this.pauseLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread", new Object[0]);
                    this.runThread.interrupt();
                    return;
                }
                try {
                    // Wake a paused run thread so that it can observe stopRequested.
                    if (this.pauseRequested) {
                        this.pauseRequested = false;
                        this.shouldResume.signalAll();
                    }
                } finally {
                    this.pauseLock.unlock();
                }

                if (this.pollRetryLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    try {
                        this.isAwaitingRetry.signalAll();
                    } finally {
                        this.pollRetryLock.unlock();
                    }
                } else {
                    log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread", new Object[0]);
                    this.runThread.interrupt();
                }
            } catch (Exception e) {
                Throwables.propagate(e);
            }
        }
    }

    /**
     * Returns a runner for the given query.
     *
     * @param query the query to be run (not inspected here)
     * @return a {@code NoopQueryRunner} when no appenderator has been set yet; otherwise a runner
     *         that delegates to the task's appenderator, reading the field at the time the query
     *         is run
     */
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        if (this.appenderator == null) {
            return new NoopQueryRunner();
        }
        return new QueryRunner<T>() {
            public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> map) {
                return queryPlus.run(KafkaIndexTask.this.appenderator, map);
            }
        };
    }

    /**
     * HTTP endpoint ({@code POST /stop}) that stops the task gracefully.
     *
     * <p>Requires WRITE access; authorizationCheck throws a ForbiddenException when the request is
     * not allowed.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return a 200 OK response once stopGracefully() has returned
     */
    @POST
    @Path("/stop")
    public Response stop(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.WRITE);
        stopGracefully();
        return Response.status(Response.Status.OK).build();
    }

    /**
     * HTTP endpoint ({@code GET /status}) returning the task's current status as JSON.
     *
     * <p>Requires READ access; authorizationCheck throws a ForbiddenException when the request is
     * not allowed.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the current value of the {@code status} field
     */
    @GET
    @Produces({"application/json"})
    @Path("/status")
    public Status getStatusHTTP(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        return this.status;
    }

    /**
     * Returns the task's current status without any authorization check.
     *
     * @return the current value of the {@code status} field
     */
    public Status getStatus() {
        return this.status;
    }

    /**
     * HTTP endpoint ({@code GET /offsets/current}) returning the next offset per partition as JSON.
     *
     * <p>Requires READ access; authorizationCheck throws a ForbiddenException when the request is
     * not allowed.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the map returned by the no-argument getCurrentOffsets()
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/current")
    public Map<Integer, Long> getCurrentOffsets(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        return getCurrentOffsets();
    }

    /**
     * Returns the partition-to-next-offset map without any authorization check.
     *
     * @return the {@code nextOffsets} field itself, not a copy
     */
    public Map<Integer, Long> getCurrentOffsets() {
        return this.nextOffsets;
    }

    /**
     * HTTP endpoint ({@code GET /offsets/end}) returning the end offset per partition as JSON.
     *
     * <p>Requires READ access; authorizationCheck throws a ForbiddenException when the request is
     * not allowed.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the map returned by getEndOffsets()
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/end")
    public Map<Integer, Long> getEndOffsetsHTTP(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        return getEndOffsets();
    }

    /**
     * Returns the partition-to-end-offset map without any authorization check.
     *
     * @return the {@code endOffsets} field itself, not a copy
     */
    public Map<Integer, Long> getEndOffsets() {
        return this.endOffsets;
    }

    /**
     * HTTP endpoint ({@code POST /offsets/end}) that changes the task's end offsets.
     *
     * <p>Requires WRITE access; authorizationCheck throws a ForbiddenException when the request is
     * not allowed. All work is delegated to setEndOffsets(map, z, z2).
     *
     * @param map                JSON request body: partition id to new end offset
     * @param z                  the {@code resume} query parameter (default {@code false})
     * @param z2                 the {@code finish} query parameter (default {@code true})
     * @param httpServletRequest the incoming request, used for authorization
     * @return the response produced by setEndOffsets
     * @throws InterruptedException propagated from setEndOffsets
     */
    @Path("/offsets/end")
    @Consumes({"application/json"})
    @POST
    @Produces({"application/json"})
    public Response setEndOffsetsHTTP(Map<Integer, Long> map, @QueryParam("resume") @DefaultValue("false") boolean z, @QueryParam("finish") @DefaultValue("true") boolean z2, @Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        return setEndOffsets(map, z, z2);
    }

    /**
     * Changes the end offsets of the latest sequence while the task is paused.
     *
     * <p>When {@code useLegacy} is set the call is delegated to setEndOffsetsLegacy. Otherwise the
     * request is validated (non-null body, only partitions handled by this task) and, under
     * {@code pauseLock}:
     * <ul>
     *   <li>a duplicate request is acknowledged with 200 OK and otherwise ignored;</li>
     *   <li>a request is rejected with 400 when the latest sequence is already checkpointed (and
     *       pause-after-read is off), when the task is not paused, or when any requested end offset
     *       is below the current offset of its partition;</li>
     *   <li>otherwise the latest sequence's end offsets are set. With {@code z2} the task-wide
     *       {@code endOffsets} are updated; without it a new sequence starting at {@code map} is
     *       appended. The sequences are then persisted.</li>
     * </ul>
     *
     * <p>Any exception raised while the offsets are being changed - including the checked
     * IOException from persistSequences() - is logged, stored in {@code throwableAtomicReference}
     * and rethrown via {@link Throwables#propagate(Throwable)}. The lock is released in a
     * {@code finally} block and only when it was actually acquired, so neither an interrupted
     * acquisition nor a failure in resume() can unlock a lock this thread does not hold.
     *
     * @param map partition id to requested end offset
     * @param z   whether to resume the task after the offsets were changed ("resume")
     * @param z2  whether the offsets finish the task's current sequence ("finish")
     * @return 200 OK echoing {@code map} on success or duplicate request, 400 on a rejected request
     * @throws InterruptedException declared for the resume and legacy paths
     */
    public Response setEndOffsets(Map<Integer, Long> map, boolean z, boolean z2) throws InterruptedException {
        if (this.useLegacy) {
            return setEndOffsetsLegacy(map, z);
        }
        if (map == null) {
            return Response.status(Response.Status.BAD_REQUEST).entity("Request body must contain a map of { partition:endOffset }").build();
        }
        if (!this.endOffsets.keySet().containsAll(map.keySet())) {
            return Response.status(Response.Status.BAD_REQUEST).entity(StringUtils.format("Request contains partitions not being handled by this task, my partitions: %s", new Object[]{this.endOffsets.keySet()})).build();
        }

        boolean lockHeld = false;
        try {
            this.pauseLock.lockInterruptibly();
            lockHeld = true;

            // Sequence checks come first so that duplicate requests are recognised even when the task
            // is no longer paused.
            Preconditions.checkState(this.sequences.size() > 0, "WTH?! No Sequences found to set end offsets");
            final SequenceMetadata latestSequence = this.sequences.get(this.sequences.size() - 1);

            if ((latestSequence.getStartOffsets().equals(map) && !z2) || (latestSequence.getEndOffsets().equals(map) && z2)) {
                log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", new Object[]{this.sequences});
                return Response.ok(map).build();
            }
            if (latestSequence.isCheckpointed() && !this.ioConfig.isPauseAfterRead()) {
                return Response.status(Response.Status.BAD_REQUEST).entity(StringUtils.format("WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]", new Object[]{latestSequence, map})).build();
            }
            if (!isPaused()) {
                return Response.status(Response.Status.BAD_REQUEST).entity("Task must be paused before changing the end offsets").build();
            }
            for (Map.Entry<Integer, Long> entry : map.entrySet()) {
                if (entry.getValue().compareTo(this.nextOffsets.get(entry.getKey())) < 0) {
                    return Response.status(Response.Status.BAD_REQUEST).entity(StringUtils.format("End offset must be >= current offset for partition [%s] (current: %s)", new Object[]{entry.getKey(), this.nextOffsets.get(entry.getKey())})).build();
                }
            }

            latestSequence.setEndOffsets(map);
            if (z2) {
                log.info("Updating endOffsets from [%s] to [%s]", new Object[]{this.endOffsets, map});
                this.endOffsets.putAll(map);
            } else {
                Preconditions.checkState(!this.ioConfig.isPauseAfterRead());
                // Open the next sequence, starting where the previous one now ends.
                final int nextSequenceId = latestSequence.getSequenceId() + 1;
                this.sequences.add(new SequenceMetadata(nextSequenceId, StringUtils.format("%s_%d", new Object[]{this.ioConfig.getBaseSequenceName(), Integer.valueOf(nextSequenceId)}), map, this.maxEndOffsets, false));
            }
            persistSequences();
        } catch (Exception e) {
            log.error(e, "Unable to set end offsets, dying", new Object[0]);
            this.throwableAtomicReference.set(e);
            throw Throwables.propagate(e);
        } finally {
            if (lockHeld) {
                this.pauseLock.unlock();
            }
        }

        // Resume only after the lock has been released.
        if (z) {
            resume();
        }
        return Response.ok(map).build();
    }

    /**
     * Legacy variant of setEndOffsets: updates the task-wide {@code endOffsets} while the task is
     * paused.
     *
     * <p>The request is rejected with 400 when the body is null, when it names partitions this task
     * does not handle, when the task is not paused, or when any requested end offset is below the
     * current offset of its partition.
     *
     * <p>{@code pauseLock} is released in a {@code finally} block that covers only the locked
     * section; resume() runs after the lock is released and outside that block, so a failure in
     * resume() cannot cause a second unlock.
     *
     * @param map partition id to requested end offset
     * @param z   whether to resume the task after the offsets were changed
     * @return 200 OK carrying the updated {@code endOffsets} on success, 400 on a rejected request
     * @throws InterruptedException if interrupted while acquiring {@code pauseLock}, or as declared
     *                              by resume()
     */
    private Response setEndOffsetsLegacy(Map<Integer, Long> map, boolean z) throws InterruptedException {
        if (map == null) {
            return Response.status(Response.Status.BAD_REQUEST).entity("Request body must contain a map of { partition:endOffset }").build();
        }
        if (!this.endOffsets.keySet().containsAll(map.keySet())) {
            return Response.status(Response.Status.BAD_REQUEST).entity(StringUtils.format("Request contains partitions not being handled by this task, my partitions: %s", new Object[]{this.endOffsets.keySet()})).build();
        }

        this.pauseLock.lockInterruptibly();
        try {
            if (!isPaused()) {
                return Response.status(Response.Status.BAD_REQUEST).entity("Task must be paused before changing the end offsets").build();
            }
            for (Map.Entry<Integer, Long> entry : map.entrySet()) {
                if (entry.getValue().compareTo(this.nextOffsets.get(entry.getKey())) < 0) {
                    return Response.status(Response.Status.BAD_REQUEST).entity(StringUtils.format("End offset must be >= current offset for partition [%s] (current: %s)", new Object[]{entry.getKey(), this.nextOffsets.get(entry.getKey())})).build();
                }
            }
            this.endOffsets.putAll(map);
            log.info("endOffsets changed to %s", new Object[]{this.endOffsets});
        } finally {
            this.pauseLock.unlock();
        }

        // Resume only after the lock has been released.
        if (z) {
            resume();
        }
        return Response.ok(this.endOffsets).build();
    }

    /**
     * HTTP endpoint ({@code GET /checkpoints}) exposing the result of {@link #getCheckpoints()}.
     *
     * @param httpServletRequest the incoming request, passed to the authorization check with {@code Action.READ}
     * @return the checkpoints map produced by {@link #getCheckpoints()}
     */
    @GET
    @Produces({"application/json"})
    @Path("/checkpoints")
    public Map<Integer, Map<Integer, Long>> getCheckpointsHTTP(@Context HttpServletRequest httpServletRequest) {
        // Authorization runs first; the checkpoints are only computed afterwards.
        authorizationCheck(httpServletRequest, Action.READ);
        final Map<Integer, Map<Integer, Long>> checkpoints = getCheckpoints();
        return checkpoints;
    }

    /**
     * Builds a map from sequence id to that sequence's start offsets, sorted by sequence id.
     *
     * @return a new {@link TreeMap} keyed by sequence id; each value is the start-offset map of the
     *         corresponding entry in {@code sequences}
     */
    public Map<Integer, Map<Integer, Long>> getCheckpoints() {
        // Collect first (the two-argument toMap rejects duplicate keys), then copy into a sorted map.
        final Map<Integer, Map<Integer, Long>> startOffsetsBySequenceId = this.sequences.stream()
                .collect(Collectors.toMap(
                        sequence -> sequence.getSequenceId(),
                        sequence -> sequence.getStartOffsets()));
        return new TreeMap<>(startOffsetsBySequenceId);
    }

    /**
     * HTTP endpoint ({@code POST /pause}) that delegates to {@link #pause(long)}.
     *
     * @param j                  value of the {@code timeout} query parameter (defaults to {@code 0}), forwarded unchanged
     * @param httpServletRequest the incoming request, passed to the authorization check with {@code Action.WRITE}
     * @return the response produced by {@link #pause(long)}
     * @throws InterruptedException if {@link #pause(long)} is interrupted
     */
    @POST
    @Produces({"application/json"})
    @Path("/pause")
    public Response pauseHTTP(@QueryParam("timeout") @DefaultValue("0") long j, @Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        final Response pauseResponse = pause(j);
        return pauseResponse;
    }

    /**
     * Requests that the ingestion loop pause and waits up to two seconds for it to do so.
     *
     * <p>Returns {@code 400 Bad Request} if the task status is neither {@code PAUSED} nor
     * {@code READING}. Otherwise the pause request is recorded, any thread waiting on the poll-retry
     * condition is woken, and this method waits on {@code hasPaused} until the task reports itself
     * paused or the two-second budget is exhausted.
     *
     * @param j pause duration in milliseconds; values {@code <= 0} are stored as {@code -1}
     * @return {@code 200 OK} with the JSON-serialized current offsets once paused, {@code 202 Accepted}
     *         if the task has not paused within two seconds, or {@code 400} if it is not pausable
     * @throws InterruptedException if interrupted while acquiring a lock or while waiting
     */
    public Response pause(long j) throws InterruptedException {
        if (this.status != Status.PAUSED && this.status != Status.READING) {
            return Response.status(Response.Status.BAD_REQUEST)
                    .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", this.status))
                    .build();
        }

        this.pauseLock.lockInterruptibly();
        try {
            this.pauseMillis = j <= 0 ? -1L : j;
            this.pauseRequested = true;

            // Wake a thread sleeping between poll retries so that it can observe pauseRequested.
            // The retry lock is scoped to this signal only and is released exactly once.
            this.pollRetryLock.lockInterruptibly();
            try {
                this.isAwaitingRetry.signalAll();
            } finally {
                this.pollRetryLock.unlock();
            }

            if (isPaused()) {
                // Already paused: signal the waiting loop so it re-reads the new pauseMillis.
                this.shouldResume.signalAll();
            }

            long nanos = TimeUnit.SECONDS.toNanos(2L);
            while (!isPaused()) {
                if (nanos <= 0) {
                    // No manual unlock here: the enclosing finally is the only release point.
                    return Response.status(Response.Status.ACCEPTED)
                            .entity("Request accepted but task has not yet paused")
                            .build();
                }
                nanos = this.hasPaused.awaitNanos(nanos);
            }

            try {
                return Response.ok()
                        .entity(this.toolbox.getObjectMapper().writeValueAsString(getCurrentOffsets()))
                        .build();
            } catch (JsonProcessingException e) {
                throw Throwables.propagate(e);
            }
        } finally {
            this.pauseLock.unlock();
        }
    }

    /**
     * HTTP endpoint ({@code POST /resume}) that delegates to {@link #resume()}.
     *
     * @param httpServletRequest the incoming request, passed to the authorization check with {@code Action.WRITE}
     * @return an empty {@code 200 OK} response once {@link #resume()} has returned
     * @throws InterruptedException if {@link #resume()} is interrupted
     */
    @POST
    @Path("/resume")
    public Response resumeHTTP(@Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        resume();
        final Response okResponse = Response.status(Response.Status.OK).build();
        return okResponse;
    }

    /**
     * Clears the pause request, signals {@code shouldResume}, and waits up to five seconds for the
     * task to stop reporting itself as paused.
     *
     * @throws InterruptedException if interrupted while acquiring the pause lock or while waiting
     * @throws RuntimeException     if the task is still paused once the five-second budget is used up
     */
    public void resume() throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            pauseRequested = false;
            shouldResume.signalAll();

            // awaitNanos returns the remaining budget, which is fed back into the next iteration.
            for (long remainingNanos = TimeUnit.SECONDS.toNanos(5L);
                 isPaused();
                 remainingNanos = shouldResume.awaitNanos(remainingNanos)) {
                if (remainingNanos <= 0) {
                    throw new RuntimeException("Resume command was not accepted within 5 seconds");
                }
            }
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * HTTP endpoint ({@code GET /time/start}) returning the {@code startTime} field.
     *
     * @param httpServletRequest the incoming request, passed to the authorization check
     * @return the current value of {@code startTime}
     */
    @GET
    @Produces({"application/json"})
    @Path("/time/start")
    public DateTime getStartTime(@Context HttpServletRequest httpServletRequest) {
        // NOTE(review): this read-only endpoint checks Action.WRITE, whereas GET /checkpoints checks
        // Action.READ - confirm whether the stricter permission is intentional.
        authorizationCheck(httpServletRequest, Action.WRITE);
        final DateTime taskStartTime = this.startTime;
        return taskStartTime;
    }

    /**
     * Test-only accessor for the {@code fireDepartmentMetrics} field.
     *
     * @return the current value of {@code fireDepartmentMetrics}
     */
    @VisibleForTesting
    FireDepartmentMetrics getFireDepartmentMetrics() {
        return fireDepartmentMetrics;
    }

    /**
     * @return {@code true} exactly when {@code status} is {@code Status.PAUSED}
     */
    private boolean isPaused() {
        return Status.PAUSED == status;
    }

    /**
     * Records a pause request without waiting for it to take effect.
     *
     * @param j value stored into {@code pauseMillis} as-is (no normalization is applied here)
     */
    private void requestPause(long j) {
        // The duration is written before the flag, in the same order as before.
        pauseMillis = j;
        pauseRequested = true;
    }

    /**
     * Creates a realtime {@link Appenderator} from this task's schema and tuning config plus the
     * services exposed by the given toolbox.
     *
     * @param fireDepartmentMetrics metrics object handed to the appenderator
     * @param taskToolbox           source of the persist directory and the collaborating services
     * @return the appenderator returned by {@code Appenderators.createRealtime}
     */
    private Appenderator newAppenderator(FireDepartmentMetrics fireDepartmentMetrics, TaskToolbox taskToolbox) {
        return Appenderators.createRealtime(
                dataSchema,
                // The tuning config is re-based onto the toolbox's persist directory.
                tuningConfig.withBasePersistDirectory(taskToolbox.getPersistDir()),
                fireDepartmentMetrics,
                taskToolbox.getSegmentPusher(),
                taskToolbox.getObjectMapper(),
                taskToolbox.getIndexIO(),
                taskToolbox.getIndexMergerV9(),
                taskToolbox.getQueryRunnerFactoryConglomerate(),
                taskToolbox.getSegmentAnnouncer(),
                taskToolbox.getEmitter(),
                taskToolbox.getQueryExecutorService(),
                taskToolbox.getCache(),
                taskToolbox.getCacheConfig()
        );
    }

    /**
     * Creates a {@link StreamAppenderatorDriver} around the given appenderator, wiring in an
     * action-based segment allocator and used-segment checker built from the toolbox's task action client.
     *
     * @param appenderator          appenderator the driver will operate on
     * @param taskToolbox           source of the task action client, handoff notifier factory and object mapper
     * @param fireDepartmentMetrics metrics object handed to the driver
     * @return a newly constructed driver
     */
    private StreamAppenderatorDriver newDriver(Appenderator appenderator, TaskToolbox taskToolbox, FireDepartmentMetrics fireDepartmentMetrics) {
        final ActionBasedSegmentAllocator segmentAllocator =
                new ActionBasedSegmentAllocator(taskToolbox.getTaskActionClient(), dataSchema);
        return new StreamAppenderatorDriver(
                appenderator,
                segmentAllocator,
                taskToolbox.getSegmentHandoffNotifierFactory(),
                new ActionBasedUsedSegmentChecker(taskToolbox.getTaskActionClient()),
                taskToolbox.getObjectMapper(),
                fireDepartmentMetrics
        );
    }

    /**
     * Builds a byte-array {@link KafkaConsumer} from the configured consumer properties.
     *
     * <p>While the consumer is constructed, the thread's context class loader is switched to this
     * class's loader; the previous loader is restored afterwards, whether or not construction succeeds.
     * The auto-commit, offset-reset and deserializer settings are applied after the configured
     * properties, so they override any values configured under the same keys.
     *
     * @return a new consumer with auto-commit disabled and {@code auto.offset.reset} set to {@code none}
     */
    private KafkaConsumer<byte[], byte[]> newConsumer() {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(getClass().getClassLoader());

            final Properties consumerProps = new Properties();
            ioConfig.getConsumerProperties().forEach((key, value) -> consumerProps.setProperty(key, value));

            // Fixed settings are written last so they take precedence over configured values.
            final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
            consumerProps.setProperty("enable.auto.commit", "false");
            consumerProps.setProperty("auto.offset.reset", "none");
            consumerProps.setProperty("key.deserializer", byteArrayDeserializer);
            consumerProps.setProperty("value.deserializer", byteArrayDeserializer);

            return new KafkaConsumer<>(consumerProps);
        } finally {
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Assigns the given partitions of one topic to the consumer.
     *
     * @param kafkaConsumer consumer receiving the assignment
     * @param str           topic name used for every {@link TopicPartition}
     * @param set           partition numbers to assign
     */
    private static void assignPartitions(KafkaConsumer kafkaConsumer, String str, Set<Integer> set) {
        kafkaConsumer.assign(
                Lists.newArrayList(
                        set.stream()
                                .map(partition -> new TopicPartition(str, partition))
                                .collect(Collectors.toList())));
    }

    /**
     * Assigns to the consumer every partition whose next offset is still below its end offset, and
     * seeks each of those partitions to its next offset.
     *
     * <p>Partitions whose next offset equals the end offset are logged as finished and left
     * unassigned; a next offset beyond the end offset is treated as an illegal state.
     *
     * @param kafkaConsumer consumer to assign and seek
     * @param str           topic name
     * @return the partition numbers that were assigned
     * @throws ISE if any partition's next offset is greater than its end offset
     */
    private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer kafkaConsumer, String str) {
        final Set<Integer> assignment = Sets.newHashSet();

        for (Map.Entry<Integer, Long> nextOffsetEntry : nextOffsets.entrySet()) {
            final Integer partition = nextOffsetEntry.getKey();
            final long endOffset = endOffsets.get(partition);
            final long nextOffset = nextOffsetEntry.getValue();

            if (nextOffset < endOffset) {
                assignment.add(partition);
            } else if (nextOffset == endOffset) {
                log.info("Finished reading partition[%d].", partition);
            } else {
                throw new ISE("WTF?! Cannot start from offset[%,d] > endOffset[%,d]", nextOffsetEntry.getValue(), endOffset);
            }
        }

        assignPartitions(kafkaConsumer, str, assignment);

        // Seeking happens only after the assignment has been applied.
        for (Integer assigned : assignment) {
            final int partition = assigned;
            final long offset = nextOffsets.get(partition);
            log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
            kafkaConsumer.seek(new TopicPartition(str, partition), offset);
        }
        return assignment;
    }

    /**
     * Blocks the calling thread while a pause is requested.
     *
     * <p>If the IO config asks to pause after reading and {@code set} is empty, an indefinite pause
     * is requested first. When no pause is requested the method returns immediately. Otherwise the
     * status is switched to {@code PAUSED}, {@code hasPaused} is signalled, and the thread waits on
     * {@code shouldResume} - indefinitely while {@code pauseMillis} is {@code -1}, or for
     * {@code pauseMillis} milliseconds otherwise - until the pause request is cleared. The status is
     * then set back to {@code READING} and {@code shouldResume} is signalled.
     *
     * @param set partitions checked for emptiness in the pause-after-read test
     * @return {@code true} if this call paused and has now resumed, {@code false} if no pause was requested
     * @throws InterruptedException if interrupted while acquiring the pause lock or while waiting
     */
    private boolean possiblyPause(Set<Integer> set) throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            if (ioConfig.isPauseAfterRead() && set.isEmpty()) {
                pauseMillis = -1L;
                pauseRequested = true;
            }

            if (!pauseRequested) {
                return false;
            }

            status = Status.PAUSED;
            long remainingNanos = 0;
            hasPaused.signalAll();

            while (pauseRequested) {
                if (pauseMillis == -1) {
                    log.info("Pausing ingestion until resumed");
                    shouldResume.await();
                    continue;
                }

                if (pauseMillis > 0) {
                    log.info("Pausing ingestion for [%,d] ms", pauseMillis);
                    remainingNanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
                    // Zeroed so the timed wait is not restarted on the next pass of the loop.
                    pauseMillis = 0L;
                }
                if (remainingNanos <= 0) {
                    // Timed pause has elapsed: clear the request so the loop terminates.
                    pauseRequested = false;
                }
                remainingNanos = shouldResume.awaitNanos(remainingNanos);
            }

            status = Status.READING;
            shouldResume.signalAll();
            log.info("Ingestion loop resumed");
            return true;
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * Either sends a reset request for the given partitions or waits before the next poll attempt.
     *
     * <p>When automatic reset is enabled in the tuning config, each partition is sought to its
     * beginning to read that position, then sought back to the supplied offset. Partitions whose
     * beginning position is greater than the supplied offset are collected, and if any exist a reset
     * request is sent for them and the method returns. Otherwise the method waits on
     * {@code isAwaitingRetry} for up to {@code pollRetryMs} milliseconds, ending early if a pause or
     * stop has been requested.
     *
     * @param map           partition to the offset that was being read
     * @param kafkaConsumer consumer used to look up each partition's beginning position
     * @param taskToolbox   toolbox forwarded to {@code sendResetRequestAndWait}
     * @throws InterruptedException if interrupted while acquiring the retry lock or while waiting
     * @throws IOException          if sending the reset request fails
     */
    private void possiblyResetOffsetsOrWait(Map<TopicPartition, Long> map, KafkaConsumer<byte[], byte[]> kafkaConsumer, TaskToolbox taskToolbox) throws InterruptedException, IOException {
        final Map<TopicPartition, Long> partitionsToReset = Maps.newHashMap();

        if (tuningConfig.isResetOffsetAutomatically()) {
            for (Map.Entry<TopicPartition, Long> requested : map.entrySet()) {
                final TopicPartition topicPartition = requested.getKey();
                final long requestedOffset = requested.getValue();

                // Read the beginning position, then restore the consumer to the requested offset.
                kafkaConsumer.seekToBeginning(Collections.singletonList(topicPartition));
                final long beginningOffset = kafkaConsumer.position(topicPartition);
                kafkaConsumer.seek(topicPartition, requestedOffset);

                if (beginningOffset > requestedOffset) {
                    partitionsToReset.put(topicPartition, requestedOffset);
                }
            }
        }

        if (!partitionsToReset.isEmpty()) {
            sendResetRequestAndWait(partitionsToReset, taskToolbox);
            return;
        }

        log.warn("Retrying in %dms", pollRetryMs);
        pollRetryLock.lockInterruptibly();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
            while (remainingNanos > 0 && !pauseRequested && !stopRequested.get()) {
                remainingNanos = isAwaitingRetry.awaitNanos(remainingNanos);
            }
        } finally {
            pollRetryLock.unlock();
        }
    }

    /**
     * Submits a {@link ResetDataSourceMetadataAction} for the given partitions and, if the action
     * reports success, requests an indefinite pause via {@code requestPause(-1)}.
     *
     * <p>An alert is emitted in both outcomes: one announcing the reset on success, one reporting
     * the failed request otherwise.
     *
     * @param map         topic-partition to offset; only the partition number of each key is used
     * @param taskToolbox provides the task action client used to submit the reset action
     * @throws IOException if submitting the action fails
     */
    private void sendResetRequestAndWait(Map<TopicPartition, Long> map, TaskToolbox taskToolbox) throws IOException {
        final Map<Integer, Long> offsetsByPartition = Maps.newHashMap();
        for (Map.Entry<TopicPartition, Long> outOfRange : map.entrySet()) {
            offsetsByPartition.put(outOfRange.getKey().partition(), outOfRange.getValue());
        }

        final ResetDataSourceMetadataAction resetAction = new ResetDataSourceMetadataAction(
                getDataSource(),
                new KafkaDataSourceMetadata(
                        new KafkaPartitions(ioConfig.getStartPartitions().getTopic(), offsetsByPartition)));
        final boolean resetSubmitted = taskToolbox.getTaskActionClient().submit(resetAction);

        if (resetSubmitted) {
            log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
                    .addData("partitions", offsetsByPartition.keySet())
                    .emit();
            requestPause(-1L);
        } else {
            log.makeAlert("Failed to send reset request for partitions [%s]", offsetsByPartition.keySet()).emit();
        }
    }

    /**
     * Decides whether a row's timestamp is acceptable for ingestion.
     *
     * <p>A row whose timestamp lies outside {@code Intervals.ETERNITY} is rejected - by throwing a
     * {@link ParseException} when the tuning config reports parse exceptions, otherwise by returning
     * {@code false}. Remaining rows are rejected when the configured minimum message time is after
     * the row's timestamp or the configured maximum message time is before it.
     *
     * @param inputRow row whose timestamp is checked
     * @return {@code true} if the row passes all checks, {@code false} otherwise
     * @throws ParseException if the timestamp is outside {@code Intervals.ETERNITY} and parse
     *                        exceptions are configured to be reported
     */
    private boolean withinMinMaxRecordTime(InputRow inputRow) {
        final boolean beforeMinimum = ioConfig.getMinimumMessageTime().isPresent()
                && ioConfig.getMinimumMessageTime().get().isAfter(inputRow.getTimestamp());
        final boolean afterMaximum = ioConfig.getMaximumMessageTime().isPresent()
                && ioConfig.getMaximumMessageTime().get().isBefore(inputRow.getTimestamp());

        if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
            final String errorMsg = StringUtils.format("Encountered row with timestamp that cannot be represented as a long: [%s]", inputRow);
            log.debug(errorMsg);
            if (tuningConfig.isReportParseExceptions()) {
                throw new ParseException(errorMsg);
            }
            return false;
        }

        if (log.isDebugEnabled()) {
            // Only the first violated bound is logged.
            if (beforeMinimum) {
                log.debug("CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", inputRow.getTimestamp(), ioConfig.getMinimumMessageTime().get());
            } else if (afterMaximum) {
                log.debug("CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", inputRow.getTimestamp(), ioConfig.getMaximumMessageTime().get());
            }
        }

        return !beforeMinimum && !afterMaximum;
    }
}
