package org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.cloud.pubsub.v1;

import io.grpc.Status;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.core.AbstractApiService;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.core.ApiClock;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.core.ApiFutureCallback;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.core.ApiFutures;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.core.ApiService;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.core.SettableApiFuture;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.batching.FlowControlSettings;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.batching.FlowController;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.core.Distribution;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.grpc.GrpcCallContext;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.grpc.GrpcStatusCode;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.rpc.ApiException;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.rpc.ApiExceptionFactory;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.rpc.ClientStream;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.rpc.ResponseObserver;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.api.gax.rpc.StreamController;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.cloud.pubsub.v1.MessageDispatcher;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.cloud.pubsub.v1.stub.SubscriberStub;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.protobuf.Empty;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.pubsub.v1.AcknowledgeRequest;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.pubsub.v1.ModifyAckDeadlineRequest;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.pubsub.v1.StreamingPullRequest;
import org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google.pubsub.v1.StreamingPullResponse;
import org.threeten.bp.Duration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/examples/gcp/pubsub/shaded/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.class */
/**
 * An {@link AbstractApiService} that keeps a streaming-pull call open against a single
 * subscription and forwards every received batch to a {@link MessageDispatcher}.
 *
 * <p>When the stream ends, the connection re-opens it for as long as the service is still
 * starting or running: immediately after a clean completion, and after a doubling back-off
 * (capped at {@code MAX_CHANNEL_RECONNECT_BACKOFF}) after an error that
 * {@code StatusUtil.isRetryable} accepts. Any other error fails the service.
 *
 * <p>The class also acts as the dispatcher's {@link MessageDispatcher.AckProcessor}, sending
 * acknowledgements and ack-deadline modifications through the stub's unary callables.
 */
