package org.apache.flink.streaming.tests;

import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;

/* loaded from: input_file:org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.class */
public class StatefulStreamJobUpgradeTestProgram {
    /** Value of {@code test.job.variant} that makes {@code main} run the original job variant. */
    private static final String TEST_JOB_VARIANT_ORIGINAL = "original";
    /** Value of {@code test.job.variant} that makes {@code main} run the upgraded job variant. */
    private static final String TEST_JOB_VARIANT_UPGRADED = "upgraded";
    /** Option naming the job variant to run; its declared default is the original variant. */
    private static final ConfigOption<String> TEST_JOB_VARIANT = ConfigOptions.key("test.job.variant").defaultValue(TEST_JOB_VARIANT_ORIGINAL).withDescription(String.format("This configures the job variant to test. Can be '%s' or '%s'", TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$DowngradeEvent.class */
    public static class DowngradeEvent implements MapFunction<UpgradedEvent, Event> {
        private DowngradeEvent() {
        }

        public Event map(UpgradedEvent upgradedEvent) throws Exception {
            return upgradedEvent.event;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradeEvent.class */
    public static class UpgradeEvent implements MapFunction<Event, UpgradedEvent> {
        private UpgradeEvent() {
        }

        public UpgradedEvent map(Event event) {
            return new UpgradedEvent(event);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradedEvent.class */
    /**
     * Event type used by the upgraded job variant: the wrapped {@link Event} plus an upper-cased
     * copy of its payload.
     */
    public static class UpgradedEvent {
        /** The wrapped event. */
        public final Event event;
        /** Upper-cased copy of the wrapped event's payload. */
        public final String randomPayload;

        /**
         * Wraps the given event and derives {@link #randomPayload} from its payload.
         *
         * @param event event to wrap
         */
        public UpgradedEvent(Event event) {
            final String payload = event.getPayload();
            this.randomPayload = payload.toUpperCase();
            this.event = event;
        }

        /** Returns the key of the wrapped event. */
        public int getKey() {
            return event.getKey();
        }
    }

    /**
     * Program entry point: parses the arguments, prepares the execution environment with
     * unaligned checkpoints enabled, and runs the job variant selected by the arguments.
     *
     * @param strArr command line arguments
     * @throws Exception if environment setup or job execution fails
     */
    public static void main(String[] strArr) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(strArr);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamAllroundTestJobFactory.setupEnvironment(env, params);
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);

        if (!isOriginalJobVariant(params)) {
            executeUpgradedVariant(env, params);
            return;
        }
        executeOriginalVariant(env, params);
    }

    /**
     * Builds and runs the original job variant: keyed event source, the original stateful
     * operators, the semantics check mapper, and a print sink.
     *
     * @param streamExecutionEnvironment environment the job is built on and executed with
     * @param parameterTool parsed program arguments used to configure source and mappers
     * @throws Exception if job execution fails
     */
    private static void executeOriginalVariant(StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool parameterTool) throws Exception {
        KeyedStream<Event, Integer> keyedEvents =
                streamExecutionEnvironment
                        .addSource(DataStreamAllroundTestJobFactory.createEventSource(parameterTool))
                        .name("EventSource")
                        .uid("EventSource")
                        .assignTimestampsAndWatermarks(DataStreamAllroundTestJobFactory.createTimestampExtractor(parameterTool))
                        .keyBy(Event::getKey);

        List<TypeSerializer<ComplexPayload>> stateSerializers =
                Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, streamExecutionEnvironment.getConfig()));

        KeyedStream<Event, Integer> afterStatefulOperations =
                applyOriginalStatefulOperations(keyedEvents, stateSerializers, Collections.emptyList());

        afterStatefulOperations
                .flatMap(DataStreamAllroundTestJobFactory.createSemanticsCheckMapper(parameterTool))
                .name("SemanticsCheckMapper")
                .addSink(new PrintSinkFunction<>());

        streamExecutionEnvironment.execute("General purpose test job");
    }

    /**
     * Builds and runs the upgraded job variant: events are wrapped into {@link UpgradedEvent}
     * before the upgraded stateful operators and unwrapped again before the semantics check
     * mapper and the print sink.
     *
     * @param streamExecutionEnvironment environment the job is built on and executed with
     * @param parameterTool parsed program arguments used to configure source and mappers
     * @throws Exception if job execution fails
     */
    private static void executeUpgradedVariant(StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool parameterTool) throws Exception {
        KeyedStream<UpgradedEvent, Integer> keyedUpgradedEvents =
                streamExecutionEnvironment
                        .addSource(DataStreamAllroundTestJobFactory.createEventSource(parameterTool))
                        .name("EventSource")
                        .uid("EventSource")
                        .assignTimestampsAndWatermarks(DataStreamAllroundTestJobFactory.createTimestampExtractor(parameterTool))
                        .map(new UpgradeEvent())
                        .keyBy(UpgradedEvent::getKey);

        List<TypeSerializer<ComplexPayload>> stateSerializers =
                Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, streamExecutionEnvironment.getConfig()));

        KeyedStream<UpgradedEvent, Integer> afterStatefulOperations =
                applyUpgradedStatefulOperations(keyedUpgradedEvents, stateSerializers, Collections.emptyList());

        afterStatefulOperations
                .map(new DowngradeEvent())
                .keyBy(Event::getKey)
                .flatMap(DataStreamAllroundTestJobFactory.createSemanticsCheckMapper(parameterTool))
                .name("SemanticsCheckMapper")
                .addSink(new PrintSinkFunction<>());

        streamExecutionEnvironment.execute("General purpose test job");
    }

