package org.apache.flink.streaming.tests;

import java.util.Iterator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/* loaded from: input_file:org/apache/flink/streaming/tests/PeriodicStreamingJob.class */
public class PeriodicStreamingJob {

    /* loaded from: input_file:org/apache/flink/streaming/tests/PeriodicStreamingJob$PeriodicSourceGenerator.class */
    public static class PeriodicSourceGenerator implements SourceFunction<Tuple>, ResultTypeQueryable<Tuple>, CheckpointedFunction {
        private final int sleepMs;
        private final int durationMs;
        private final int offsetSeconds;
        private long ms = 0;
        private ListState<Long> state = null;

        public PeriodicSourceGenerator(float f, int i, int i2) {
            this.durationMs = i * 1000;
            this.sleepMs = (int) (1000.0f / f);
            this.offsetSeconds = i2;
        }

        public void run(SourceFunction.SourceContext<Tuple> sourceContext) throws Exception {
            long j = this.offsetSeconds * 1000;
            while (this.ms < this.durationMs) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(new Tuple2(Long.valueOf(this.ms + j), "key"));
                }
                this.ms += this.sleepMs;
                Thread.sleep(this.sleepMs);
            }
        }

        public void cancel() {
        }

        public TypeInformation<Tuple> getProducedType() {
            return Types.TUPLE(new TypeInformation[]{Types.LONG, Types.STRING});
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.state = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("state", LongSerializer.INSTANCE));
            Iterator it = ((Iterable) this.state.get()).iterator();
            while (it.hasNext()) {
                this.ms += ((Long) it.next()).longValue();
            }
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.state.clear();
            this.state.add(Long.valueOf(this.ms));
        }
    }

    /**
     * Builds and runs the job: a {@link PeriodicSourceGenerator} feeding a keyed 5-second
     * tumbling processing-time window that sums field 0, written as text to
     * {@code <outputPath>/result.txt}.
     *
     * <p>Arguments: {@code outputPath} (required), {@code recordsPerSecond} (default 10),
     * {@code durationInSecond} (default 60), {@code offsetInSecond} (default 0).
     */
    public static void main(String[] strArr) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(strArr);
        final String outputDir = params.getRequired("outputPath");
        final int recordsPerSecond = params.getInt("recordsPerSecond", 10);
        final int durationSeconds = params.getInt("durationInSecond", 60);
        final int offsetSeconds = params.getInt("offsetInSecond", 0);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(4000L);
        env.getConfig().setAutoWatermarkInterval(1000L);

        final PeriodicSourceGenerator generator =
                new PeriodicSourceGenerator(recordsPerSecond, durationSeconds, offsetSeconds);
        final String resultFile = outputDir + "/result.txt";

        env.addSource(generator)
                .keyBy(1)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5L)))
                .sum(0)
                .writeAsText(resultFile, FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);

        env.execute();
    }
}
