package org.apache.flink.streaming.connectors.kafka.table;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.kafka.common.header.Header;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.class */
public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
    private static final String UPSERT_KAFKA_TRANSFORMATION = "upsert-kafka";
    protected List<String> metadataKeys = Collections.emptyList();
    protected DataType consumedDataType;
    protected final DataType physicalDataType;

    @Nullable
    protected final EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;
    protected final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat;
    protected final int[] keyProjection;
    protected final int[] valueProjection;

    @Nullable
    protected final String keyPrefix;
    private final DeliveryGuarantee deliveryGuarantee;

    @Nullable
    private final String transactionalIdPrefix;
    protected final String topic;
    protected final Properties properties;

    @Nullable
    protected final FlinkKafkaPartitioner<RowData> partitioner;
    protected final boolean upsertMode;
    protected final SinkBufferFlushMode flushMode;

    @Nullable
    protected final Integer parallelism;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink$KafkaHeader.class */
    private static class KafkaHeader implements Header {
        private final String key;
        private final byte[] value;

        KafkaHeader(String str, byte[] bArr) {
            this.key = str;
            this.value = bArr;
        }

        @Override // org.apache.kafka.common.header.Header
        public String key() {
            return this.key;
        }

        @Override // org.apache.kafka.common.header.Header
        public byte[] value() {
            return this.value;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink$MetadataConverter.class */
    public interface MetadataConverter extends Serializable {
        Object read(RowData rowData, int i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink$WritableMetadata.class */
    public enum WritableMetadata {
        HEADERS("headers", DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).nullable(), new MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.WritableMetadata.1
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.MetadataConverter
            public Object read(RowData rowData, int i) {
                if (rowData.isNullAt(i)) {
                    return null;
                }
                MapData map = rowData.getMap(i);
                ArrayData keyArray = map.keyArray();
                ArrayData valueArray = map.valueArray();
                ArrayList arrayList = new ArrayList();
                for (int i2 = 0; i2 < keyArray.size(); i2++) {
                    if (!keyArray.isNullAt(i2) && !valueArray.isNullAt(i2)) {
                        arrayList.add(new KafkaHeader(keyArray.getString(i2).toString(), valueArray.getBinary(i2)));
                    }
                }
                return arrayList;
            }
        }),
        TIMESTAMP("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), new MetadataConverter() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.WritableMetadata.2
            private static final long serialVersionUID = 1;

            @Override // org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.MetadataConverter
            public Object read(RowData rowData, int i) {
                if (rowData.isNullAt(i)) {
                    return null;
                }
                return Long.valueOf(rowData.getTimestamp(i, 3).getMillisecond());
            }
        });

        final String key;
        final DataType dataType;
        final MetadataConverter converter;

        WritableMetadata(String str, DataType dataType, MetadataConverter metadataConverter) {
            this.key = str;
            this.dataType = dataType;
            this.converter = metadataConverter;
        }
    }

    public KafkaDynamicSink(DataType dataType, DataType dataType2, @Nullable EncodingFormat<SerializationSchema<RowData>> encodingFormat, EncodingFormat<SerializationSchema<RowData>> encodingFormat2, int[] iArr, int[] iArr2, @Nullable String str, String str2, Properties properties, @Nullable FlinkKafkaPartitioner<RowData> flinkKafkaPartitioner, DeliveryGuarantee deliveryGuarantee, boolean z, SinkBufferFlushMode sinkBufferFlushMode, @Nullable Integer num, @Nullable String str3) {
        this.consumedDataType = (DataType) Preconditions.checkNotNull(dataType, "Consumed data type must not be null.");
        this.physicalDataType = (DataType) Preconditions.checkNotNull(dataType2, "Physical data type must not be null.");
        this.keyEncodingFormat = encodingFormat;
        this.valueEncodingFormat = (EncodingFormat) Preconditions.checkNotNull(encodingFormat2, "Value encoding format must not be null.");
        this.keyProjection = (int[]) Preconditions.checkNotNull(iArr, "Key projection must not be null.");
        this.valueProjection = (int[]) Preconditions.checkNotNull(iArr2, "Value projection must not be null.");
        this.keyPrefix = str;
        this.transactionalIdPrefix = str3;
        this.topic = (String) Preconditions.checkNotNull(str2, "Topic must not be null.");
        this.properties = (Properties) Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.partitioner = flinkKafkaPartitioner;
        this.deliveryGuarantee = (DeliveryGuarantee) Preconditions.checkNotNull(deliveryGuarantee, "DeliveryGuarantee must not be null.");
        this.upsertMode = z;
        this.flushMode = (SinkBufferFlushMode) Preconditions.checkNotNull(sinkBufferFlushMode);
        if (sinkBufferFlushMode.isEnabled() && !z) {
            throw new IllegalArgumentException("Sink buffer flush is only supported in upsert-kafka.");
        }
        this.parallelism = num;
    }

    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return this.valueEncodingFormat.getChangelogMode();
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(final DynamicTableSink.Context context) {
        SerializationSchema<RowData> createSerialization = createSerialization(context, this.keyEncodingFormat, this.keyProjection, this.keyPrefix);
        SerializationSchema<RowData> createSerialization2 = createSerialization(context, this.valueEncodingFormat, this.valueProjection, null);
        KafkaSinkBuilder builder = KafkaSink.builder();
        List<LogicalType> children = this.physicalDataType.getLogicalType().getChildren();
        if (this.transactionalIdPrefix != null) {
            builder.setTransactionalIdPrefix(this.transactionalIdPrefix);
        }
        final KafkaSink build = builder.setDeliveryGuarantee(this.deliveryGuarantee).setBootstrapServers(this.properties.get("bootstrap.servers").toString()).setKafkaProducerConfig(this.properties).setRecordSerializer(new DynamicKafkaRecordSerializationSchema(this.topic, this.partitioner, createSerialization, createSerialization2, getFieldGetters(children, this.keyProjection), getFieldGetters(children, this.valueProjection), hasMetadata(), getMetadataPositions(children), this.upsertMode)).build();
        return (this.flushMode.isEnabled() && this.upsertMode) ? new DataStreamSinkProvider() { // from class: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.1
            public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                SerializableFunction serializableFunction;
                boolean isObjectReuseEnabled = dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
                KafkaSink kafkaSink = build;
                DataType dataType = KafkaDynamicSink.this.physicalDataType;
                int[] iArr = KafkaDynamicSink.this.keyProjection;
                SinkBufferFlushMode sinkBufferFlushMode = KafkaDynamicSink.this.flushMode;
                if (isObjectReuseEnabled) {
                    TypeSerializer createRowDataTypeSerializer = KafkaDynamicSink.this.createRowDataTypeSerializer(context, dataStream.getExecutionConfig());
                    createRowDataTypeSerializer.getClass();
                    serializableFunction = (v1) -> {
                        return r6.copy(v1);
                    };
                } else {
                    serializableFunction = rowData -> {
                        return rowData;
                    };
                }
                DataStreamSink<?> sinkTo = dataStream.sinkTo(new ReducingUpsertSink(kafkaSink, dataType, iArr, sinkBufferFlushMode, serializableFunction));
                Optional generateUid = providerContext.generateUid("upsert-kafka");
                sinkTo.getClass();
                generateUid.ifPresent(sinkTo::uid);
                if (KafkaDynamicSink.this.parallelism != null) {
                    sinkTo.setParallelism(KafkaDynamicSink.this.parallelism.intValue());
                }
                return sinkTo;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -691790048:
                        if (implMethodName.equals("lambda$consumeDataStream$1f4ac7d$1")) {
                            z = false;
                            break;
                        }
                        break;
                    case 3059573:
                        if (implMethodName.equals("copy")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink$1") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/data/RowData;)Lorg/apache/flink/table/data/RowData;")) {
                            return rowData -> {
                                return rowData;
                            };
                        }
                        break;
                    case true:
                        if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/common/typeutils/TypeSerializer") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                            TypeSerializer typeSerializer = (TypeSerializer) serializedLambda.getCapturedArg(0);
                            return (v1) -> {
                                return r0.copy(v1);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        } : SinkV2Provider.of(build, this.parallelism);
    }

    public Map<String, DataType> listWritableMetadata() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Stream.of((Object[]) WritableMetadata.values()).forEachOrdered(writableMetadata -> {
        });
        return linkedHashMap;
    }

    public void applyWritableMetadata(List<String> list, DataType dataType) {
        this.metadataKeys = list;
        this.consumedDataType = dataType;
    }

    public DynamicTableSink copy() {
        KafkaDynamicSink kafkaDynamicSink = new KafkaDynamicSink(this.consumedDataType, this.physicalDataType, this.keyEncodingFormat, this.valueEncodingFormat, this.keyProjection, this.valueProjection, this.keyPrefix, this.topic, this.properties, this.partitioner, this.deliveryGuarantee, this.upsertMode, this.flushMode, this.parallelism, this.transactionalIdPrefix);
        kafkaDynamicSink.metadataKeys = this.metadataKeys;
        return kafkaDynamicSink;
    }

    public String asSummaryString() {
        return "Kafka table sink";
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        KafkaDynamicSink kafkaDynamicSink = (KafkaDynamicSink) obj;
        return Objects.equals(this.metadataKeys, kafkaDynamicSink.metadataKeys) && Objects.equals(this.consumedDataType, kafkaDynamicSink.consumedDataType) && Objects.equals(this.physicalDataType, kafkaDynamicSink.physicalDataType) && Objects.equals(this.keyEncodingFormat, kafkaDynamicSink.keyEncodingFormat) && Objects.equals(this.valueEncodingFormat, kafkaDynamicSink.valueEncodingFormat) && Arrays.equals(this.keyProjection, kafkaDynamicSink.keyProjection) && Arrays.equals(this.valueProjection, kafkaDynamicSink.valueProjection) && Objects.equals(this.keyPrefix, kafkaDynamicSink.keyPrefix) && Objects.equals(this.topic, kafkaDynamicSink.topic) && Objects.equals(this.properties, kafkaDynamicSink.properties) && Objects.equals(this.partitioner, kafkaDynamicSink.partitioner) && Objects.equals(this.deliveryGuarantee, kafkaDynamicSink.deliveryGuarantee) && Objects.equals(Boolean.valueOf(this.upsertMode), Boolean.valueOf(kafkaDynamicSink.upsertMode)) && Objects.equals(this.flushMode, kafkaDynamicSink.flushMode) && Objects.equals(this.transactionalIdPrefix, kafkaDynamicSink.transactionalIdPrefix) && Objects.equals(this.parallelism, kafkaDynamicSink.parallelism);
    }

    public int hashCode() {
        return Objects.hash(this.metadataKeys, this.consumedDataType, this.physicalDataType, this.keyEncodingFormat, this.valueEncodingFormat, this.keyProjection, this.valueProjection, this.keyPrefix, this.topic, this.properties, this.partitioner, this.deliveryGuarantee, Boolean.valueOf(this.upsertMode), this.flushMode, this.transactionalIdPrefix, this.parallelism);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public TypeSerializer<RowData> createRowDataTypeSerializer(DynamicTableSink.Context context, ExecutionConfig executionConfig) {
        return context.createTypeInformation(this.consumedDataType).createSerializer(executionConfig);
    }

    private int[] getMetadataPositions(List<LogicalType> list) {
        return Stream.of((Object[]) WritableMetadata.values()).mapToInt(writableMetadata -> {
            int indexOf = this.metadataKeys.indexOf(writableMetadata.key);
            if (indexOf < 0) {
                return -1;
            }
            return list.size() + indexOf;
        }).toArray();
    }

    private boolean hasMetadata() {
        return this.metadataKeys.size() > 0;
    }

    private RowData.FieldGetter[] getFieldGetters(List<LogicalType> list, int[] iArr) {
        return (RowData.FieldGetter[]) Arrays.stream(iArr).mapToObj(i -> {
            return RowData.createFieldGetter((LogicalType) list.get(i), i);
        }).toArray(i2 -> {
            return new RowData.FieldGetter[i2];
        });
    }

    @Nullable
    private SerializationSchema<RowData> createSerialization(DynamicTableSink.Context context, @Nullable EncodingFormat<SerializationSchema<RowData>> encodingFormat, int[] iArr, @Nullable String str) {
        if (encodingFormat == null) {
            return null;
        }
        DataType project = Projection.of(iArr).project(this.physicalDataType);
        if (str != null) {
            project = DataTypeUtils.stripRowPrefix(project, str);
        }
        return (SerializationSchema) encodingFormat.createRuntimeEncoder(context, project);
    }
}
