package com.couchbase.client.dcp.highlevel.internal;

import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/client/dcp/highlevel/internal/AsyncEventDispatcher.class */
/**
 * An {@link EventDispatcher} that hands events to a {@link DatabaseChangeListener}
 * on a dedicated daemon thread.
 * <p>
 * Events passed to {@link #dispatch(DatabaseChangeEvent)} are appended to an
 * unbounded deque; a single worker thread (started by the constructor) takes
 * them one at a time and invokes {@link DatabaseChangeEvent#dispatch} with the
 * listener. Exceptions thrown while dispatching are logged and reported to
 * {@link DatabaseChangeListener#onFailure}; they do not stop the worker.
 */
public class AsyncEventDispatcher implements EventDispatcher {
    private static final Logger log = LoggerFactory.getLogger(AsyncEventDispatcher.class);

    // Worker threads are daemons so they never keep the JVM alive on their own.
    // NOTE(review): the string is presumably a thread-name prefix -- confirm against SimpleThreadFactory.
    private static final ThreadFactory threadFactory = new SimpleThreadFactory("dcp-event-dispatch-", t -> {
        t.setDaemon(true);
    });

    // Largest timeout that can be expressed in whole milliseconds without overflowing a long.
    private static final Duration MAX_JOIN_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);

    private final BlockingDeque<DatabaseChangeEvent> queue = new LinkedBlockingDeque<>();
    private final BlockingQueueConsumerOps<DatabaseChangeEvent> queueOps;
    private final Thread thread;

    // Set by shutdownNow(); checked by the worker before it takes the next event.
    private volatile boolean shutdown;

    /**
     * Thrown by the poison-pill event's {@code dispatch} method to tell the
     * worker loop to exit.
     */
    private static class GracefulShutdownPoisonPill extends RuntimeException {
        private GracefulShutdownPoisonPill() {
        }
    }

    /**
     * Creates the dispatcher and immediately starts its worker thread.
     *
     * @param flowControlMode passed through to the {@code DatabaseChangeConsumerOps} that wraps the queue
     * @param databaseChangeListener the listener that receives events and failures; must not be null
     * @throws NullPointerException if {@code databaseChangeListener} is null
     */
    public AsyncEventDispatcher(FlowControlMode flowControlMode, DatabaseChangeListener databaseChangeListener) {
        Objects.requireNonNull(databaseChangeListener, "databaseChangeListener");
        this.queueOps = new DatabaseChangeConsumerOps(this.queue, flowControlMode);
        this.thread = threadFactory.newThread(() -> runDispatchLoop(databaseChangeListener));
        this.thread.start();
    }

    /**
     * Body of the worker thread: takes events and dispatches them to the
     * listener until shutdown is requested, the poison pill is reached, or the
     * thread is interrupted.
     */
    private void runDispatchLoop(DatabaseChangeListener listener) {
        while (!shutdown) {
            try {
                queueOps.take().dispatch(listener);
            } catch (GracefulShutdownPoisonPill e) {
                log.info("High-level event dispatcher terminated due to graceful shutdown request.");
                return;
            } catch (InterruptedException e) {
                // The worker exits right here, so there is no caller left to observe the interrupt flag.
                log.info("High-level event dispatcher terminated due to interruption.");
                return;
            } catch (Throwable t) {
                // A misbehaving listener must not kill the worker: report the failure and keep going.
                try {
                    log.warn("Event listener threw exception.", t);
                    // -1: the failure is not attributed to a specific vbucket here.
                    listener.onFailure(new StreamFailure(-1, t));
                } catch (Throwable t2) {
                    log.error("Event listener error handler threw exception.", t2);
                }
            }
        }
    }

    /**
     * Appends the event to the tail of the queue for asynchronous delivery.
     */
    @Override
    public void dispatch(DatabaseChangeEvent databaseChangeEvent) {
        queue.add(databaseChangeEvent);
    }

    /**
     * Requests immediate termination: raises the shutdown flag and interrupts
     * the worker thread. Does not wait for the worker to exit; see
     * {@link #awaitTermination(Duration)}.
     */
    @Override
    public void shutdownNow() {
        shutdown = true;
        thread.interrupt();
    }

    /**
     * Requests termination by placing a poison pill at the <em>head</em> of the
     * queue, so it is the next element the worker takes; events still queued
     * behind it are not dispatched. Does not wait for the worker to exit.
     */
    @Override
    public void gracefulShutdown() {
        queue.addFirst(poisonPill());
    }

    /**
     * Waits up to the given timeout for the worker thread to exit.
     * <p>
     * A zero or negative timeout does not wait at all and simply reports the
     * current state. A positive timeout shorter than one millisecond is rounded
     * up to one millisecond. (Passing the raw millisecond value to
     * {@link Thread#join(long)} would be wrong for these cases, because
     * {@code join(0)} means "wait forever".)
     *
     * @param duration maximum time to wait
     * @return {@code true} if the worker thread has terminated
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public boolean awaitTermination(Duration duration) throws InterruptedException {
        if (!duration.isZero() && !duration.isNegative()) {
            thread.join(toJoinMillis(duration));
        }
        return !thread.isAlive();
    }

    /**
     * Converts a positive timeout to a millisecond value that is safe to pass
     * to {@link Thread#join(long)}: never 0 and never overflowing.
     */
    private static long toJoinMillis(Duration positiveTimeout) {
        if (positiveTimeout.compareTo(MAX_JOIN_TIMEOUT) >= 0) {
            return Long.MAX_VALUE;
        }
        return Math.max(1L, positiveTimeout.toMillis());
    }

    /**
     * Creates the sentinel event whose {@code dispatch} throws
     * {@link GracefulShutdownPoisonPill}, which the worker loop catches to exit.
     */
    private static DatabaseChangeEvent poisonPill() {
        return new DatabaseChangeEvent() {
            @Override
            public void dispatch(DatabaseChangeListener databaseChangeListener) {
                throw new GracefulShutdownPoisonPill();
            }

            @Override
            public int getVbucket() {
                return -1;
            }
        };
    }
}
