package org.apache.spark.sql.pulsar;

import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.json.JSONOptionsInRead;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: PulsarSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-g!B\u000f\u001f\u0001yA\u0003\u0002C \u0001\u0005\u0003\u0005\u000b\u0011B!\t\u0011\u0015\u0003!\u0011!Q\u0001\n\u0019C\u0001B\u0013\u0001\u0003\u0002\u0003\u0006Ia\u0013\u0005\t=\u0002\u0011\t\u0011)A\u0005\u0017\"Aq\f\u0001B\u0001B\u0003%\u0011\u000b\u0003\u0005a\u0001\t\u0005\t\u0015!\u0003b\u0011!!\u0007A!A!\u0002\u0013)\u0007\u0002C5\u0001\u0005\u0003\u0005\u000b\u0011\u00026\t\u00115\u0004!\u0011!Q\u0001\nEC\u0001B\u001c\u0001\u0003\u0002\u0003\u0006Ia\u001c\u0005\u0006o\u0002!\t\u0001\u001f\u0005\n\u0003\u0013\u0001!\u0019!C\u0005\u0003\u0017A\u0001\"!\u0006\u0001A\u0003%\u0011Q\u0002\u0005\n\u0003/\u0001!\u0019!C\u0001\u00033A\u0001\"a\n\u0001A\u0003%\u00111\u0004\u0005\n\u0003S\u0001\u0001\u0019!C\u0005\u0003WA\u0011\"!\f\u0001\u0001\u0004%I!a\f\t\u000f\u0005U\u0002\u0001)Q\u0005U\"Q\u0011q\u0007\u0001\t\u0006\u0004%I!!\u000f\t\u0013\u0005\u0005\u0003\u00011A\u0005\n\u0005\r\u0003\"CA1\u0001\u0001\u0007I\u0011BA2\u0011!\t9\u0007\u0001Q!\n\u0005\u0015\u0003BCA5\u0001!\u0015\r\u0011\"\u0001\u0002l!9\u00111\u000f\u0001\u0005B\u0005u\u0004bBAF\u0001\u0011\u0005\u0013Q\u0012\u0005\b\u0003/\u0003A\u0011IAM\u0011\u001d\t\t\r\u0001C!\u0003\u0007Dq!a2\u0001\t\u0003\nIM\u0001\u0007Qk2\u001c\u0018M]*pkJ\u001cWM\u0003\u0002 
A\u00051\u0001/\u001e7tCJT!!\t\u0012\u0002\u0007M\fHN\u0003\u0002$I\u0005)1\u000f]1sW*\u0011QEJ\u0001\u0007CB\f7\r[3\u000b\u0003\u001d\n1a\u001c:h'\u0011\u0001\u0011&M\u001d\u0011\u0005)zS\"A\u0016\u000b\u00051j\u0013\u0001\u00027b]\u001eT\u0011AL\u0001\u0005U\u00064\u0018-\u0003\u00021W\t1qJ\u00196fGR\u0004\"AM\u001c\u000e\u0003MR!\u0001N\u001b\u0002\u0013M$(/Z1nS:<'B\u0001\u001c!\u0003%)\u00070Z2vi&|g.\u0003\u00029g\t11k\\;sG\u0016\u0004\"AO\u001f\u000e\u0003mR!\u0001\u0010\u0012\u0002\u0011%tG/\u001a:oC2L!AP\u001e\u0003\u000f1{wmZ5oO\u0006Q1/\u001d7D_:$X\r\u001f;\u0004\u0001A\u0011!iQ\u0007\u0002A%\u0011A\t\t\u0002\u000b'Fc5i\u001c8uKb$\u0018AD7fi\u0006$\u0017\r^1SK\u0006$WM\u001d\t\u0003\u000f\"k\u0011AH\u0005\u0003\u0013z\u0011A\u0003U;mg\u0006\u0014X*\u001a;bI\u0006$\u0018MU3bI\u0016\u0014\u0018AC2mS\u0016tGoQ8oMB!AjT)*\u001b\u0005i%B\u0001(.\u0003\u0011)H/\u001b7\n\u0005Ak%aA'baB\u0011!k\u0017\b\u0003'f\u0003\"\u0001V,\u000e\u0003US!A\u0016!\u0002\rq\u0012xn\u001c;?\u0015\u0005A\u0016!B:dC2\f\u0017B\u0001.X\u0003\u0019\u0001&/\u001a3fM&\u0011A,\u0018\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005i;\u0016A\u0003:fC\u0012,'oQ8oM\u0006aQ.\u001a;bI\u0006$\u0018\rU1uQ\u0006y1\u000f^1si&twm\u00144gg\u0016$8\u000f\u0005\u0002HE&\u00111M\b\u0002\u000f!\u0016\u0014Hk\u001c9jG>3gm]3u\u00035\u0001x\u000e\u001c7US6,w.\u001e;NgB\u0011amZ\u0007\u0002/&\u0011\u0001n\u0016\u0002\u0004\u0013:$\u0018A\u00044bS2|e\u000eR1uC2{7o\u001d\t\u0003M.L!\u0001\\,\u0003\u000f\t{w\u000e\\3b]\u000612/\u001e2tGJL\u0007\u000f^5p]:\u000bW.\u001a)sK\u001aL\u00070A\u0006kg>tw\n\u001d;j_:\u001c\bC\u00019v\u001b\u0005\t(B\u0001:t\u0003\u0011Q7o\u001c8\u000b\u0005Q\u0004\u0013\u0001C2bi\u0006d\u0017p\u001d;\n\u0005Y\f(!\u0005&T\u001f:{\u0005\u000f^5p]NLeNU3bI\u00061A(\u001b8jiz\"r\"\u001f>|yvtx0!\u0001\u0002\u0004\u0005\u0015\u0011q\u0001\t\u0003\u000f\u0002AQaP\u0006A\u0002\u0005CQ!R\u0006A\u0002\u0019CQAS\u0006A\u0002-CQAX\u0006A\u0002-CQaX\u0006A\u0002ECQ\u0001Y\u0006A\u0002\u0005DQ\u0001Z\u0006A\u0
002\u0015DQ![\u0006A\u0002)DQ!\\\u0006A\u0002ECQA\\\u0006A\u0002=\f!a]2\u0016\u0005\u00055\u0001\u0003BA\b\u0003#i\u0011AI\u0005\u0004\u0003'\u0011#\u0001D*qCJ\\7i\u001c8uKb$\u0018aA:dA\u0005q!/\u001a9peR$\u0015\r^1M_N\u001cXCAA\u000e!\u00191\u0017QD)\u0002\"%\u0019\u0011qD,\u0003\u0013\u0019+hn\u0019;j_:\f\u0004c\u00014\u0002$%\u0019\u0011QE,\u0003\tUs\u0017\u000e^\u0001\u0010e\u0016\u0004xN\u001d;ECR\fGj\\:tA\u000591\u000f^8qa\u0016$W#\u00016\u0002\u0017M$x\u000e\u001d9fI~#S-\u001d\u000b\u0005\u0003C\t\t\u0004\u0003\u0005\u00024E\t\t\u00111\u0001k\u0003\rAH%M\u0001\tgR|\u0007\u000f]3eA\u0005\u0019\u0012N\\5uS\u0006dGk\u001c9jG>3gm]3ugV\u0011\u00111\b\t\u0004\u000f\u0006u\u0012bAA =\t!2\u000b]3dS\u001aL7\rU;mg\u0006\u0014xJ\u001a4tKR\f1cY;se\u0016tG\u000fV8qS\u000e|eMZ:fiN,\"!!\u0012\u0011\u000b\u0019\f9%a\u0013\n\u0007\u0005%sK\u0001\u0004PaRLwN\u001c\t\u0007%\u00065\u0013+a\u0014\n\u0005Ak\u0006\u0003BA)\u0003;j!!a\u0015\u000b\t\u0005U\u0013qK\u0001\u0004CBL'\u0002BA-\u00037\naa\u00197jK:$(BA\u0010%\u0013\u0011\ty&a\u0015\u0003\u00135+7o]1hK&#\u0017aF2veJ,g\u000e\u001e+pa&\u001cwJ\u001a4tKR\u001cx\fJ3r)\u0011\t\t#!\u001a\t\u0013\u0005MR#!AA\u0002\u0005\u0015\u0013\u0001F2veJ,g\u000e\u001e+pa&\u001cwJ\u001a4tKR\u001c\b%\u0001\u0007qk2\u001c\u0018M]*dQ\u0016l\u0017-\u0006\u0002\u0002nA!\u0011qNA=\u001b\t\t\tH\u0003\u0003\u0002t\u0005U\u0014AB:dQ\u0016l\u0017M\u0003\u0003\u0002x\u0005m\u0013AB2p[6|g.\u0003\u0003\u0002|\u0005E$AC*dQ\u0016l\u0017-\u00138g_R\u0011\u0011q\u0010\t\u0005\u0003\u0003\u000b9)\u0004\u0002\u0002\u0004*\u0019\u0011Q\u0011\u0011\u0002\u000bQL\b/Z:\n\t\u0005%\u00151\u0011\u0002\u000b'R\u0014Xo\u0019;UsB,\u0017!C4fi>3gm]3u+\t\ty\tE\u0003g\u0003\u000f\n\t\nE\u00023\u0003'K1!!&4\u0005\u0019yeMZ:fi\u0006Aq-\u001a;CCR\u001c\u0007\u000e\u0006\u0004\u0002\u001c\u0006e\u0016Q\u0018\t\u0005\u0003;\u000b\u0019L\u0004\u0003\u0002 
\u0006=f\u0002BAQ\u0003[sA!a)\u0002,:!\u0011QUAU\u001d\r!\u0016qU\u0005\u0002O%\u0011QEJ\u0005\u0003G\u0011J!!\t\u0012\n\u0007\u0005E\u0006%A\u0004qC\u000e\\\u0017mZ3\n\t\u0005U\u0016q\u0017\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T1!!-!\u0011\u001d\tYL\u0007a\u0001\u0003\u001f\u000bQa\u001d;beRDq!a0\u001b\u0001\u0004\t\t*A\u0002f]\u0012\faaY8n[&$H\u0003BA\u0011\u0003\u000bDq!a0\u001c\u0001\u0004\t\t*\u0001\u0003ti>\u0004HCAA\u0011\u0001")
/* loaded from: input_file:org/apache/spark/sql/pulsar/PulsarSource.class */
public class PulsarSource implements Source, Logging {
    // Lazily computed value; written by initialTopicOffsets$lzycompute() and read via initialTopicOffsets().
    private SpecificPulsarOffset initialTopicOffsets;
    // Lazily computed value; written by pulsarSchema$lzycompute() and read via pulsarSchema().
    private SchemaInfo pulsarSchema;
    // Used to build DataFrames in getBatch() and to obtain the SparkSession/SparkContext.
    private final SQLContext sqlContext;
    // Source of offsets and schema (fetchLatestOffsets, fetchEarliestOffsets, getPulsarSchema) and cursor operations.
    private final PulsarMetadataReader metadataReader;
    // Handed unchanged to PulsarSourceRDD in getBatch().
    private final Map<String, Object> clientConf;
    // Handed unchanged to PulsarSourceRDD in getBatch().
    private final Map<String, Object> readerConf;
    // Passed to PulsarSourceInitialOffsetWriter when the initial offsets are computed.
    private final String metadataPath;
    // Passed to getInitialOffset(...) when the initial offsets are computed.
    private final PerTopicOffset startingOffsets;
    // Passed to getInitialOffset(...) and to PulsarSourceRDD.
    private final int pollTimeoutMs;
    // Passed to PulsarSourceUtils.reportDataLossFunc(...) in the constructor and to PulsarSourceRDD.
    private final boolean failOnDataLoss;
    // Handed unchanged to PulsarSourceRDD in getBatch().
    private final String subscriptionNamePrefix;
    // Handed unchanged to PulsarSourceRDD in getBatch().
    private final JSONOptionsInRead jsonOptions;
    // SparkContext taken from sqlContext in the constructor.
    private final SparkContext sc;
    // Callback produced by PulsarSourceUtils.reportDataLossFunc(failOnDataLoss); invoked with a message string.
    private final Function1<String, BoxedUnit> reportDataLoss;
    // Set to true at the end of stop(); stop() returns immediately once it is true.
    private boolean stopped;
    // Starts as None; set by getOffset(), and by getBatch() when still empty.
    private Option<scala.collection.immutable.Map<String, MessageId>> currentTopicOffsets;
    // Backing field for the Logging logger accessor pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Initialization flags for the lazy fields: bit 1 = initialTopicOffsets, bit 2 = pulsarSchema.
    private volatile byte bitmap$0;

    // ---- Forwarders to the static helper methods on the Logging interface ----

    /** Returns the value of {@code Logging.logName$(this)}. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Returns the value of {@code Logging.log$(this)}. */
    public Logger log() {
        return Logging.log$(this);
    }

    /** Forwards the message supplier to {@code Logging.logInfo$}. */
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    /** Forwards the message supplier to {@code Logging.logDebug$}. */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /** Forwards the message supplier to {@code Logging.logTrace$}. */
    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    /** Forwards the message supplier to {@code Logging.logWarning$}. */
    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    /** Forwards the message supplier to {@code Logging.logError$}. */
    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    /** Forwards the message supplier and throwable to {@code Logging.logInfo$}. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    /** Forwards the message supplier and throwable to {@code Logging.logDebug$}. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    /** Forwards the message supplier and throwable to {@code Logging.logTrace$}. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    /** Forwards the message supplier and throwable to {@code Logging.logWarning$}. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    /** Forwards the message supplier and throwable to {@code Logging.logError$}. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /** Returns the value of {@code Logging.isTraceEnabled$(this)}. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Forwards the flag to the one-flag {@code Logging.initializeLogIfNecessary$}. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /** Forwards both flags to the two-flag {@code Logging.initializeLogIfNecessary$} and returns its result. */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    /** Returns the value of {@code Logging.initializeLogIfNecessary$default$2$(this)}. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Forwards both flags to {@code Logging.initializeForcefully$}. */
    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    // ---- Forwarders to the static helper methods on the Source interface ----
    // These use the connector Offset type (org.apache.spark.sql.connector.read.streaming.Offset).

    /** Returns the value of {@code Source.initialOffset$(this)}. */
    public Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    /** Returns the value of {@code Source.deserializeOffset$(this, str)}. */
    public Offset deserializeOffset(String str) {
        return Source.deserializeOffset$(this, str);
    }

    /**
     * Forwards to {@code Source.commit$(this, offset)}. This overload takes the connector
     * Offset type; the overload taking the execution.streaming Offset is defined separately
     * in this class.
     */
    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    /** Returns the logger currently stored in the Logging backing field. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores {@code logger} in the Logging backing field. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** Returns the SparkContext captured in the constructor. */
    private SparkContext sc() {
        return this.sc;
    }

    /** Returns the data-loss callback built in the constructor; it is applied to a message string. */
    public Function1<String, BoxedUnit> reportDataLoss() {
        return this.reportDataLoss;
    }

    /** Returns whether {@code stop()} has already completed. */
    private boolean stopped() {
        return this.stopped;
    }

    /** Sets the stopped flag. */
    private void stopped_$eq(boolean z) {
        this.stopped = z;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v11, types: [org.apache.spark.sql.pulsar.PulsarSource] */
    private SpecificPulsarOffset initialTopicOffsets$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.initialTopicOffsets = new PulsarSourceInitialOffsetWriter(this.sqlContext.sparkSession(), this.metadataPath).getInitialOffset(this.metadataReader, this.startingOffsets, this.pollTimeoutMs, reportDataLoss());
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        this.startingOffsets = null;
        return this.initialTopicOffsets;
    }

    /**
     * Returns the initial per-topic offsets, computing them on first use.
     *
     * @return the cached value when bit 1 of {@code bitmap$0} is set, otherwise the result of
     *         {@code initialTopicOffsets$lzycompute()}
     */
    private SpecificPulsarOffset initialTopicOffsets() {
        boolean alreadyComputed = (this.bitmap$0 & 1) != 0;
        if (alreadyComputed) {
            return this.initialTopicOffsets;
        }
        return initialTopicOffsets$lzycompute();
    }

    /** Returns the most recently recorded per-topic offsets, or None if none have been recorded yet. */
    private Option<scala.collection.immutable.Map<String, MessageId>> currentTopicOffsets() {
        return this.currentTopicOffsets;
    }

    /** Replaces the recorded per-topic offsets. */
    private void currentTopicOffsets_$eq(Option<scala.collection.immutable.Map<String, MessageId>> option) {
        this.currentTopicOffsets = option;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.pulsar.PulsarSource] */
    private SchemaInfo pulsarSchema$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.pulsarSchema = this.metadataReader.getPulsarSchema();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.pulsarSchema;
    }

    /**
     * Returns the Pulsar schema info, fetching it from the metadata reader on first use.
     *
     * @return the cached value when bit 2 of {@code bitmap$0} is set, otherwise the result of
     *         {@code pulsarSchema$lzycompute()}
     */
    public SchemaInfo pulsarSchema() {
        boolean alreadyComputed = (this.bitmap$0 & 2) != 0;
        if (alreadyComputed) {
            return this.pulsarSchema;
        }
        return pulsarSchema$lzycompute();
    }

    /**
     * Returns the Spark schema for this source, derived from the Pulsar schema by
     * {@code SchemaUtils.pulsarSourceSchema}.
     */
    public StructType schema() {
        SchemaInfo info = pulsarSchema();
        return SchemaUtils$.MODULE$.pulsarSourceSchema(info);
    }

    /**
     * Fetches the latest offsets from the metadata reader, records their per-topic map as the
     * current offsets, and returns them.
     *
     * @return always a {@code Some} wrapping the freshly fetched offsets
     */
    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        // Trigger the lazy initial-offset computation before fetching the latest offsets.
        initialTopicOffsets();
        SpecificPulsarOffset latest = this.metadataReader.fetchLatestOffsets();
        currentTopicOffsets_$eq(new Some(latest.topicOffsets()));
        // The message is only built if the logger evaluates the supplier.
        logDebug(() -> new StringBuilder(11)
                .append("GetOffset: ")
                .append(((SeqLike) latest.topicOffsets().toSeq().map(entry -> entry.toString(), Seq$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$))
                .toString());
        return new Some(latest);
    }

    /**
     * Builds the DataFrame covering the offsets between {@code option} (start, may be None)
     * and {@code offset} (end).
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Force the lazy initial offsets and log the request.</li>
     *   <li>Record the end offsets as current offsets if none are recorded yet.</li>
     *   <li>Return an empty DataFrame when a start is given and equals the end.</li>
     *   <li>Take the start offsets from {@code option}, or from the initial offsets when it is None.</li>
     *   <li>Fetch earliest offsets for topics present at the end but not at the start; report
     *       data loss for topics present at the start but not at the end.</li>
     *   <li>Build one {@code PulsarOffsetRange} per end topic, drop the ranges rejected by
     *       {@code $anonfun$getBatch$4}, and wrap the rest in a {@code PulsarSourceRDD}.</li>
     * </ol>
     *
     * @param option start offset of the batch, or None for the first batch
     * @param offset end offset of the batch
     * @return a DataFrame over the selected offset ranges
     * @throws IllegalStateException if an end topic has no start offset after merging
     * @throws MatchError if {@code option} is neither a {@code Some} nor {@code None}
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> option, org.apache.spark.sql.execution.streaming.Offset offset) {
        // Trigger the lazy initial-offset computation up front.
        initialTopicOffsets();
        logInfo(() -> new StringBuilder(37).append("getBatch called with start = ").append(option).append(", end = ").append(offset).toString());

        scala.collection.immutable.Map<String, MessageId> endOffsets = SpecificPulsarOffset$.MODULE$.getTopicOffsets(offset);
        if (currentTopicOffsets().isEmpty()) {
            currentTopicOffsets_$eq(new Some(endOffsets));
        }

        // Nothing to read when the caller passes identical start and end offsets.
        if (option.isDefined()) {
            Object startOffset = option.get();
            boolean sameAsEnd = startOffset == null ? offset == null : startOffset.equals(offset);
            if (sameAsEnd) {
                return this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), schema(), true);
            }
        }

        final scala.collection.immutable.Map<String, MessageId> startOffsets;
        if (option instanceof Some) {
            startOffsets = SpecificPulsarOffset$.MODULE$.getTopicOffsets((org.apache.spark.sql.execution.streaming.Offset) ((Some) option).value());
        } else if (None$.MODULE$.equals(option)) {
            startOffsets = initialTopicOffsets().topicOffsets();
        } else {
            throw new MatchError(option);
        }

        String[] executors = PulsarSourceUtils$.MODULE$.getSortedExecutorList(sc());
        int executorCount = executors.length;

        // Topics that appear only at the end start from their earliest offsets.
        scala.collection.immutable.Map<String, MessageId> addedTopicOffsets = this.metadataReader.fetchEarliestOffsets(endOffsets.keySet().diff(startOffsets.keySet()).toSeq());

        // Topics that appear only at the start have disappeared.
        Set removedTopics = startOffsets.keySet().diff(endOffsets.keySet());
        if (removedTopics.nonEmpty()) {
            reportDataLoss().apply(new StringBuilder(41).append(removedTopics).append(" are gone. Some data may have been missed").toString());
        }

        scala.collection.immutable.Map mergedStartOffsets = startOffsets.$plus$plus(addedTopicOffsets);
        Seq offsetRanges = ((SetLike) ((TraversableLike) endOffsets.keySet().map(topic -> {
            MessageId from = (MessageId) mergedStartOffsets.getOrElse(topic, () -> {
                throw new IllegalStateException(new StringBuilder(27).append(topic).append(" doesn't have a from offset").toString());
            });
            MessageId until = (MessageId) endOffsets.apply(topic);
            // The executor is picked by topic hash; presumably a preferred location - confirm against PulsarOffsetRange.
            return PulsarOffsetRange$.MODULE$.apply(topic, from, until, executorCount > 0 ? new Some(executors[Math.floorMod(topic.hashCode(), executorCount)]) : None$.MODULE$);
        }, Set$.MODULE$.canBuildFrom())).filter(range -> BoxesRunTime.boxToBoolean($anonfun$getBatch$4(this, range)))).toSeq();

        PulsarSourceRDD rdd = new PulsarSourceRDD(sc(), new SchemaInfoSerializable(pulsarSchema()), this.clientConf, this.readerConf, offsetRanges, this.pollTimeoutMs, this.failOnDataLoss, this.subscriptionNamePrefix, this.jsonOptions);
        logInfo(() -> new StringBuilder(41)
                .append("GetBatch generating RDD of offset range: ")
                .append(((TraversableOnce) offsetRanges.sortBy(range -> range.topic(), Ordering$String$.MODULE$)).mkString(", "))
                .toString());
        return this.sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), schema(), true);
    }

    /**
     * Extracts the per-topic offsets from {@code offset} and asks the metadata reader to
     * commit its cursor to them.
     *
     * @param offset the offset to commit to
     */
    public void commit(org.apache.spark.sql.execution.streaming.Offset offset) {
        scala.collection.immutable.Map<String, MessageId> committedOffsets = SpecificPulsarOffset$.MODULE$.getTopicOffsets(offset);
        this.metadataReader.commitCursorToOffset(committedOffsets);
    }

    /**
     * Stops this source: removes the metadata reader's cursor, closes the reader, and marks
     * the source as stopped. Calls made after the first successful one do nothing.
     */
    public synchronized void stop() {
        if (!stopped()) {
            this.metadataReader.removeCursor();
            this.metadataReader.close();
            // Set last so a failure above leaves the source eligible for another stop() attempt.
            stopped_$eq(true);
        }
    }

    /**
     * Filter predicate applied to each offset range built in {@code getBatch}.
     *
     * <p>A range is kept unless its until-offset compares lower than its from-offset. Such a
     * range is always dropped; data loss is additionally reported when the from-offset is not
     * {@code MessageId.latest}.
     *
     * @param pulsarSource the source whose data-loss callback is used
     * @param pulsarOffsetRange the range under test
     * @return {@code true} to keep the range, {@code false} to drop it
     */
    public static final /* synthetic */ boolean $anonfun$getBatch$4(PulsarSource pulsarSource, PulsarOffsetRange pulsarOffsetRange) {
        boolean endsBeforeStart = pulsarOffsetRange.untilOffset().compareTo(pulsarOffsetRange.fromOffset()) < 0;
        if (!endsBeforeStart) {
            return true;
        }
        MessageIdImpl from = (MessageIdImpl) pulsarOffsetRange.fromOffset();
        MessageId latest = MessageId.latest;
        boolean startsAtLatest = from == null ? latest == null : from.equals(latest);
        if (!startsAtLatest) {
            pulsarSource.reportDataLoss().apply(new StringBuilder(64).append(pulsarOffsetRange.topic()).append("'s offset was changed ").append("from ").append(pulsarOffsetRange.fromOffset()).append(" to ").append(pulsarOffsetRange.untilOffset()).append(", ").append("some data might has been missed").toString());
        }
        return false;
    }

    /**
     * Creates a source. No offsets or schema are fetched here; those are computed lazily on
     * first access.
     *
     * @param sQLContext SQL context; also the origin of the SparkContext stored in {@code sc}
     * @param pulsarMetadataReader reader used for offsets, schema and cursor operations
     * @param map stored as {@code clientConf}
     * @param map2 stored as {@code readerConf}
     * @param str stored as {@code metadataPath}
     * @param perTopicOffset stored as {@code startingOffsets}
     * @param i stored as {@code pollTimeoutMs}
     * @param z stored as {@code failOnDataLoss}; also selects the data-loss callback
     * @param str2 stored as {@code subscriptionNamePrefix}
     * @param jSONOptionsInRead stored as {@code jsonOptions}
     */
    public PulsarSource(SQLContext sQLContext, PulsarMetadataReader pulsarMetadataReader, Map<String, Object> map, Map<String, Object> map2, String str, PerTopicOffset perTopicOffset, int i, boolean z, String str2, JSONOptionsInRead jSONOptionsInRead) {
        this.sqlContext = sQLContext;
        this.metadataReader = pulsarMetadataReader;
        this.clientConf = map;
        this.readerConf = map2;
        this.metadataPath = str;
        this.startingOffsets = perTopicOffset;
        this.pollTimeoutMs = i;
        this.failOnDataLoss = z;
        this.subscriptionNamePrefix = str2;
        this.jsonOptions = jSONOptionsInRead;
        // Interface init hooks run after the constructor arguments are stored and before the derived fields below.
        Source.$init$(this);
        Logging.$init$(this);
        this.sc = sQLContext.sparkContext();
        this.reportDataLoss = PulsarSourceUtils$.MODULE$.reportDataLossFunc(z);
        this.stopped = false;
        // No offsets recorded yet; getOffset()/getBatch() fill this in.
        this.currentTopicOffsets = None$.MODULE$;
    }
}