public final class StreamingSubscriberConnection extends AbstractApiService implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
    /** Upper bound on the number of ack ids placed in one acknowledge / modify-ack-deadline request. */
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    private final SubscriberStub stub;
    private final int channelAffinity;
    private final String subscription;
    private final ScheduledExecutorService systemExecutor;
    private final MessageDispatcher messageDispatcher;
    private final FlowControlSettings flowControlSettings;
    /** The currently open request stream; assigned in {@link #initialize()} while holding {@code lock}. */
    private ClientStream<StreamingPullRequest> clientStream;
    /** Delay, in milliseconds, applied before the next reconnect that follows a retryable error. */
    private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
    /** Tracks the ack / modify-ack-deadline calls that have been issued but have not completed yet. */
    private final Waiter ackOperationsWaiter = new Waiter();
    private final Lock lock = new ReentrantLock();
    /** Random identifier sent with every initial streaming-pull request of this connection. */
    private final String clientId = UUID.randomUUID().toString();

    /**
     * Observer for one streaming-pull call. Responses are requested one at a time; the end of the
     * stream (clean or not) is reported through {@code errorFuture}.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public class StreamingPullResponseObserver implements ResponseObserver<StreamingPullResponse> {
        /** Completed with {@code null} on clean termination, or with the error on failure. */
        final SettableApiFuture<Void> errorFuture;
        StreamController thisController;

        StreamingPullResponseObserver(SettableApiFuture<Void> settableApiFuture) {
            this.errorFuture = settableApiFuture;
        }

        /** Switches the stream to manual inbound flow control and asks for the first response. */
        @Override
        public void onStart(StreamController streamController) {
            this.thisController = streamController;
            this.thisController.disableAutoInboundFlowControl();
            this.thisController.request(1);
        }

        /**
         * Resets the reconnect back-off, hands the received messages to the dispatcher and then,
         * if the service is still alive and the stream has not terminated, requests one more
         * response.
         */
        @Override
        public void onResponse(StreamingPullResponse streamingPullResponse) {
            StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(StreamingSubscriberConnection.INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
            StreamingSubscriberConnection.this.messageDispatcher.processReceivedMessages(streamingPullResponse.getReceivedMessagesList());
            if (!StreamingSubscriberConnection.this.isAlive() || this.errorFuture.isDone()) {
                return;
            }
            // Same lock that doStop() holds while closing the stream.
            StreamingSubscriberConnection.this.lock.lock();
            try {
                this.thisController.request(1);
            } catch (Exception e) {
                // Best effort: a failure to request more is only logged.
                StreamingSubscriberConnection.logger.log(Level.WARNING, "cannot request more messages", e);
            } finally {
                StreamingSubscriberConnection.this.lock.unlock();
            }
        }

        @Override
        public void onError(Throwable th) {
            this.errorFuture.setException(th);
        }

        @Override
        public void onComplete() {
            StreamingSubscriberConnection.logger.fine("Streaming pull terminated successfully!");
            this.errorFuture.set(null);
        }
    }

    /**
     * Creates a connection for {@code str} (the subscription name).
     *
     * <p>{@code scheduledExecutorService2} is kept as the executor on which reconnects are
     * scheduled. {@code subscriberStub}, {@code i} (channel affinity) and
     * {@code flowControlSettings} are stored for use when opening the stream. The remaining
     * arguments are passed unchanged to the {@link MessageDispatcher} constructor.
     */
    public StreamingSubscriberConnection(String str, MessageReceiver messageReceiver, Duration duration, Duration duration2, Duration duration3, Distribution distribution, SubscriberStub subscriberStub, int i, FlowControlSettings flowControlSettings, FlowController flowController, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, ApiClock apiClock) {
        this.subscription = str;
        this.systemExecutor = scheduledExecutorService2;
        this.stub = subscriberStub;
        this.channelAffinity = i;
        this.messageDispatcher = new MessageDispatcher(messageReceiver, this, duration, duration2, duration3, distribution, flowController, scheduledExecutorService, scheduledExecutorService2, apiClock);
        this.flowControlSettings = flowControlSettings;
    }

    /** Starts the dispatcher, opens the first stream and reports the service as started. */
    @Override
    protected void doStart() {
        logger.config("Starting subscriber.");
        this.messageDispatcher.start();
        initialize();
        notifyStarted();
    }

    /**
     * Stops the dispatcher, waits for outstanding ack operations, then closes the request stream
     * with {@code CANCELLED}. {@code notifyStopped()} is invoked even if closing the stream throws.
     */
    @Override
    protected void doStop() {
        this.messageDispatcher.stop();
        this.ackOperationsWaiter.waitComplete();
        this.lock.lock();
        try {
            this.clientStream.closeSendWithError(Status.CANCELLED.asException());
        } finally {
            this.lock.unlock();
            notifyStopped();
        }
    }

    /**
     * Opens a new streaming-pull call, sends the initial request and registers the callback that
     * decides what happens when the stream terminates (reconnect, back off, or fail the service).
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void initialize() {
        final SettableApiFuture<Void> errorFuture = SettableApiFuture.create();
        final ClientStream<StreamingPullRequest> initClientStream = this.stub.streamingPullCallable().splitCall(new StreamingPullResponseObserver(errorFuture), GrpcCallContext.createDefault().withChannelAffinity(Integer.valueOf(this.channelAffinity)));
        logger.log(Level.FINER, "Initializing stream to subscription {0}", this.subscription);
        initClientStream.send(StreamingPullRequest.newBuilder().setSubscription(this.subscription).setStreamAckDeadlineSeconds(60).setClientId(this.clientId).setMaxOutstandingMessages(this.flowControlSettings.getMaxOutstandingElementCount().longValue()).setMaxOutstandingBytes(this.flowControlSettings.getMaxOutstandingRequestBytes().longValue()).build());

        // Hold the lock only for the assignment and release it exactly once. Registering the
        // callback happens after the lock is released, so a failure there can no longer trigger
        // a second unlock() (which would throw IllegalMonitorStateException and hide the cause).
        this.lock.lock();
        try {
            this.clientStream = initClientStream;
        } finally {
            this.lock.unlock();
        }

        ApiFutures.addCallback(errorFuture, new ApiFutureCallback<Void>() {
            /** Clean termination: reset the back-off and reconnect right away while alive. */
            @Override
            public void onSuccess(@Nullable Void r5) {
                if (StreamingSubscriberConnection.this.isAlive()) {
                    StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(StreamingSubscriberConnection.INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
                    StreamingSubscriberConnection.this.initialize();
                }
            }

            /** Error termination: fail the service, or schedule a reconnect if retryable. */
            @Override
            public void onFailure(Throwable th) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    StreamingSubscriberConnection.logger.log(Level.FINE, "pull failure after service no longer running", th);
                    return;
                }
                if (!StatusUtil.isRetryable(th)) {
                    ApiException createException = ApiExceptionFactory.createException(th, GrpcStatusCode.of(Status.fromThrowable(th).getCode()), false);
                    StreamingSubscriberConnection.logger.log(Level.SEVERE, "terminated streaming with exception", createException);
                    StreamingSubscriberConnection.this.notifyFailed(createException);
                } else {
                    StreamingSubscriberConnection.logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", th);
                    // Reconnect after the current delay; the stored delay is doubled (up to the cap) for next time.
                    long backoffMillis = StreamingSubscriberConnection.this.channelReconnectBackoffMillis.get();
                    StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(Math.min(backoffMillis * 2, StreamingSubscriberConnection.MAX_CHANNEL_RECONNECT_BACKOFF.toMillis()));
                    StreamingSubscriberConnection.this.systemExecutor.schedule(new Runnable() {
                        @Override
                        public void run() {
                            StreamingSubscriberConnection.this.initialize();
                        }
                    }, backoffMillis, TimeUnit.MILLISECONDS);
                }
            }
        }, MoreExecutors.directExecutor());
    }

    /** Returns {@code true} while the service state is {@code RUNNING} or {@code STARTING}. */
    /* JADX INFO: Access modifiers changed from: private */
    public boolean isAlive() {
        ApiService.State state = state();
        return state == ApiService.State.RUNNING || state == ApiService.State.STARTING;
    }

    /**
     * Sends the given acknowledgements and ack-deadline modifications, splitting each id list
     * into requests of at most {@code MAX_PER_REQUEST_CHANGES} ids.
     *
     * @param list ack ids to acknowledge
     * @param list2 deadline modifications; each entry carries its ack ids and the new deadline in seconds
     */
    @Override
    public void sendAckOperations(List<String> list, List<MessageDispatcher.PendingModifyAckDeadline> list2) {
        // Shared completion callback: one pending operation is finished either way;
        // failures are logged, at a lower level once the service is no longer alive.
        ApiFutureCallback<Empty> loggingCallback = new ApiFutureCallback<Empty>() {
            @Override
            public void onSuccess(Empty empty) {
                StreamingSubscriberConnection.this.ackOperationsWaiter.incrementPendingCount(-1);
            }

            @Override
            public void onFailure(Throwable th) {
                StreamingSubscriberConnection.this.ackOperationsWaiter.incrementPendingCount(-1);
                StreamingSubscriberConnection.logger.log(StreamingSubscriberConnection.this.isAlive() ? Level.WARNING : Level.FINER, "failed to send operations", th);
            }
        };
        int pendingOperations = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modack : list2) {
            for (List<String> idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) {
                ApiFutures.addCallback(this.stub.modifyAckDeadlineCallable().futureCall(ModifyAckDeadlineRequest.newBuilder().setSubscription(this.subscription).addAllAckIds(idChunk).setAckDeadlineSeconds(modack.deadlineExtensionSeconds).build()), loggingCallback, MoreExecutors.directExecutor());
                pendingOperations++;
            }
        }
        for (List<String> idChunk : Lists.partition(list, MAX_PER_REQUEST_CHANGES)) {
            ApiFutures.addCallback(this.stub.acknowledgeCallable().futureCall(AcknowledgeRequest.newBuilder().setSubscription(this.subscription).addAllAckIds(idChunk).build()), loggingCallback, MoreExecutors.directExecutor());
            pendingOperations++;
        }
        // NOTE(review): the count is raised only after every call has been issued, while the
        // callbacks run on a direct executor and may decrement first. Confirm that Waiter
        // tolerates a transiently negative pending count before relying on waitComplete().
        this.ackOperationsWaiter.incrementPendingCount(pendingOperations);
    }
}
