package org.apache.flink.connector.kinesis.table;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.connector.kinesis.table.KinesisDynamicSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.TableOptionsBuilder;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.class */
/**
 * Tests that create {@link KinesisDynamicSink} instances from table options via
 * {@link FactoryMocks} and compare them with sinks assembled by hand through
 * {@link KinesisDynamicSink.KinesisDynamicTableSinkBuilder}.
 *
 * <p>The "good" tests assert equality with a hand-built sink; the "bad" tests assert that an
 * invalid partitioner configuration surfaces as a {@link ValidationException}.
 */
class KinesisDynamicTableSinkFactoryTest {
    private static final String STREAM_NAME = "myStream";

    /** A sink created with partition keys equals a hand-built sink using the field-based generator. */
    @Test
    void testGoodTableSinkForPartitionedTable() {
        ResolvedSchema sinkSchema = defaultSinkSchema();
        DataType physicalDataType = sinkSchema.toPhysicalRowDataType();
        Map<String, String> sinkOptions = defaultTableOptions().build();
        List<String> partitionKeys = Arrays.asList("name", "curr_id");

        KinesisDynamicSink actualSink = createSink(sinkSchema, partitionKeys, sinkOptions);

        Assertions.assertThat(actualSink)
                .isEqualTo(expectedPartitionedSink(physicalDataType, partitionKeys));
        assertCreatesKinesisStreamsSink(actualSink);
    }

    /** {@code copy()} of the expected sink is equal to the created sink but is a distinct object. */
    @Test
    void testGoodTableSinkCopyForPartitionedTable() {
        ResolvedSchema sinkSchema = defaultSinkSchema();
        DataType physicalDataType = sinkSchema.toPhysicalRowDataType();
        Map<String, String> sinkOptions = defaultTableOptions().build();
        List<String> partitionKeys = Arrays.asList("name", "curr_id");

        KinesisDynamicSink actualSink = createSink(sinkSchema, partitionKeys, sinkOptions);
        KinesisDynamicSink expectedSink = expectedPartitionedSink(physicalDataType, partitionKeys);

        Assertions.assertThat(actualSink).isEqualTo(expectedSink.copy());
        Assertions.assertThat(expectedSink).isNotSameAs(expectedSink.copy());
    }

    /** Without partition keys, the created sink equals one built with the random generator. */
    @Test
    void testGoodTableSinkForNonPartitionedTable() {
        ResolvedSchema sinkSchema = defaultSinkSchema();

        KinesisDynamicSink actualSink = createSink(sinkSchema, defaultTableOptions().build());

        KinesisDynamicSink expectedSink =
                withCommonSettings(
                                new KinesisDynamicSink.KinesisDynamicTableSinkBuilder(),
                                sinkSchema.toPhysicalRowDataType())
                        .setPartitioner(new RandomKinesisPartitionKeyGenerator())
                        .build();
        Assertions.assertThat(actualSink).isEqualTo(expectedSink);
        assertCreatesKinesisStreamsSink(actualSink);
    }

    /** The sink options set as table options show up as the matching builder settings. */
    @Test
    void testGoodTableSinkForNonPartitionedTableWithSinkOptions() {
        ResolvedSchema sinkSchema = defaultSinkSchema();

        KinesisDynamicSink actualSink =
                createSink(sinkSchema, defaultTableOptionsWithSinkOptions().build());

        KinesisDynamicSink expectedSink =
                withCommonSettings(getDefaultSinkBuilder(), sinkSchema.toPhysicalRowDataType())
                        .setPartitioner(new RandomKinesisPartitionKeyGenerator())
                        .build();
        Assertions.assertThat(actualSink).isEqualTo(expectedSink);
        assertCreatesKinesisStreamsSink(actualSink);
    }

