package org.apache.spark.sql.pulsar;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.AnalysisException$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.CalendarIntervalType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.ObjectType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType$;
import org.apache.spark.sql.types.UserDefinedType;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: PulsarSinks.scala */
/* loaded from: input_file:org/apache/spark/sql/pulsar/PulsarSinks$.class */
public final class PulsarSinks$ implements Logging {
    public static PulsarSinks$ MODULE$;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new PulsarSinks$();
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public void checkForUnsupportedType(Seq<DataType> seq) {
        seq.map(dataType -> {
            $anonfun$checkForUnsupportedType$1(dataType);
            return BoxedUnit.UNIT;
        }, Seq$.MODULE$.canBuildFrom());
    }

    public void validateQuery(Seq<Attribute> seq, Option<String> option) {
        if (!StringType$.MODULE$.equals(((Expression) seq.find(attribute -> {
            return BoxesRunTime.boxToBoolean($anonfun$validateQuery$1(attribute));
        }).getOrElse(() -> {
            if (option.isEmpty()) {
                throw new AnalysisException(new StringBuilder(91).append("topic option required when no ").append("'").append(PulsarOptions$.MODULE$.TopicAttributeName()).append("' attribute is present. Use the ").append(PulsarOptions$.MODULE$.TopicSingle()).append(" option for setting a topic.").toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
            }
            return new Literal(option.get(), StringType$.MODULE$);
        })).dataType())) {
            throw new AnalysisException(new StringBuilder(21).append("Topic type must be a ").append(StringType$.MODULE$.catalogString()).toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        DataType dataType = ((Expression) seq.find(attribute2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$validateQuery$3(attribute2));
        }).getOrElse(() -> {
            return new Literal((Object) null, StringType$.MODULE$);
        })).dataType();
        if (!(StringType$.MODULE$.equals(dataType) ? true : BinaryType$.MODULE$.equals(dataType))) {
            throw new AnalysisException(new StringBuilder(30).append(PulsarOptions$.MODULE$.KeyAttributeName()).append(" attribute type ").append("must be a ").append(StringType$.MODULE$.catalogString()).append(" or ").append(BinaryType$.MODULE$.catalogString()).toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
        }
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        DataType dataType2 = ((Expression) seq.find(attribute3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$validateQuery$5(attribute3));
        }).getOrElse(() -> {
            return new Literal((Object) null, LongType$.MODULE$);
        })).dataType();
        if (!(LongType$.MODULE$.equals(dataType2) ? true : TimestampType$.MODULE$.equals(dataType2))) {
            throw new AnalysisException(new StringBuilder(30).append(PulsarOptions$.MODULE$.EventTimeName()).append(" attribute type ").append("must be a ").append(LongType$.MODULE$.catalogString()).append(" or ").append(TimestampType$.MODULE$.catalogString()).toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
        }
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        seq.find(attribute4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$validateQuery$7(attribute4));
        }).map(attribute5 -> {
            $anonfun$validateQuery$8(attribute5);
            return BoxedUnit.UNIT;
        });
        Seq seq2 = (Seq) seq.filter(attribute6 -> {
            return BoxesRunTime.boxToBoolean($anonfun$validateQuery$10(attribute6));
        });
        if (seq2.length() == 0) {
            throw new AnalysisException("Schema should have at least one non-key/non-topic field", AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
        }
        checkForUnsupportedType((Seq) seq2.map(attribute7 -> {
            return attribute7.dataType();
        }, Seq$.MODULE$.canBuildFrom()));
    }

    public void write(SparkSession sparkSession, QueryExecution queryExecution, Map<String, Object> map, Map<String, Object> map2, Option<String> option, String str) {
        Seq<Attribute> output = queryExecution.analyzed().output();
        validateQuery(output, option);
        queryExecution.toRdd().foreachPartition(iterator -> {
            $anonfun$write$1(map, map2, option, output, str, iterator);
            return BoxedUnit.UNIT;
        });
    }