    /**
     * Decides which job variant the program arguments ask for.
     *
     * <p>Reads the {@code test.job.variant} argument. When the argument is absent, the default
     * declared on {@link #TEST_JOB_VARIANT} is used instead of failing on a {@code null} value.
     *
     * @param parameterTool parsed program arguments
     * @return {@code true} for the original variant, {@code false} for the upgraded variant
     * @throws IllegalArgumentException if the configured value is neither of the two variants
     */
    private static boolean isOriginalJobVariant(ParameterTool parameterTool) {
        // ParameterTool#get(key) yields null for a missing argument; fall back to the option's
        // declared default so the switch below never sees null.
        final String variant = parameterTool.get(TEST_JOB_VARIANT.key(), TEST_JOB_VARIANT.defaultValue());
        switch (variant) {
            case TEST_JOB_VARIANT_ORIGINAL:
                return true;
            case TEST_JOB_VARIANT_UPGRADED:
                return false;
            default:
                throw new IllegalArgumentException(String.format("'--test.job.variant' can be either '%s' or '%s'", TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));
        }
    }

    /**
     * Chains the stateful operators of the original job variant: first {@code stateMap1} with
     * the simple state update, then {@code stateMap2} with the last-event state update.
     *
     * @param keyedStream keyed input events
     * @param list state serializers handed to each operator
     * @param list2 state classes handed to each operator
     * @return the keyed stream produced by the last operator
     */
    private static KeyedStream<Event, Integer> applyOriginalStatefulOperations(KeyedStream<Event, Integer> keyedStream, List<TypeSerializer<ComplexPayload>> list, List<Class<ComplexPayload>> list2) {
        KeyedStream<Event, Integer> afterStateMap1 =
                applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), keyedStream, list, list2);
        return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), afterStateMap1, list, list2);
    }

    /**
     * Appends one artificial keyed-state mapper to the stream and re-keys its output.
     *
     * @param str operator name, also used as the operator uid
     * @param joinFunction computes the new state from the incoming event and the previous state
     * @param keyedStream keyed input events
     * @param list state serializers handed to the mapper
     * @param list2 state classes handed to the mapper
     * @return the mapper's output keyed by {@code Event.getKey()}
     */
    private static KeyedStream<Event, Integer> applyTestStatefulOperator(String str, JoinFunction<Event, ComplexPayload, ComplexPayload> joinFunction, KeyedStream<Event, Integer> keyedStream, List<TypeSerializer<ComplexPayload>> list, List<Class<ComplexPayload>> list2) {
        return keyedStream
                // The mapper forwards every event unchanged; only the keyed state is touched.
                .map(DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper(event -> event, joinFunction, list, list2))
                .name(str)
                .uid(str)
                .returns(Event.class)
                .keyBy(Event::getKey);
    }

    /**
     * Chains the stateful operators of the upgraded job variant: {@code stateMap2} with the
     * simple state update, then {@code stateMap1} with the last-event state update, then
     * {@code stateMap3} with the simple state update.
     *
     * @param keyedStream keyed upgraded events
     * @param list state serializers handed to each operator
     * @param list2 state classes handed to each operator
     * @return the keyed stream produced by the last operator
     */
    private static KeyedStream<UpgradedEvent, Integer> applyUpgradedStatefulOperations(KeyedStream<UpgradedEvent, Integer> keyedStream, List<TypeSerializer<ComplexPayload>> list, List<Class<ComplexPayload>> list2) {
        KeyedStream<UpgradedEvent, Integer> afterStateMap2 =
                applyUpgradedTestStatefulOperator("stateMap2", simpleUpgradedStateUpdate("stateMap2"), keyedStream, list, list2);
        KeyedStream<UpgradedEvent, Integer> afterStateMap1 =
                applyUpgradedTestStatefulOperator("stateMap1", lastUpgradedStateUpdate("stateMap1"), afterStateMap2, list, list2);
        return applyUpgradedTestStatefulOperator("stateMap3", simpleUpgradedStateUpdate("stateMap3"), afterStateMap1, list, list2);
    }

    /**
     * Appends one artificial keyed-state mapper for upgraded events and re-keys its output.
     *
     * @param str operator name, also used as the operator uid
     * @param joinFunction computes the new state from the incoming event and the previous state
     * @param keyedStream keyed upgraded events
     * @param list state serializers handed to the mapper
     * @param list2 state classes handed to the mapper
     * @return the mapper's output keyed by {@code UpgradedEvent.getKey()}
     */
    private static KeyedStream<UpgradedEvent, Integer> applyUpgradedTestStatefulOperator(String str, JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> joinFunction, KeyedStream<UpgradedEvent, Integer> keyedStream, List<TypeSerializer<ComplexPayload>> list, List<Class<ComplexPayload>> list2) {
        return keyedStream
                // The mapper forwards every event unchanged; only the keyed state is touched.
                .map(DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper(upgradedEvent -> upgradedEvent, joinFunction, list, list2))
                .name(str)
                .uid(str)
                .returns(UpgradedEvent.class)
                .keyBy(UpgradedEvent::getKey);
    }

    /**
     * Creates a state update that verifies the previous state and then always replaces it with
     * a payload built from the incoming event.
     *
     * @param str string payload expected in the previous state and stored in the new one
     * @return the state update function
     */
    private static JoinFunction<Event, ComplexPayload, ComplexPayload> simpleStateUpdate(String str) {
        return (incoming, previousState) -> {
            verifyState(str, previousState);
            ComplexPayload replacement = new ComplexPayload(incoming, str);
            return replacement;
        };
    }

    /**
     * Creates a state update for upgraded events that verifies the previous state and then
     * always replaces it with a payload built from the wrapped event.
     *
     * @param str string payload expected in the previous state and stored in the new one
     * @return the state update function
     */
    private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> simpleUpgradedStateUpdate(String str) {
        return (incoming, previousState) -> {
            verifyState(str, previousState);
            ComplexPayload replacement = new ComplexPayload(incoming.event, str);
            return replacement;
        };
    }

    /**
     * Creates a state update that verifies the previous state and replaces it only when there
     * is no previous state or the incoming event has a strictly greater event time.
     *
     * @param str string payload expected in the previous state and stored in a new one
     * @return the state update function
     */
    private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String str) {
        return (incoming, previousState) -> {
            verifyState(str, previousState);
            if (previousState == null || incoming.getEventTime() > previousState.getEventTime()) {
                return new ComplexPayload(incoming, str);
            }
            return previousState;
        };
    }

    /**
     * Creates a state update for upgraded events that verifies the previous state and replaces
     * it only when there is no previous state or the wrapped event has a strictly greater
     * event time.
     *
     * @param str string payload expected in the previous state and stored in a new one
     * @return the state update function
     */
    private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> lastUpgradedStateUpdate(String str) {
        return (incoming, previousState) -> {
            verifyState(str, previousState);
            if (previousState == null || incoming.event.getEventTime() > previousState.getEventTime()) {
                return new ComplexPayload(incoming.event, str);
            }
            return previousState;
        };
    }

    /**
     * Prints a diagnostic line when existing state carries a different string payload than
     * expected. Absent state ({@code null}) is accepted silently.
     *
     * @param str expected string payload
     * @param complexPayload state to check, may be {@code null}
     */
    private static void verifyState(String str, ComplexPayload complexPayload) {
        boolean mismatch = complexPayload != null && !complexPayload.getStrPayload().equals(str);
        if (mismatch) {
            System.out.println("State is set or restored incorrectly");
        }
    }

    /*
     * Rebuilds one of this class's lambdas from its serialized form.
     *
     * The first switch maps the implementation method name to a selector; the second switch
     * checks the recorded method kind, functional interface, implementing class and signatures
     * and returns the matching lambda. Anything that does not match ends in an
     * IllegalArgumentException.
     *
     * NOTE(review): the method is marked synthetic and named $deserializeLambda$, so it is
     * presumably the compiler-generated lambda deserialization hook reproduced by the
     * decompiler rather than hand-written code -- confirm before editing or keeping it in source.
     * NOTE(review): `boolean z = -1`, `z = 6` and the repeated `case true:` labels are not valid
     * Java; they look like a decompiler's rendering of an int selector -- TODO confirm.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: translate the implementation method name into a selector value.
        switch (implMethodName.hashCode()) {
            case -1746514167:
                if (implMethodName.equals("lambda$simpleStateUpdate$297cb398$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = true;
                    break;
                }
                break;
            case -530858051:
                if (implMethodName.equals("lambda$lastUpgradedStateUpdate$4ec0f1bd$1")) {
                    z = 3;
                    break;
                }
                break;
            case -299631105:
                if (implMethodName.equals("lambda$applyTestStatefulOperator$877f045d$1")) {
                    z = 4;
                    break;
                }
                break;
            case -160506872:
                if (implMethodName.equals("lambda$applyUpgradedTestStatefulOperator$c21a3f22$1")) {
                    z = 2;
                    break;
                }
                break;
            case 821164077:
                if (implMethodName.equals("lambda$lastStateUpdate$297cb398$1")) {
                    z = 5;
                    break;
                }
                break;
            case 2070510233:
                if (implMethodName.equals("lambda$simpleUpgradedStateUpdate$4ec0f1bd$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Step 2: validate the serialized metadata for the selected lambda and recreate it.
        switch (z) {
            case false:
                // simpleUpgradedStateUpdate: JoinFunction capturing the expected string payload.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/JoinFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("join") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradedEvent;Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;)Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return (upgradedEvent, complexPayload) -> {
                        verifyState(str, complexPayload);
                        return new ComplexPayload(upgradedEvent.event, str);
                    };
                }
                break;
            case true:
                // getKey key selectors, implemented by either Event or UpgradedEvent.
                // Several checks below repeat the same condition, so only the first check of
                // each distinct condition can ever be taken.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/Event") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradedEvent") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/Event") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/Event") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradedEvent") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                // applyUpgradedTestStatefulOperator: identity MapFunction over UpgradedEvent.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradedEvent;)Lorg/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradedEvent;")) {
                    return upgradedEvent2 -> {
                        return upgradedEvent2;
                    };
                }
                break;
            case true:
                // lastUpgradedStateUpdate: JoinFunction capturing the expected string payload.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/JoinFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("join") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram$UpgradedEvent;Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;)Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;")) {
                    String str2 = (String) serializedLambda.getCapturedArg(0);
                    return (upgradedEvent3, complexPayload2) -> {
                        verifyState(str2, complexPayload2);
                        return complexPayload2 != null && (upgradedEvent3.event.getEventTime() > complexPayload2.getEventTime() ? 1 : (upgradedEvent3.event.getEventTime() == complexPayload2.getEventTime() ? 0 : -1)) <= 0 ? complexPayload2 : new ComplexPayload(upgradedEvent3.event, str2);
                    };
                }
                break;
            case true:
                // applyTestStatefulOperator: identity MapFunction over Event.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/tests/Event;)Lorg/apache/flink/streaming/tests/Event;")) {
                    return event -> {
                        return event;
                    };
                }
                break;
            case true:
                // lastStateUpdate: JoinFunction capturing the expected string payload.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/JoinFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("join") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/streaming/tests/Event;Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;)Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;")) {
                    String str3 = (String) serializedLambda.getCapturedArg(0);
                    return (event2, complexPayload3) -> {
                        verifyState(str3, complexPayload3);
                        return complexPayload3 != null && (event2.getEventTime() > complexPayload3.getEventTime() ? 1 : (event2.getEventTime() == complexPayload3.getEventTime() ? 0 : -1)) <= 0 ? complexPayload3 : new ComplexPayload(event2, str3);
                    };
                }
                break;
            case true:
                // simpleStateUpdate: JoinFunction capturing the expected string payload.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/JoinFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("join") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/streaming/tests/Event;Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;)Lorg/apache/flink/streaming/tests/artificialstate/ComplexPayload;")) {
                    String str4 = (String) serializedLambda.getCapturedArg(0);
                    return (event3, complexPayload4) -> {
                        verifyState(str4, complexPayload4);
                        return new ComplexPayload(event3, str4);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
