package org.apache.phoenix.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.flume.DefaultKeyGenerator;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.apache.phoenix.kafka.consumer.PhoenixConsumer;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/phoenix/kafka/PhoenixConsumerIT.class */
public class PhoenixConsumerIT extends BaseHBaseManagedTimeIT {
    private static final String ZKHOST = "127.0.0.1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String TOPIC = "topic1";
    private KafkaServer kafkaServer;
    private PhoenixConsumer pConsumer;
    private EmbeddedZookeeper zkServer;
    private ZkClient zkClient;
    private Connection conn;

    /* loaded from: input_file:org/apache/phoenix/kafka/PhoenixConsumerIT$KafkaProducerThread.class */
    class KafkaProducerThread implements Runnable {
        KafkaProducer<String, String> producer;
        String topic;

        KafkaProducerThread(String str, String str2) {
            this.topic = str2;
            try {
                InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(str);
                Throwable th = null;
                try {
                    try {
                        Properties properties = new Properties();
                        properties.load(resourceAsStream);
                        this.producer = new KafkaProducer<>(properties);
                        if (resourceAsStream != null) {
                            if (0 != 0) {
                                try {
                                    resourceAsStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                resourceAsStream.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        KafkaProducerThread(Properties properties, String str) {
            this.topic = str;
            this.producer = new KafkaProducer<>(properties);
        }

        @Override // java.lang.Runnable
        public void run() {
            for (int i = 1; i <= 10; i++) {
                try {
                    this.producer.send(new ProducerRecord(this.topic, String.format("%s,%.3f,%d", "msg" + i, Float.valueOf(i * 2000.0f), Integer.valueOf(i))));
                    this.producer.flush();
                    Thread.sleep(100L);
                } catch (Throwable th) {
                    System.out.printf("%s", th.fillInStackTrace());
                    return;
                } finally {
                    this.producer.close();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/phoenix/kafka/PhoenixConsumerIT$PhoenixConsumerThread.class */
    class PhoenixConsumerThread implements Runnable {
        PhoenixConsumer pConsumer;
        Properties properties;

        PhoenixConsumerThread(PhoenixConsumer phoenixConsumer, String str) {
            this.pConsumer = phoenixConsumer;
            try {
                InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(str);
                Throwable th = null;
                try {
                    try {
                        Properties properties = new Properties();
                        properties.load(resourceAsStream);
                        this.properties = properties;
                        if (resourceAsStream != null) {
                            if (0 != 0) {
                                try {
                                    resourceAsStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                resourceAsStream.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        PhoenixConsumerThread(PhoenixConsumer phoenixConsumer, Properties properties) {
            this.pConsumer = phoenixConsumer;
            this.properties = properties;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.pConsumer.intializeKafka(this.properties);
            this.pConsumer.configure(this.pConsumer.prepareContext());
            this.pConsumer.start();
            this.pConsumer.process();
        }
    }

    @Before
    public void setUp() throws IOException, SQLException {
        this.zkServer = new EmbeddedZookeeper();
        String str = "127.0.0.1:" + this.zkServer.port();
        this.zkClient = new ZkClient(str, 30000, 30000, ZKStringSerializer$.MODULE$);
        ZkUtils apply = ZkUtils.apply(this.zkClient, false);
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", str);
        properties.setProperty("broker.id", "0");
        properties.setProperty("log.dirs", Files.createTempDirectory("kafka-", new FileAttribute[0]).toAbsolutePath().toString());
        properties.setProperty("listeners", "PLAINTEXT://127.0.0.1:9092");
        this.kafkaServer = TestUtils.createServer(new KafkaConfig(properties), new MockTime());
        this.kafkaServer.startup();
        AdminUtils.createTopic(apply, TOPIC, 1, 1, new Properties());
        this.pConsumer = new PhoenixConsumer();
        this.conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
    }

    @Test
    public void testPhoenixConsumerWithFile() throws SQLException {
        PhoenixConsumerThread phoenixConsumerThread = new PhoenixConsumerThread(this.pConsumer, "consumer.props");
        phoenixConsumerThread.properties.setProperty("jdbcUrl", getUrl());
        Thread thread = new Thread(phoenixConsumerThread);
        Thread thread2 = new Thread(new KafkaProducerThread("producer.props", TOPIC));
        thread.start();
        try {
            thread.join(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread2.start();
        try {
            thread2.join();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        if (!thread2.isAlive()) {
            System.out.println("kafka producer is not alive");
            this.pConsumer.stop();
        }
        ResultSet executeQuery = this.conn.createStatement().executeQuery("SELECT * FROM SAMPLE1");
        Assert.assertTrue(executeQuery.next());
        Assert.assertTrue(executeQuery.getFetchSize() > 0);
        executeQuery.close();
    }

    @Test
    public void testPhoenixConsumerWithProperties() throws SQLException {
        Properties properties = new Properties();
        properties.setProperty("table", "SAMPLE2");
        properties.setProperty("jdbcUrl", getUrl());
        properties.setProperty("serializer", EventSerializers.REGEX.name());
        properties.setProperty("ddl", "CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))\n");
        properties.setProperty("serializer.regex", "([^\\,]*),([^\\,]*),([^\\,]*)");
        properties.setProperty("serializer.columns", "c1,c2,c3");
        properties.setProperty("serializer.rowkeyType", DefaultKeyGenerator.UUID.name());
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("topics", "topic1,topic2");
        properties.setProperty("poll.timeout.ms", "100");
        Thread thread = new Thread(new PhoenixConsumerThread(this.pConsumer, properties));
        Properties properties2 = new Properties();
        properties2.setProperty("bootstrap.servers", "localhost:9092");
        properties2.setProperty("key.serializer", KafkaConstants.DEFAULT_KEY_SERIALIZER);
        properties2.setProperty("value.serializer", KafkaConstants.DEFAULT_VALUE_SERIALIZER);
        properties2.setProperty("auto.commit.interval.ms", "1000");
        Thread thread2 = new Thread(new KafkaProducerThread(properties2, TOPIC));
        thread.start();
        try {
            thread.join(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread2.start();
        try {
            thread2.join();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        if (!thread2.isAlive()) {
            System.out.println("kafka producer is not alive");
            this.pConsumer.stop();
        }
        ResultSet executeQuery = this.conn.createStatement().executeQuery("SELECT * FROM SAMPLE2");
        Assert.assertTrue(executeQuery.next());
        Assert.assertTrue(executeQuery.getFetchSize() > 0);
        executeQuery.close();
    }

    @After
    public void cleanUp() throws Exception {
        this.kafkaServer.shutdown();
        this.zkClient.close();
        this.zkServer.shutdown();
        this.conn.close();
    }
}