    /** The {@code sink.producer.*} table options show up as the matching builder settings. */
    @Test
    void testGoodTableSinkForNonPartitionedTableWithProducerOptions() {
        ResolvedSchema sinkSchema = defaultSinkSchema();

        KinesisDynamicSink actualSink =
                createSink(sinkSchema, defaultTableOptionsWithDeprecatedOptions().build());

        KinesisDynamicSink expectedSink =
                withCommonSettings(
                                new KinesisDynamicSink.KinesisDynamicTableSinkBuilder()
                                        .setFailOnError(true)
                                        .setMaxBatchSize(100)
                                        .setMaxInFlightRequests(100)
                                        .setMaxTimeInBufferMS(1000L),
                                sinkSchema.toPhysicalRowDataType())
                        .setPartitioner(new RandomKinesisPartitionKeyGenerator())
                        .build();
        Assertions.assertThat(actualSink).isEqualTo(expectedSink);
        assertCreatesKinesisStreamsSink(actualSink);
    }

    /** Setting the partitioner option together with partition keys must be rejected. */
    @Test
    void testBadTableSinkForCustomPartitionerForPartitionedTable() {
        ResolvedSchema sinkSchema = defaultSinkSchema();
        Map<String, String> sinkOptions =
                defaultTableOptions()
                        .withTableOption(KinesisConnectorOptions.SINK_PARTITIONER, "random")
                        .build();
        String expectedMessage =
                String.format(
                        "Cannot set %s option for a table defined with a PARTITIONED BY clause",
                        KinesisConnectorOptions.SINK_PARTITIONER.key());

        Assertions.assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(
                        () ->
                                FactoryMocks.createTableSink(
                                        sinkSchema, Arrays.asList("name", "curr_id"), sinkOptions))
                .havingCause()
                .withMessageContaining(expectedMessage);
    }

