package org.apache.hadoop.hive.ql.metadata.events;

import hiveexec.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.EventUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.class */
public class NotificationEventPoll {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationEventPoll.class);
    private static final AtomicBoolean inited = new AtomicBoolean(false);
    private static NotificationEventPoll instance;
    Configuration conf;
    ScheduledExecutorService executorService;
    List<EventConsumer> eventConsumers = new ArrayList();
    ScheduledFuture<?> pollFuture;
    long lastCheckedEventId;

    /* loaded from: input_file:org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll$Poller.class */
    class Poller implements Runnable {
        Poller() {
        }

        @Override // java.lang.Runnable
        public void run() {
            NotificationEventPoll.LOG.debug("Polling for notification events");
            int i = 0;
            try {
                EventUtils.NotificationEventIterator notificationEventIterator = new EventUtils.NotificationEventIterator(new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()), NotificationEventPoll.this.lastCheckedEventId, 0, "*", (String) null);
                while (notificationEventIterator.hasNext()) {
                    NotificationEvent next = notificationEventIterator.next();
                    NotificationEventPoll.LOG.debug("Event: " + next);
                    Iterator<EventConsumer> it = NotificationEventPoll.this.eventConsumers.iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().accept(next);
                        } catch (Exception e) {
                            NotificationEventPoll.LOG.error("Error processing notification event " + next, e);
                        }
                    }
                    i++;
                    NotificationEventPoll.this.lastCheckedEventId = next.getEventId();
                }
            } catch (Exception e2) {
                NotificationEventPoll.LOG.error("Error polling for notification events", e2);
            }
            NotificationEventPoll.LOG.debug("Processed {} notification events", Integer.valueOf(i));
        }
    }

    public static void initialize(Configuration configuration) throws Exception {
        if (inited.getAndSet(true)) {
            return;
        }
        try {
            instance = new NotificationEventPoll(configuration);
        } catch (Exception e) {
            inited.set(false);
            throw e;
        }
    }

    public static void shutdown() {
        if (inited.get()) {
            instance.stop();
            instance = null;
            inited.set(false);
        }
    }

    private NotificationEventPoll(Configuration configuration) throws Exception {
        this.conf = configuration;
        long timeVar = HiveConf.getTimeVar(configuration, HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_POLL_INTERVAL, TimeUnit.MILLISECONDS);
        if (timeVar <= 0) {
            LOG.debug("Non-positive poll interval configured, notification event polling disabled");
            return;
        }
        String[] strings = configuration.getStrings(HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_CONSUMERS.varname);
        if (strings == null || strings.length <= 0) {
            LOG.debug("No event consumers configured, notification event polling disabled");
            return;
        }
        for (String str : strings) {
            this.eventConsumers.add((EventConsumer) ReflectionUtils.newInstance(JavaUtils.loadClass(str), configuration));
        }
        this.lastCheckedEventId = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()).getCurrentNotificationEventId();
        LOG.info("Initializing lastCheckedEventId to {}", Long.valueOf(this.lastCheckedEventId));
        this.executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NotificationEventPoll %d").build());
        this.pollFuture = this.executorService.scheduleAtFixedRate(new Poller(), timeVar, timeVar, TimeUnit.MILLISECONDS);
    }

    private void stop() {
        if (this.pollFuture != null) {
            this.pollFuture.cancel(true);
            this.pollFuture = null;
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            this.executorService = null;
        }
    }
}
