package org.apache.flink.streaming.examples.statemachine.generator;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.requests.FetchRequest;

/* loaded from: input_file:org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.class */
public class EventsGeneratorSource extends RichParallelSourceFunction<Event> {
    private final double errorProbability;
    private final int delayPerRecordMillis;
    private volatile boolean running = true;

    public EventsGeneratorSource(double d, int i) {
        Preconditions.checkArgument(d >= 0.0d && d <= 1.0d, "error probability must be in [0.0, 1.0]");
        Preconditions.checkArgument(i >= 0, "delay must be >= 0");
        this.errorProbability = d;
        this.delayPerRecordMillis = i;
    }

    public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
        EventsGenerator eventsGenerator = new EventsGenerator(this.errorProbability);
        int numberOfParallelSubtasks = FetchRequest.DEFAULT_RESPONSE_MAX_BYTES / getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubtask = numberOfParallelSubtasks * getRuntimeContext().getIndexOfThisSubtask();
        int i = indexOfThisSubtask + numberOfParallelSubtasks;
        while (this.running) {
            sourceContext.collect(eventsGenerator.next(indexOfThisSubtask, i));
            if (this.delayPerRecordMillis > 0) {
                Thread.sleep(this.delayPerRecordMillis);
            }
        }
    }

    public void cancel() {
        this.running = false;
    }
}