    /** A partitioner option naming a class that cannot be found must be rejected. */
    @Test
    void testBadTableSinkForNonExistingPartitionerClass() {
        ResolvedSchema sinkSchema = defaultSinkSchema();
        Map<String, String> sinkOptions =
                defaultTableOptions()
                        .withTableOption(KinesisConnectorOptions.SINK_PARTITIONER, "abc")
                        .build();

        Assertions.assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> FactoryMocks.createTableSink(sinkSchema, sinkOptions))
                .havingCause()
                .withMessageContaining("Could not find and instantiate partitioner class 'abc'");
    }

    /**
     * Creates a sink for a table without partition keys.
     *
     * <p>The explicit downcast is required because the mock factory helper is connector-agnostic;
     * the tests need the concrete type for {@code copy()} and equality checks.
     */
    private KinesisDynamicSink createSink(ResolvedSchema schema, Map<String, String> options) {
        return (KinesisDynamicSink) FactoryMocks.createTableSink(schema, options);
    }

    /** Creates a sink for a table with the given partition keys (see the two-argument overload). */
    private KinesisDynamicSink createSink(
            ResolvedSchema schema, List<String> partitionKeys, Map<String, String> options) {
        return (KinesisDynamicSink) FactoryMocks.createTableSink(schema, partitionKeys, options);
    }

    /**
     * Applies the settings shared by every expected sink: consumed data type, stream name, client
     * properties and the mock encoding format with a {@code ","} delimiter.
     *
     * @param builder builder to configure; may already carry test-specific settings
     * @param consumedDataType physical row data type of the sink schema
     * @return the builder returned by the last setter, ready for {@code setPartitioner(...)}
     */
    private KinesisDynamicSink.KinesisDynamicTableSinkBuilder withCommonSettings(
            KinesisDynamicSink.KinesisDynamicTableSinkBuilder builder, DataType consumedDataType) {
        return builder.setConsumedDataType(consumedDataType)
                .setStream(STREAM_NAME)
                .setKinesisClientProperties(defaultProducerProperties())
                .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(","));
    }

    /** Builds the expected sink for a table partitioned by the given keys. */
    private KinesisDynamicSink expectedPartitionedSink(
            DataType physicalDataType, List<String> partitionKeys) {
        // NOTE(review): the logical type is passed through unchanged from the original code; if
        // the generator's constructor expects a row-specific logical type, a cast is needed here
        // - confirm against the connector version in use.
        return withCommonSettings(
                        new KinesisDynamicSink.KinesisDynamicTableSinkBuilder(), physicalDataType)
                .setPartitioner(
                        new RowDataFieldsKinesisPartitionKeyGenerator(
                                physicalDataType.getLogicalType(), partitionKeys))
                .build();
    }

    /** Asserts that the sink's runtime provider creates a {@link KinesisStreamsSink}. */
    private void assertCreatesKinesisStreamsSink(KinesisDynamicSink sink) {
        // NOTE(review): call chain kept exactly as found; if getSinkRuntimeProvider is declared
        // with a generic provider return type, a cast to the concrete provider is needed before
        // createSink() - confirm against the Flink version in use.
        Assertions.assertThat(
                        sink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false))
                                .createSink())
                .isInstanceOf(KinesisStreamsSink.class);
    }

    /** Schema with three physical columns: {@code name}, {@code curr_id} and {@code time}. */
    private ResolvedSchema defaultSinkSchema() {
        return ResolvedSchema.of(
                Column.physical("name", DataTypes.STRING()),
                Column.physical("curr_id", DataTypes.BIGINT()),
                Column.physical("time", DataTypes.TIMESTAMP(3)));
    }

    /** Default table options plus the fail-on-error and async sink options. */
    private TableOptionsBuilder defaultTableOptionsWithSinkOptions() {
        return defaultTableOptions()
                .withTableOption(KinesisConnectorOptions.SINK_FAIL_ON_ERROR.key(), "true")
                .withTableOption(AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(), "100")
                .withTableOption(AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS.key(), "100")
                .withTableOption(AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS.key(), "100")
                .withTableOption(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(), "1000")
                .withTableOption(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(), "1000");
    }

    /** Default table options plus the {@code sink.producer.*} option keys. */
    private TableOptionsBuilder defaultTableOptionsWithDeprecatedOptions() {
        return defaultTableOptions()
                .withTableOption("sink.producer.record-max-buffered-time", "1000")
                .withTableOption("sink.producer.collection-max-size", "100")
                .withTableOption("sink.producer.collection-max-count", "100")
                .withTableOption("sink.producer.fail-on-error", "true");
    }

    /** Minimal options: connector {@code kinesis}, the test format, stream, region, credentials. */
    private TableOptionsBuilder defaultTableOptions() {
        return new TableOptionsBuilder("kinesis", "test-format")
                .withTableOption(KinesisConnectorOptions.STREAM, STREAM_NAME)
                .withTableOption("aws.region", "us-west-2")
                .withTableOption("aws.credentials.provider", "BASIC")
                .withTableOption("aws.credentials.basic.accesskeyid", "ververicka")
                .withTableOption("aws.credentials.basic.secretkey", "SuperSecretSecretSquirrel")
                .withFormatOption(TestFormatFactory.DELIMITER, ",")
                .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
    }

    /** Builder pre-populated with the values set by {@link #defaultTableOptionsWithSinkOptions()}. */
    private KinesisDynamicSink.KinesisDynamicTableSinkBuilder getDefaultSinkBuilder() {
        return new KinesisDynamicSink.KinesisDynamicTableSinkBuilder()
                .setFailOnError(true)
                .setMaxBatchSize(100)
                .setMaxInFlightRequests(100)
                .setMaxBufferSizeInBytes(1000L)
                .setMaxBufferedRequests(100)
                .setMaxTimeInBufferMS(1000L);
    }

    /**
     * Client properties expected on the created sink.
     *
     * <p>A plain {@link Properties} instance is used rather than an anonymous subclass so the
     * object does not hold a reference to this test instance; equality is content-based either
     * way. Note that the credential keys here carry a {@code provider.} segment that the table
     * option keys in {@link #defaultTableOptions()} do not.
     */
    private Properties defaultProducerProperties() {
        Properties properties = new Properties();
        properties.setProperty("aws.region", "us-west-2");
        properties.setProperty("aws.credentials.provider", "BASIC");
        properties.setProperty("aws.credentials.provider.basic.accesskeyid", "ververicka");
        properties.setProperty(
                "aws.credentials.provider.basic.secretkey", "SuperSecretSecretSquirrel");
        return properties;
    }
}
