package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.util.concurrent.DefaultThreadFactory;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.config.NodeInfo;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.state.LifecycleState;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.core.utils.CbCollections;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.FailedToAddNodeEvent;
import com.couchbase.client.dcp.events.FailedToMovePartitionEvent;
import com.couchbase.client.dcp.events.FailedToRemoveNodeEvent;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.highlevel.internal.KeyExtractor;
import com.couchbase.client.dcp.message.PartitionAndSeqno;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/Conductor.class */
public class Conductor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Conductor.class);
    private final BucketConfigArbiter bucketConfigArbiter;
    private final Client.Environment env;
    private final DcpClientMetrics metrics;
    private final Disposable configSubscription;
    private final Thread configUpdateThread;
    private final CountDownLatch configurationApplied;
    private NanoTimestamp lastDnsSrvRefresh;
    private final Set<DcpChannel> channels = ConcurrentHashMap.newKeySet();
    private volatile boolean stopped = true;
    private final AtomicReference<DcpBucketConfig> currentConfig = new AtomicReference<>();
    private final SessionState sessionState = new SessionState();
    private final Duration dnsSrvRefreshCheckInterval = Duration.ofSeconds(2);
    private final Duration dnsSrvRefreshThrottle = Duration.ofSeconds(15);
    private final Duration shutdownTimeout = Duration.ofSeconds(30);
    private final ScheduledExecutorService configUpdateExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("couchbase-dcp-reconfigure", true));

    private void requireConfigUpdateThread() {
        if (Thread.currentThread() != this.configUpdateThread) {
            throw new IllegalStateException("This method may only be called on the config update thread, but was called on: " + Thread.currentThread());
        }
    }

    public Conductor(Client.Environment environment, DcpClientMetrics dcpClientMetrics) {
        try {
            this.configUpdateThread = (Thread) this.configUpdateExecutor.submit(Thread::currentThread).get();
            this.configurationApplied = new CountDownLatch(1);
            this.metrics = (DcpClientMetrics) Objects.requireNonNull(dcpClientMetrics);
            this.env = environment;
            this.bucketConfigArbiter = new BucketConfigArbiter(environment);
            this.configSubscription = this.bucketConfigArbiter.configs().publishOn(Schedulers.fromExecutor(this.configUpdateExecutor)).subscribe(dcpBucketConfig -> {
                if (dcpBucketConfig.numberOfPartitions() == 0 && this.currentConfig.get() == null) {
                    LOGGER.debug("Skipping initial config (rev {}) because it has invalid partition count.", dcpBucketConfig.rev());
                    return;
                }
                LOGGER.trace("Applying new configuration, new rev is {}.", dcpBucketConfig.rev());
                this.currentConfig.set(dcpBucketConfig);
                reconfigure(dcpBucketConfig);
                this.configurationApplied.countDown();
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public Mono<Void> connect() {
        Duration plus = this.env.bootstrapTimeout().plus(this.env.configRefreshInterval());
        return Mono.fromRunnable(() -> {
            this.stopped = false;
            HashSet hashSet = new HashSet(this.env.clusterAt());
            this.configUpdateExecutor.execute(() -> {
                add(hashSet);
            });
            if (!CbCollections.setCopyOf(CbCollections.transform(hashSet, (v0) -> {
                return v0.host();
            })).equals(CbCollections.setCopyOf(CbCollections.transform(this.env.connectionString().hosts(), (v0) -> {
                return v0.hostname();
            })))) {
                startDnsSrvRefreshWatchdog();
            }
        }).then(await(this.configurationApplied, plus).doOnError(th -> {
            LOGGER.warn("Did not receive initial configuration from cluster within {}", plus);
        }));
    }

    private void startDnsSrvRefreshWatchdog() {
        LOGGER.info("Scheduling DNS SRV re-bootstrap check at interval {}", this.dnsSrvRefreshCheckInterval);
        long millis = this.dnsSrvRefreshCheckInterval.toMillis();
        long millis2 = this.env.bootstrapTimeout().toMillis();
        this.configUpdateExecutor.execute(() -> {
            this.lastDnsSrvRefresh = NanoTimestamp.never();
        });
        this.configUpdateExecutor.scheduleWithFixedDelay(this::maybeBootstrapAgain, millis2, millis, TimeUnit.MILLISECONDS);
    }

    private void maybeBootstrapAgain() {
        requireConfigUpdateThread();
        try {
            if (this.stopped || !this.lastDnsSrvRefresh.hasElapsed(this.dnsSrvRefreshThrottle) || this.channels.stream().anyMatch(dcpChannel -> {
                return dcpChannel.state() == LifecycleState.CONNECTED;
            })) {
                return;
            }
            this.lastDnsSrvRefresh = NanoTimestamp.now();
            LOGGER.info("Attempting DNS SRV refresh because the client is currently connected to zero nodes.");
            HashSet hashSet = new HashSet(this.env.clusterAt());
            hashSet.removeAll(channelsByAddress().keySet());
            if (hashSet.isEmpty()) {
                LOGGER.info("DNS SRV record has no new nodes.");
            } else {
                LOGGER.info("Adding new nodes from DNS SRV record: {}", RedactableArgument.system(hashSet));
                add(hashSet);
            }
        } catch (Throwable th) {
            LOGGER.error("Exception in DNS SRV refresh watchdog task.", th);
        }
    }

    private static Mono<Void> await(CountDownLatch countDownLatch, Duration duration) {
        return Mono.fromRunnable(() -> {
            try {
                if (countDownLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                } else {
                    throw new RuntimeException(new TimeoutException("Timed out after waiting " + duration + " for latch."));
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BucketConfigArbiter bucketConfigArbiter() {
        return this.bucketConfigArbiter;
    }

    public boolean disconnected() {
        return this.channels.stream().allMatch(dcpChannel -> {
            return dcpChannel.isState(LifecycleState.DISCONNECTED);
        });
    }

    public Mono<Void> stop() {
        return Mono.fromCallable(() -> {
            LOGGER.debug("Shutting down...");
            this.stopped = true;
            this.configSubscription.dispose();
            this.configUpdateExecutor.shutdown();
            if (this.configUpdateExecutor.awaitTermination(this.shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                return null;
            }
            LOGGER.error("Config updater executor failed to terminate within {}", this.shutdownTimeout);
            return null;
        }).subscribeOn(Schedulers.boundedElastic()).then(Flux.fromIterable(this.channels).flatMap((v0) -> {
            return v0.disconnect();
        }).then()).doOnSuccess(r3 -> {
            LOGGER.info("Shutdown complete.");
        });
    }

    public int numberOfPartitions() {
        return this.currentConfig.get().numberOfPartitions();
    }

    public Flux<PartitionAndSeqno> getSeqnos() {
        return Flux.fromIterable(this.channels).flatMap(this::getSeqnosForChannel).flatMap((v0) -> {
            return Flux.fromIterable(v0);
        });
    }

    private Mono<List<PartitionAndSeqno>> getSeqnosForChannel(DcpChannel dcpChannel) {
        return Mono.just(dcpChannel).flatMap((v0) -> {
            return v0.getSeqnos();
        }).retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMillis(200L)).filter(th -> {
            return th instanceof NotConnectedException;
        }).doAfterRetry(retrySignal -> {
            LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", dcpChannel);
        }));
    }

    public Mono<ByteBuf> getFailoverLog(int i) {
        return Mono.just(Integer.valueOf(i)).map(num -> {
            return activeChannelByPartition(i);
        }).flatMap(dcpChannel -> {
            return dcpChannel.getFailoverLog(i);
        }).retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMillis(200L)).filter(th -> {
            return th instanceof NotConnectedException;
        }).doAfterRetry(retrySignal -> {
            LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", Integer.valueOf(i));
        }));
    }

    public Mono<Void> startStreamForPartition(int i, StreamOffset streamOffset, long j) {
        return Mono.just(Integer.valueOf(i)).map((v1) -> {
            return activeChannelByPartition(v1);
        }).flatMap(dcpChannel -> {
            return dcpChannel.getCollectionsManifest().flatMap(optional -> {
                CollectionsManifest collectionsManifest = (CollectionsManifest) optional.orElse(CollectionsManifest.DEFAULT);
                PartitionState partitionState = this.sessionState.get(i);
                partitionState.setCollectionsManifest(collectionsManifest);
                partitionState.setKeyExtractor(optional.isPresent() ? KeyExtractor.COLLECTIONS : KeyExtractor.NO_COLLECTIONS);
                partitionState.setMostRecentOpenStreamOffset(streamOffset);
                return dcpChannel.openStream(i, streamOffset, j, collectionsManifest, this.env.streamFlags());
            });
        }).retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMillis(200L)).filter(th -> {
            return th instanceof NotConnectedException;
        }).doAfterRetry(retrySignal -> {
            LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", Integer.valueOf(i));
        }));
    }

    public Mono<Void> stopStreamForPartition(int i) {
        return streamIsOpen(i) ? activeChannelByPartition(i).closeStream(i) : Mono.empty();
    }

    public boolean streamIsOpen(int i) {
        return activeChannelByPartition(i).streamIsOpen(i);
    }

    private DcpChannel activeChannelByPartition(int i) {
        HostAndPort activeNodeKvAddress = this.currentConfig.get().getActiveNodeKvAddress(i);
        for (DcpChannel dcpChannel : this.channels) {
            if (dcpChannel.address().equals(activeNodeKvAddress)) {
                return dcpChannel;
            }
        }
        throw new NotConnectedException("No DcpChannel found for partition " + i);
    }

    private Map<HostAndPort, DcpChannel> channelsByAddress() {
        requireConfigUpdateThread();
        return (Map) this.channels.stream().collect(Collectors.toMap((v0) -> {
            return v0.address();
        }, dcpChannel -> {
            return dcpChannel;
        }));
    }

    private void reconfigure(DcpBucketConfig dcpBucketConfig) {
        requireConfigUpdateThread();
        if (this.stopped) {
            return;
        }
        this.metrics.incrementReconfigure();
        List<NodeInfo> kvNodes = dcpBucketConfig.getKvNodes();
        if (kvNodes.isEmpty()) {
            throw new IllegalStateException("Bucket config helper returned no data nodes");
        }
        Map<HostAndPort, DcpChannel> channelsByAddress = channelsByAddress();
        Stream<NodeInfo> stream = kvNodes.stream();
        Objects.requireNonNull(dcpBucketConfig);
        Set<HostAndPort> set = (Set) stream.map(dcpBucketConfig::getAddress).collect(Collectors.toSet());
        boolean z = false;
        for (HostAndPort hostAndPort : set) {
            if (!channelsByAddress.containsKey(hostAndPort)) {
                this.metrics.incrementAddChannel();
                add(hostAndPort);
                z = true;
            }
        }
        for (Map.Entry<HostAndPort, DcpChannel> entry : channelsByAddress.entrySet()) {
            if (!set.contains(entry.getKey())) {
                this.metrics.incrementRemoveChannel();
                remove(entry.getValue());
                z = true;
            }
        }
        if (z) {
            updateChannelGauges();
        }
    }

    private void updateChannelGauges() {
        this.metrics.registerConnectionStatusGauges(this.channels);
    }

    private void add(Collection<HostAndPort> collection) {
        requireConfigUpdateThread();
        collection.forEach(this::add);
        updateChannelGauges();
    }

    private void add(HostAndPort hostAndPort) {
        requireConfigUpdateThread();
        LOGGER.info("Adding DCP Channel against {}", RedactableArgument.system(hostAndPort));
        DcpChannel dcpChannel = new DcpChannel(hostAndPort, this.env, this, this.metrics);
        if (!this.channels.add(dcpChannel)) {
            throw new IllegalStateException("Tried to add duplicate channel: " + RedactableArgument.system(dcpChannel));
        }
        dcpChannel.connect().retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1L)).filter(th -> {
            return !this.stopped;
        }).doAfterRetry(retrySignal -> {
            LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", hostAndPort);
        })).doOnSuccess(r5 -> {
            LOGGER.debug("Completed Node connect for DCP channel {}", hostAndPort);
        }).onErrorResume(th2 -> {
            LOGGER.warn("Got error during connect (maybe retried) for node {}", RedactableArgument.system(hostAndPort), th2);
            if (this.env.eventBus() != null) {
                this.env.eventBus().publish(new FailedToAddNodeEvent(hostAndPort, th2));
            }
            return Mono.empty();
        }).subscribe();
    }

    private void remove(DcpChannel dcpChannel) {
        requireConfigUpdateThread();
        if (!this.channels.remove(dcpChannel)) {
            throw new IllegalStateException("Tried to remove unknown channel: " + RedactableArgument.system(dcpChannel));
        }
        LOGGER.info("Removing DCP Channel against {}", RedactableArgument.system(dcpChannel));
        for (int i = 0; i < dcpChannel.streamIsOpen.length(); i++) {
            if (dcpChannel.streamIsOpen(i)) {
                maybeMovePartition(i);
            }
        }
        dcpChannel.disconnect().doOnSuccess(r5 -> {
            LOGGER.debug("Channel remove notified as complete for {}", dcpChannel.address());
        }).onErrorResume(th -> {
            LOGGER.warn("Got error during Node removal for node {}", RedactableArgument.system(dcpChannel.address()), th);
            if (this.env.eventBus() != null) {
                this.env.eventBus().publish(new FailedToRemoveNodeEvent(dcpChannel.address(), th));
            }
            return Mono.empty();
        }).subscribe();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybeMovePartition(int i) {
        Mono.just(Integer.valueOf(i)).delayElement(Duration.ofMillis(50L)).filter(num -> {
            PartitionState partitionState = this.sessionState.get(i);
            boolean isAtEnd = partitionState.isAtEnd();
            if (isAtEnd) {
                LOGGER.debug("Reached desired high seqno {} for vbucket {}, not reopening stream.", Long.valueOf(partitionState.getEndSeqno()), Integer.valueOf(i));
            }
            return !isAtEnd;
        }).flatMap(num2 -> {
            PartitionState partitionState = this.sessionState.get(i);
            return startStreamForPartition(i, partitionState.getOffset(), partitionState.getEndSeqno()).retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMillis(200L)).filter(th -> {
                return th instanceof NotMyVbucketException;
            }));
        }).doOnSubscribe(subscription -> {
            LOGGER.debug("Subscribing for Partition Move for partition {}", Integer.valueOf(i));
        }).doOnSuccess(r5 -> {
            LOGGER.trace("Completed Partition Move for partition {}", Integer.valueOf(i));
        }).onErrorResume(th -> {
            if (th instanceof RollbackException) {
                LOGGER.warn("Rollback during Partition Move for partition {}", Integer.valueOf(i));
            } else {
                LOGGER.warn("Error during Partition Move for partition {}", Integer.valueOf(i), th);
                if (this.env.eventBus() != null) {
                    this.env.eventBus().publish(new FailedToMovePartitionEvent(i, th));
                }
            }
            return Mono.empty();
        }).subscribe();
    }
}