    public <T> Producer<T> createProducer(Map<String, Object> map, Map<String, Object> map2, String str, Schema<T> schema) {
        return CachedPulsarClient$.MODULE$.getOrCreate(map).newProducer(schema).topic(str).loadConf(map2).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxMessages(Commands.DEFAULT_MAX_MESSAGE_SIZE).create();
    }

    public StructType toStructType(Seq<Attribute> seq) {
        return org.apache.spark.sql.catalyst.expressions.package$.MODULE$.AttributeSeq(seq).toStructType();
    }

    public static final /* synthetic */ void $anonfun$checkForUnsupportedType$1(DataType dataType) {
        if (CalendarIntervalType$.MODULE$.equals(dataType)) {
            throw new AnalysisException("CalendarIntervalType not supported by pulsar sink yet", AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
        }
        if (dataType instanceof UserDefinedType) {
            throw new AnalysisException(new StringBuilder(33).append((UserDefinedType) dataType).append(" not supported by pulsar sink yet").toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
        }
        if (dataType instanceof ObjectType) {
            throw new AnalysisException(new StringBuilder(33).append((ObjectType) dataType).append(" not supported by pulsar sink yet").toString(), AnalysisException$.MODULE$.$lessinit$greater$default$2(), AnalysisException$.MODULE$.$lessinit$greater$default$3(), AnalysisException$.MODULE$.$lessinit$greater$default$4(), AnalysisException$.MODULE$.$lessinit$greater$default$5());
        }
        if (!(dataType instanceof StructType)) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            MODULE$.checkForUnsupportedType(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(((StructType) dataType).fields())).map(structField -> {
                return structField.dataType();
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(DataType.class))))).toSeq());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ boolean $anonfun$validateQuery$1(Attribute attribute) {
        String name = attribute.name();
        String TopicAttributeName = PulsarOptions$.MODULE$.TopicAttributeName();
        return name != null ? name.equals(TopicAttributeName) : TopicAttributeName == null;
    }

    public static final /* synthetic */ boolean $anonfun$validateQuery$3(Attribute attribute) {
        String name = attribute.name();
        String KeyAttributeName = PulsarOptions$.MODULE$.KeyAttributeName();
        return name != null ? name.equals(KeyAttributeName) : KeyAttributeName == null;
    }

    public static final /* synthetic */ boolean $anonfun$validateQuery$5(Attribute attribute) {
        String name = attribute.name();
        String EventTimeName = PulsarOptions$.MODULE$.EventTimeName();
        return name != null ? name.equals(EventTimeName) : EventTimeName == null;
    }

    public static final /* synthetic */ boolean $anonfun$validateQuery$7(Attribute attribute) {
        String name = attribute.name();
        String MessageIdName = PulsarOptions$.MODULE$.MessageIdName();
        if (name != null ? !name.equals(MessageIdName) : MessageIdName != null) {
            String name2 = attribute.name();
            String PublishTimeName = PulsarOptions$.MODULE$.PublishTimeName();
            if (name2 != null ? !name2.equals(PublishTimeName) : PublishTimeName != null) {
                return false;
            }
        }
        return true;
    }

    public static final /* synthetic */ void $anonfun$validateQuery$8(Attribute attribute) {
        MODULE$.logWarning(() -> {
            return new StringBuilder(196).append(attribute.name()).append(" attribute exists in schema,").append("it's reserved by Pulsar Source and generated automatically by pulsar for each record.").append("Choose another name if you want to keep this field or it will be ignored by pulsar.").toString();
        });
    }

    public static final /* synthetic */ boolean $anonfun$validateQuery$10(Attribute attribute) {
        return !PulsarOptions$.MODULE$.MetaFieldNames().contains(attribute.name());
    }

    public static final /* synthetic */ void $anonfun$write$1(Map map, Map map2, Option option, Seq seq, String str, Iterator iterator) {
        PulsarWriteTask pulsarWriteTask = new PulsarWriteTask(map, map2, option, seq, str);
        Utils$.MODULE$.tryWithSafeFinally(() -> {
            pulsarWriteTask.execute(iterator);
        }, () -> {
            pulsarWriteTask.close();
        });
    }

    private PulsarSinks$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}
