package org.apache.flink.deployment;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/deployment/HeavyDeploymentStressTestProgram.class */
public class HeavyDeploymentStressTestProgram {
    /**
     * Option {@code heavy_deployment_test.num_list_states_per_op} (integer, default 100).
     * {@code main} reads it from the program arguments and hands it to the source as the
     * number of list states to register.
     */
    private static final ConfigOption<Integer> NUM_LIST_STATES_PER_OP = ConfigOptions.key("heavy_deployment_test.num_list_states_per_op").intType().defaultValue(100);
    /**
     * Option {@code heavy_deployment_test.num_partitions_per_list_state} (integer, default 100).
     * {@code main} reads it from the program arguments and hands it to the source as the
     * number of entries added to each list state.
     */
    private static final ConfigOption<Integer> NUM_PARTITIONS_PER_LIST_STATE = ConfigOptions.key("heavy_deployment_test.num_partitions_per_list_state").intType().defaultValue(100);

    /* loaded from: input_file:org/apache/flink/deployment/HeavyDeploymentStressTestProgram$SimpleEndlessSourceWithBloatedState.class */
    static class SimpleEndlessSourceWithBloatedState extends RichParallelSourceFunction<String> implements CheckpointedFunction, CheckpointListener {
        private static final long serialVersionUID = 1;
        private final int numListStates;
        private final int numPartitionsPerListState;
        private volatile transient boolean isRunning;
        private volatile transient boolean readyToFail;

        SimpleEndlessSourceWithBloatedState(int i, int i2) {
            this.numListStates = i;
            this.numPartitionsPerListState = i2;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.readyToFail = false;
            if (functionInitializationContext.isRestored()) {
                this.isRunning = false;
                return;
            }
            this.isRunning = true;
            OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();
            for (int i = 0; i < this.numListStates; i++) {
                ListState unionListState = operatorStateStore.getUnionListState(new ListStateDescriptor("test-list-state-" + i, String.class));
                for (int i2 = 0; i2 < this.numPartitionsPerListState; i2++) {
                    unionListState.add(String.valueOf(i2));
                }
            }
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            while (this.isRunning) {
                if (this.readyToFail && getRuntimeContext().getIndexOfThisSubtask() == 0) {
                    throw new Exception("Artificial failure.");
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect("test-element");
                }
                Thread.sleep(serialVersionUID);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void notifyCheckpointComplete(long j) {
            this.readyToFail = true;
        }

        public void notifyCheckpointAborted(long j) {
        }
    }

    /**
     * Builds and executes the job: a {@link SimpleEndlessSourceWithBloatedState} running at the
     * environment's parallelism, connected to a {@link DiscardingSink} with parallelism 1.
     *
     * <p>The number of list states and the number of entries per list state are read from the
     * program arguments under the keys of {@code NUM_LIST_STATES_PER_OP} and
     * {@code NUM_PARTITIONS_PER_LIST_STATE}, falling back to the options' default values.
     *
     * @param strArr program arguments, parsed with {@link ParameterTool#fromArgs}
     * @throws Exception if the checkpointing precondition is not met or job execution fails
     */
    public static void main(String[] strArr) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(strArr);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamAllroundTestJobFactory.setupEnvironment(env, params);

        final int numListStates =
                params.getInt(NUM_LIST_STATES_PER_OP.key(), NUM_LIST_STATES_PER_OP.defaultValue());
        final int numPartitionsPerListState =
                params.getInt(NUM_PARTITIONS_PER_LIST_STATE.key(), NUM_PARTITIONS_PER_LIST_STATE.defaultValue());

        // The source only arms its artificial failure after a checkpoint completes, so the job
        // is pointless without checkpointing.
        Preconditions.checkState(env.getCheckpointInterval() > 0, "Checkpointing must be enabled for this test!");

        env.addSource(new SimpleEndlessSourceWithBloatedState(numListStates, numPartitionsPerListState))
                .setParallelism(env.getParallelism())
                .addSink(new DiscardingSink<>())
                .setParallelism(1);

        env.execute("HeavyDeploymentStressTestProgram");
    }
}
