package org.apache.flink.streaming.tests;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.tests.verify.TtlStateVerifier;
import org.apache.flink.streaming.tests.verify.TtlUpdateContext;
import org.apache.flink.streaming.tests.verify.TtlVerificationContext;
import org.apache.flink.streaming.tests.verify.ValueWithTs;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.class */
public class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);

    @Nonnull
    private final StateTtlConfig ttlConfig;
    private final UpdateStat stat;
    private transient Map<String, State> states;
    private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/tests/TtlVerifyUpdateFunction$UpdateStat.class */
    public static class UpdateStat implements Serializable {
        private static final long serialVersionUID = -4557720969995878873L;
        final long reportStatAfterUpdatesNum;
        long updates = 0;
        long prevUpdatesNum = 0;

        UpdateStat(long j) {
            this.reportStatAfterUpdatesNum = j;
        }

        void update(long j) {
            this.updates += TtlVerifyUpdateFunction.serialVersionUID;
            this.prevUpdatesNum += j;
            if (this.updates % this.reportStatAfterUpdatesNum == 0) {
                TtlVerifyUpdateFunction.LOG.info(String.format("Avg update chain length: %d", Long.valueOf(this.prevUpdatesNum / this.updates)));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TtlVerifyUpdateFunction(@Nonnull StateTtlConfig stateTtlConfig, long j) {
        this.ttlConfig = stateTtlConfig;
        this.stat = new UpdateStat(j);
    }

    public void flatMap(TtlStateUpdate ttlStateUpdate, Collector<String> collector) throws Exception {
        for (TtlStateVerifier<?, ?> ttlStateVerifier : TtlStateVerifier.VERIFIERS) {
            TtlVerificationContext<?, ?> generateUpdateAndVerificationContext = generateUpdateAndVerificationContext(ttlStateUpdate, ttlStateVerifier);
            if (!ttlStateVerifier.verify(generateUpdateAndVerificationContext)) {
                collector.collect("TTL verification failed: " + generateUpdateAndVerificationContext.toString());
            }
        }
    }

    private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext(TtlStateUpdate ttlStateUpdate, TtlStateVerifier<?, ?> ttlStateVerifier) throws Exception {
        List<ValueWithTs<?>> prevUpdates = getPrevUpdates(ttlStateVerifier.getId());
        TtlUpdateContext<?, ?> performUpdate = performUpdate(ttlStateVerifier, ttlStateUpdate.getUpdate(ttlStateVerifier.getId()));
        this.stat.update(prevUpdates.size());
        this.prevUpdatesByVerifierId.get(ttlStateVerifier.getId()).add(performUpdate.getUpdateWithTs());
        return new TtlVerificationContext<>(ttlStateUpdate.getKey(), ttlStateVerifier.getId(), prevUpdates, performUpdate);
    }

    private List<ValueWithTs<?>> getPrevUpdates(String str) throws Exception {
        return (List) StreamSupport.stream(((Iterable) this.prevUpdatesByVerifierId.get(str).get()).spliterator(), false).collect(Collectors.toList());
    }

    private TtlUpdateContext<?, ?> performUpdate(TtlStateVerifier<?, ?> ttlStateVerifier, Object obj) throws Exception {
        return (TtlUpdateContext) MonotonicTTLTimeProvider.doWithFrozenTime(l -> {
            State state = this.states.get(ttlStateVerifier.getId());
            Object obj2 = ttlStateVerifier.get(state);
            ttlStateVerifier.update(state, obj);
            return new TtlUpdateContext(obj2, obj, ttlStateVerifier.get(state), l.longValue());
        });
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) {
        this.states = (Map) TtlStateVerifier.VERIFIERS.stream().collect(Collectors.toMap((v0) -> {
            return v0.getId();
        }, ttlStateVerifier -> {
            return ttlStateVerifier.createState(functionInitializationContext, this.ttlConfig);
        }));
        this.prevUpdatesByVerifierId = (Map) TtlStateVerifier.VERIFIERS.stream().collect(Collectors.toMap((v0) -> {
            return v0.getId();
        }, ttlStateVerifier2 -> {
            Preconditions.checkNotNull(ttlStateVerifier2);
            return functionInitializationContext.getKeyedStateStore().getListState(new ListStateDescriptor("TtlPrevValueState_" + ttlStateVerifier2.getId(), new ValueWithTs.Serializer((TypeSerializer<?>) ttlStateVerifier2.getUpdateSerializer(), (TypeSerializer<Long>) LongSerializer.INSTANCE)));
        }));
    }

    public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
        flatMap((TtlStateUpdate) obj, (Collector<String>) collector);
    }
}
