package org.apache.phoenix.kafka.consumer;

import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.flume.Context;
import org.apache.flume.event.EventBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.phoenix.flume.FlumeConstants;
import org.apache.phoenix.flume.serializer.EventSerializer;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.apache.phoenix.kafka.KafkaConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/phoenix/kafka/consumer/PhoenixConsumer.class */
public class PhoenixConsumer {
    private static final Logger logger = LoggerFactory.getLogger(PhoenixConsumer.class);
    private Integer batchSize;
    private long timeout;
    private EventSerializer serializer;
    private KafkaConsumer<String, String> consumer = null;
    private Properties properties = new Properties();
    private Boolean process = true;

    public PhoenixConsumer() {
    }

    public PhoenixConsumer(Configuration configuration) throws IOException {
        intializeKafka(configuration);
        configure(prepareContext());
        start();
        process();
    }

    public void intializeKafka(Configuration configuration) throws IOException {
        String str = configuration.get("kafka.consumer.file");
        if (str == null) {
            throw new NullPointerException("File path cannot be empty, please specify in the arguments");
        }
        try {
            InputStream open = FileSystem.get(configuration).open(new Path(str));
            Throwable th = null;
            try {
                try {
                    this.properties.load(open);
                    if (open != null) {
                        if (0 != 0) {
                            try {
                                open.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            open.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        intializeKafka(this.properties);
    }

    public void intializeKafka(Properties properties) {
        this.properties = properties;
        String property = properties.getProperty("bootstrap.servers");
        if (property == null) {
            throw new NullPointerException("Bootstrap Servers cannot be empty, please specify in the configuration file");
        }
        properties.setProperty("bootstrap.servers", property);
        if (properties.getProperty("group.id") == null) {
            properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        if (properties.getProperty(KafkaConstants.TIMEOUT) == null) {
            properties.setProperty(KafkaConstants.TIMEOUT, String.valueOf(100L));
        }
        String property2 = properties.getProperty("topics");
        if (property2 == null) {
            throw new NullPointerException("Topics cannot be empty, please specify in the configuration file");
        }
        properties.setProperty("key.deserializer", KafkaConstants.DEFAULT_KEY_DESERIALIZER);
        properties.setProperty("value.deserializer", KafkaConstants.DEFAULT_VALUE_DESERIALIZER);
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Arrays.asList(property2.split(",")));
    }

    public Context prepareContext() {
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : this.properties.entrySet()) {
            hashMap.put((String) entry.getKey(), (String) entry.getValue());
        }
        return new Context(hashMap);
    }

    public void configure(Context context) {
        this.timeout = context.getLong(KafkaConstants.TIMEOUT, 100L).longValue();
        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, FlumeConstants.DEFAULT_BATCH_SIZE);
        String string = context.getString(FlumeConstants.CONFIG_SERIALIZER);
        if (string == null) {
            throw new NullPointerException("Event serializer cannot be empty, please specify in the configuration file");
        }
        initializeSerializer(context, string);
    }

    public void process() {
        int i = 0;
        while (this.process.booleanValue()) {
            ConsumerRecords<String, String> poll = this.consumer.poll(this.timeout);
            if (poll.count() == 0) {
                i++;
            } else {
                System.out.printf("Got %d records after %d timeouts\n", Integer.valueOf(poll.count()), Integer.valueOf(i));
                i = 0;
            }
            if (!poll.isEmpty()) {
                ArrayList arrayList = new ArrayList(poll.count());
                Iterator<ConsumerRecord<String, String>> it = poll.iterator();
                while (it.hasNext()) {
                    arrayList.add(EventBuilder.withBody(Bytes.toBytes(it.next().value())));
                }
                try {
                    this.serializer.upsertEvents(arrayList);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void start() {
        logger.info("Starting consumer {} ", getClass());
        try {
            this.serializer.initialize();
        } catch (Exception e) {
            logger.error("Error {} in initializing the serializer.", e.getMessage());
            if (!(e instanceof RuntimeException)) {
                throw new RuntimeException(e);
            }
            throw ((RuntimeException) RuntimeException.class.cast(e));
        }
    }

    public void stop() {
        close();
        this.consumer.close();
        try {
            this.serializer.close();
        } catch (SQLException e) {
            logger.error(" Error while closing connection {} for consumer.", e.getMessage());
        }
    }

    public void close() {
        this.process = false;
        try {
            Thread.sleep(30000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void initializeSerializer(Context context, String str) {
        String str2 = null;
        EventSerializers eventSerializers = null;
        try {
            eventSerializers = EventSerializers.valueOf(str.toUpperCase());
        } catch (IllegalArgumentException e) {
            str2 = str;
        }
        Context context2 = new Context();
        context2.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
        copyPropertiesToSerializerContext(context, context2);
        try {
            this.serializer = (EventSerializer) (str2 == null ? Class.forName(eventSerializers.getClassName()) : Class.forName(str2)).newInstance();
            this.serializer.configure(context2);
        } catch (Exception e2) {
            logger.error("Could not instantiate event serializer.", e2);
            if (!(e2 instanceof RuntimeException)) {
                throw new RuntimeException(e2);
            }
            throw ((RuntimeException) e2);
        }
    }

    private void copyPropertiesToSerializerContext(Context context, Context context2) {
        context2.put(FlumeConstants.CONFIG_TABLE_DDL, context.getString(FlumeConstants.CONFIG_TABLE_DDL));
        context2.put(FlumeConstants.CONFIG_TABLE, context.getString(FlumeConstants.CONFIG_TABLE));
        context2.put(FlumeConstants.CONFIG_ZK_QUORUM, context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
        context2.put(FlumeConstants.CONFIG_JDBC_URL, context.getString(FlumeConstants.CONFIG_JDBC_URL));
        context2.put(FlumeConstants.CONFIG_BATCHSIZE, context.getString(FlumeConstants.CONFIG_BATCHSIZE));
    }
}
