package org.apache.flink.connector.jdbc.dialect.oracle;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.class */
public class OracleTableSinkITCase extends AbstractTestBase {
    private static final OracleContainer container = new OracleContainer();
    private static String containerUrl;
    public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
    public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend";
    public static final String OUTPUT_TABLE3 = "dynamicSinkForBatch";
    public static final String OUTPUT_TABLE4 = "REAL_TABLE";
    public static final String OUTPUT_TABLE5 = "checkpointTable";
    public static final String USER_TABLE = "USER_TABLE";

    @BeforeClass
    public static void beforeAll() throws ClassNotFoundException, SQLException {
        container.start();
        containerUrl = container.getJdbcUrl();
        Class.forName(container.getDriverClassName());
        Connection connection = DriverManager.getConnection(containerUrl);
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.executeUpdate("CREATE TABLE dynamicSinkForUpsert (cnt NUMBER(38,2) DEFAULT 0 NOT NULL,lencnt NUMBER(38,2) DEFAULT 0 NOT NULL,cTag INT DEFAULT 0 NOT NULL,ts TIMESTAMP,PRIMARY KEY (cnt, cTag))");
                    createStatement.executeUpdate("CREATE TABLE dynamicSinkForAppend (id INT DEFAULT 0 NOT NULL,num NUMBER DEFAULT 0 NOT NULL,ts TIMESTAMP)");
                    createStatement.executeUpdate("CREATE TABLE dynamicSinkForBatch (NAME VARCHAR(20) NOT NULL,SCORE NUMBER DEFAULT 0 NOT NULL)");
                    createStatement.executeUpdate("CREATE TABLE REAL_TABLE (real_data REAL)");
                    createStatement.executeUpdate("CREATE TABLE checkpointTable (id NUMBER DEFAULT 0 NOT NULL)");
                    createStatement.executeUpdate("CREATE TABLE USER_TABLE (user_id VARCHAR(20) NOT NULL,user_name VARCHAR(20) NOT NULL,email VARCHAR(255),balance DECIMAL(18,2),balance2 DECIMAL(18,2),PRIMARY KEY (user_id))");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    if (connection != null) {
                        if (0 == 0) {
                            connection.close();
                            return;
                        }
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    connection.close();
                }
            }
            throw th8;
        }
    }

    @AfterClass
    public static void afterAll() throws Exception {
        TestValuesTableFactory.clearAllData();
        Class.forName(container.getDriverClassName());
        Connection connection = DriverManager.getConnection(containerUrl);
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    createStatement.execute("DROP TABLE dynamicSinkForUpsert");
                    createStatement.execute("DROP TABLE dynamicSinkForAppend");
                    createStatement.execute("DROP TABLE dynamicSinkForBatch");
                    createStatement.execute("DROP TABLE REAL_TABLE");
                    createStatement.execute("DROP TABLE checkpointTable");
                    createStatement.execute("DROP TABLE USER_TABLE");
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    container.stop();
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    connection.close();
                }
            }
        }
    }

    public static DataStream<Tuple4<Integer, Long, String, Timestamp>> get4TupleDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple4(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 00:00:00.001")));
        arrayList.add(new Tuple4(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 00:00:00.002")));
        arrayList.add(new Tuple4(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 00:00:00.003")));
        arrayList.add(new Tuple4(4, 3L, "Hello world, how are you?", Timestamp.valueOf("1970-01-01 00:00:00.004")));
        arrayList.add(new Tuple4(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-01 00:00:00.005")));
        arrayList.add(new Tuple4(6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 00:00:00.006")));
        arrayList.add(new Tuple4(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 00:00:00.007")));
        arrayList.add(new Tuple4(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 00:00:00.008")));
        arrayList.add(new Tuple4(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 00:00:00.009")));
        arrayList.add(new Tuple4(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 00:00:00.010")));
        arrayList.add(new Tuple4(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 00:00:00.011")));
        arrayList.add(new Tuple4(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 00:00:00.012")));
        arrayList.add(new Tuple4(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 00:00:00.013")));
        arrayList.add(new Tuple4(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 00:00:00.014")));
        arrayList.add(new Tuple4(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 00:00:00.015")));
        arrayList.add(new Tuple4(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 00:00:00.016")));
        arrayList.add(new Tuple4(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 00:00:00.017")));
        arrayList.add(new Tuple4(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 00:00:00.018")));
        arrayList.add(new Tuple4(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 00:00:00.019")));
        arrayList.add(new Tuple4(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 00:00:00.020")));
        arrayList.add(new Tuple4(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01 00:00:00.021")));
        Collections.shuffle(arrayList);
        return streamExecutionEnvironment.fromCollection(arrayList);
    }

    @Test
    public void testReal() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().enableObjectReuse();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment, EnvironmentSettings.inStreamingMode());
        create.executeSql("CREATE TABLE upsertSink (  real_data float) WITH (  'connector'='jdbc',  'url'='" + containerUrl + "',  'table-name'='REAL_TABLE')");
        create.executeSql("INSERT INTO upsertSink SELECT CAST(1.1 as FLOAT)").await();
        JdbcTableOutputFormatTest.check(new Row[]{Row.of(new Object[]{Float.valueOf(1.1f)})}, containerUrl, "REAL_TABLE", new String[]{"real_data"});
    }

    @Test
    public void testUpsert() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().enableObjectReuse();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        create.createTemporaryView("T", create.fromDataStream(get4TupleDataStream(executionEnvironment).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<Integer, Long, String, Timestamp>>() { // from class: org.apache.flink.connector.jdbc.dialect.oracle.OracleTableSinkITCase.1
            public long extractAscendingTimestamp(Tuple4<Integer, Long, String, Timestamp> tuple4) {
                return ((Integer) tuple4.f0).intValue();
            }
        }), new Expression[]{Expressions.$("id"), Expressions.$("num"), Expressions.$("text"), Expressions.$("ts")}));
        create.executeSql("CREATE TABLE upsertSink (  cnt DECIMAL(18,2),  lencnt DECIMAL(18,2),  cTag INT,  ts TIMESTAMP(3),  PRIMARY KEY (cnt, cTag) NOT ENFORCED) WITH (  'connector'='jdbc',  'url'='" + containerUrl + "',  'table-name'='dynamicSinkForUpsert',  'sink.buffer-flush.max-rows' = '2',  'sink.buffer-flush.interval' = '0',  'sink.max-retries' = '0')");
        create.executeSql("INSERT INTO upsertSink \nSELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts\nFROM (\n  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts\n  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n  GROUP BY len, cTag\n)\nGROUP BY cnt, cTag").await();
        JdbcTableOutputFormatTest.check(new Row[]{Row.of(new Object[]{1, 5, 1, Timestamp.valueOf("1970-01-01 00:00:00.006")}), Row.of(new Object[]{7, 1, 1, Timestamp.valueOf("1970-01-01 00:00:00.021")}), Row.of(new Object[]{9, 1, 1, Timestamp.valueOf("1970-01-01 00:00:00.015")})}, containerUrl, "dynamicSinkForUpsert", new String[]{"cnt", "lencnt", "cTag", "ts"});
    }

    @Test
    public void testAppend() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().enableObjectReuse();
        executionEnvironment.getConfig().setParallelism(1);
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        create.registerTable("T", create.fromDataStream(get4TupleDataStream(executionEnvironment), new Expression[]{Expressions.$("id"), Expressions.$("num"), Expressions.$("text"), Expressions.$("ts")}));
        create.executeSql("CREATE TABLE upsertSink (  id INT,  num BIGINT,  ts TIMESTAMP(3)) WITH (  'connector'='jdbc',  'url'='" + containerUrl + "',  'table-name'='dynamicSinkForAppend')");
        create.executeSql("INSERT INTO upsertSink SELECT id, num, ts FROM T WHERE id IN (2, 10, 20)").await();
        JdbcTableOutputFormatTest.check(new Row[]{Row.of(new Object[]{2, 2, Timestamp.valueOf("1970-01-01 00:00:00.002")}), Row.of(new Object[]{10, 4, Timestamp.valueOf("1970-01-01 00:00:00.01")}), Row.of(new Object[]{20, 6, Timestamp.valueOf("1970-01-01 00:00:00.02")})}, containerUrl, "dynamicSinkForAppend", new String[]{"id", "num", "ts"});
    }

    @Test
    public void testBatchSink() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        create.executeSql("CREATE TABLE USER_RESULT(NAME VARCHAR,SCORE BIGINT) WITH ( 'connector' = 'jdbc','url'='" + containerUrl + "','table-name' = 'dynamicSinkForBatch','sink.buffer-flush.max-rows' = '2','sink.buffer-flush.interval' = '300ms','sink.max-retries' = '4')");
        create.executeSql("INSERT INTO USER_RESULT\nSELECT user_name, score FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) AS UserCountTable(score, user_name)").await();
        JdbcTableOutputFormatTest.check(new Row[]{Row.of(new Object[]{"Bob", 1}), Row.of(new Object[]{"Tom", 22}), Row.of(new Object[]{"Kim", 42}), Row.of(new Object[]{"Kim", 42}), Row.of(new Object[]{"Bob", 1})}, containerUrl, "dynamicSinkForBatch", new String[]{"NAME", "SCORE"});
    }

    @Test
    public void testReadingFromChangelogSource() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        create.executeSql("CREATE TABLE user_logs (\n  user_id STRING,\n  user_name STRING,\n  email STRING,\n  balance DECIMAL(18,2),\n  balance2 AS balance * 2\n) WITH (\n 'connector' = 'values',\n 'data-id' = '" + TestValuesTableFactory.registerData(TestData.userChangelog()) + "',\n 'changelog-mode' = 'I,UA,UB,D'\n)");
        create.executeSql("CREATE TABLE user_sink (\n  user_id STRING PRIMARY KEY NOT ENFORCED,\n  user_name STRING,\n  email STRING,\n  balance DECIMAL(18,2),\n  balance2 DECIMAL(18,2)\n) WITH (\n  'connector' = 'jdbc',  'url'='" + containerUrl + "',  'table-name' = 'USER_TABLE',  'sink.buffer-flush.max-rows' = '2',  'sink.buffer-flush.interval' = '0')");
        create.executeSql("INSERT INTO user_sink SELECT * FROM user_logs").await();
        JdbcTableOutputFormatTest.check(new Row[]{Row.of(new Object[]{"user1", "Tom", "tom123@gmail.com", new BigDecimal("8.1"), new BigDecimal("16.2")}), Row.of(new Object[]{"user3", "Bailey", "bailey@qq.com", new BigDecimal("9.99"), new BigDecimal("19.98")}), Row.of(new Object[]{"user4", "Tina", "tina@gmail.com", new BigDecimal("11.3"), new BigDecimal("22.6")})}, containerUrl, "USER_TABLE", new String[]{"user_id", "user_name", "email", "balance", "balance2"});
    }

    @Test
    public void testFlushBufferWhenCheckpoint() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("connector", "jdbc");
        hashMap.put("url", containerUrl);
        hashMap.put("table-name", "checkpointTable");
        hashMap.put("sink.buffer-flush.interval", "0");
        GenericJdbcSinkFunction createSinkFunction = FactoryMocks.createTableSink(ResolvedSchema.of(new Column[]{Column.physical("id", DataTypes.BIGINT().notNull())}), hashMap).getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)).createSinkFunction();
        createSinkFunction.setRuntimeContext(new MockStreamingRuntimeContext(true, 1, 0));
        createSinkFunction.open(new Configuration());
        createSinkFunction.invoke(GenericRowData.of(new Object[]{1L}), SinkContextUtil.forTimestamp(1L));
        createSinkFunction.invoke(GenericRowData.of(new Object[]{2L}), SinkContextUtil.forTimestamp(1L));
        JdbcTableOutputFormatTest.check(new Row[0], containerUrl, "checkpointTable", new String[]{"id"});
        createSinkFunction.snapshotState(new StateSnapshotContextSynchronousImpl(1L, 1L));
        JdbcTableOutputFormatTest.check(new Row[]{Row.of(new Object[]{1L}), Row.of(new Object[]{2L})}, containerUrl, "checkpointTable", new String[]{"id"});
        createSinkFunction.close();
    }
}
