package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.LogLevelRule;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.class */
public class UnalignedCheckpointStressITCase extends TestLogger {
    // NOTE(review): presumably the checkpoint interval in milliseconds; defineEnvironment() passes
    // the literal 20L to enableCheckpointing rather than referencing this constant — confirm.
    private static final int CHECKPOINT_INTERVAL = 20;
    // NOTE(review): FailingMapper draws its failure threshold with nextInt(2, 11), which matches
    // the range [MINIMUM, MAXIMUM] below, but the call site uses literals — confirm the link.
    private static final int MINIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES = 2;
    private static final int MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES = 10;
    // Number of task managers requested for the mini cluster in setup().
    private static final int NUM_TASK_MANAGERS = 3;
    // NOTE(review): presumably slots per task manager; setup() passes the literal 2 — confirm.
    private static final int NUM_TASK_SLOTS = 2;
    // Default job parallelism set in defineEnvironment().
    private static final int PARALLELISM = 6;
    // Payload size returned by SizeMode.LARGE.
    private static final int BUFFER_SIZE = 4096;
    // NOTE(review): only referenced as the random bound in SizeMode.RANDOM; the name suggests a
    // different purpose, so that reference may be a decompiler alias for the value 4 — confirm.
    private static final int BUFFER_TIME = 4;
    // Sleep (in ms) returned by SpeedMode.FAST when it decides to pause.
    // NOTE(review): this name also shows up in places where a plain 1 would be expected, which
    // looks like decompiler constant aliasing — confirm before changing the value.
    private static final int NORMAL_RECORD_SLEEP = 1;
    // Payload size returned by SizeMode.SMALL and the unit used by SizeMode.RANDOM.
    private static final int SMALL_RECORD_SIZE = 1024;

    // Its root directory is configured as the checkpoint directory in setup().
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    // A fresh sub-folder of this rule is handed to FsStateChangelogStorageFactory.configure in setup().
    @Rule
    public TemporaryFolder changelogFolder = new TemporaryFolder();
    // Mini cluster created in setup() and torn down in shutDownExistingCluster().
    private MiniClusterWithClientResource cluster;
    // Wall-clock budget of runStressTest(): 20 seconds, expressed in milliseconds.
    private static final long TEST_DURATION = Time.seconds(20).toMilliseconds();

    // Raises the log level of NetworkActionsLogger to TRACE for this test class.
    @ClassRule
    public static final LogLevelRule NETWORK_LOGGER = new LogLevelRule().set(NetworkActionsLogger.class, Level.TRACE);
    // Matches a trailing run of digits that is preceded by at least one non-digit character;
    // capture group 1 holds the digits. Used by getCheckpointNumberFromPath.
    private static final Pattern LAST_INT_PATTERN = Pattern.compile("[^0-9]+([0-9]+)$");

    /**
     * Identity mapper that deliberately fails once enough checkpoints have completed.
     *
     * <p>At construction time each instance draws a random threshold in the range {@code [2, 10]}.
     * The first completed checkpoint id it is notified about becomes the baseline; as soon as a
     * later notification carries an id that is at least {@code threshold} greater than the
     * baseline, an {@link ExpectedTestException} is thrown from the notification callback.
     */
    public static class FailingMapper implements MapFunction<Record, Record>, CheckpointListener {

        /** Id of the first checkpoint whose completion was observed; {@code null} until then. */
        @Nullable
        private Long firstCompletedCheckpoint;

        /** Most recent record passed through {@link #map}; only used for the failure message. */
        @Nullable
        private Record lastProcessedRecord;

        /** Required distance between the baseline checkpoint id and the failing one. */
        private final int completedCheckpointsBeforeFailure;

        private FailingMapper() {
            // Lower bound inclusive, upper bound exclusive: the result is in [2, 10].
            this.completedCheckpointsBeforeFailure = ThreadLocalRandom.current().nextInt(2, 11);
        }

        /** Remembers {@code record} and forwards it unchanged. */
        public Record map(Record record) throws Exception {
            lastProcessedRecord = record;
            return record;
        }

        /**
         * Records the baseline checkpoint on first call and throws once the threshold is reached.
         *
         * @param checkpointId id of the checkpoint that just completed
         * @throws ExpectedTestException when {@code checkpointId - baseline >= threshold}
         */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (firstCompletedCheckpoint == null) {
                firstCompletedCheckpoint = checkpointId;
            }
            long distanceFromFirst = checkpointId - firstCompletedCheckpoint;
            if (distanceFromFirst < completedCheckpointsBeforeFailure) {
                return;
            }
            String detail;
            if (lastProcessedRecord == null) {
                detail = "no record";
            } else {
                detail = lastProcessedRecord.toString();
            }
            throw new ExpectedTestException(detail);
        }
    }

    /**
     * Parallel source that emits an endless sequence of {@link Record}s and checkpoints the next
     * sequence number in operator list state.
     *
     * <p>The source id of each subtask is {@code subtaskIndex + sourceIdOffset}; the id also
     * selects the size and speed behaviour through {@link RecordGenerator}.
     */
    public static class LegacySourceFunction extends AbstractRichFunction implements ParallelSourceFunction<Record>, CheckpointedFunction {
        /** Added to the subtask index to form the source id. */
        private final int sourceIdOffset;
        /** Sequence number of the next record to emit. */
        private long nextValue;
        /** Operator state holding {@link #nextValue}; assigned in {@link #initializeState}. */
        private ListState<Long> nextState;
        /** Cleared by {@link #cancel()} to stop the emit loop. */
        private volatile boolean running = true;

        /**
         * @param i offset added to the subtask index to obtain this subtask's source id
         */
        public LegacySourceFunction(int i) {
            this.sourceIdOffset = i;
        }

        /**
         * Generates and emits records until cancelled.
         *
         * <p>The record is generated (which may sleep) outside the checkpoint lock; only the
         * counter increment and the emission happen while holding it — presumably so a snapshot
         * never sees a counter that is out of step with the emitted records.
         */
        public void run(SourceFunction.SourceContext<Record> sourceContext) throws Exception {
            int sourceId = getRuntimeContext().getIndexOfThisSubtask() + this.sourceIdOffset;
            RecordGenerator generator = new RecordGenerator(sourceId);
            while (this.running) {
                Record next = generator.next(this.nextValue);
                synchronized (sourceContext.getCheckpointLock()) {
                    this.nextValue++;
                    sourceContext.collect(next);
                }
            }
        }

        /** Requests termination of the emit loop. */
        public void cancel() {
            this.running = false;
        }

        /** Replaces the state content with the current value of the sequence counter. */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.nextState.clear();
            this.nextState.add(this.nextValue);
        }

        /**
         * Obtains the list state named {@code "state"} and restores the sequence counter from it,
         * starting at {@code 0} when the state is empty.
         */
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            // Typed descriptor instead of a raw one so no unchecked conversion is needed.
            this.nextState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<>("state", Long.class));
            this.nextValue = Objects.requireNonNull(Iterables.getOnlyElement(this.nextState.get(), 0L));
        }
    }

    /**
     * File visitor that remembers the parent directory of the {@code _metadata} file whose
     * directory name carries the highest checkpoint number.
     *
     * <p>Files that vanish while the tree is being walked are skipped.
     */
    public static class MaxCheckpointFileVisitor extends SimpleFileVisitor<Path> {
        /** Best directory seen so far, or {@code null} if no {@code _metadata} file was visited. */
        private Path maxCheckpointDir;

        private MaxCheckpointFileVisitor() {
        }

        /** Updates the best directory when {@code file} is a {@code _metadata} file with a higher number. */
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attributes) {
            if (!file.endsWith("_metadata")) {
                return FileVisitResult.CONTINUE;
            }
            int highestSoFar = -1;
            if (this.maxCheckpointDir != null) {
                highestSoFar = UnalignedCheckpointStressITCase.getCheckpointNumberFromPath(this.maxCheckpointDir);
            }
            Path candidateDir = file.getParent();
            int candidateNumber = UnalignedCheckpointStressITCase.getCheckpointNumberFromPath(candidateDir);
            if (candidateNumber > highestSoFar) {
                this.maxCheckpointDir = candidateDir;
            }
            return FileVisitResult.CONTINUE;
        }

        /** Ignores {@link NoSuchFileException}; every other failure is rethrown. */
        @Override
        public FileVisitResult visitFileFailed(Path file, IOException failure) throws IOException {
            if (!(failure instanceof NoSuchFileException)) {
                throw failure;
            }
            return FileVisitResult.CONTINUE;
        }

        /** @return the directory with the highest checkpoint number, or {@code null} if none was found */
        public Path getMaxCheckpointDir() {
            return this.maxCheckpointDir;
        }
    }

    /**
     * Window function that forwards every element of the window unchanged, in iteration order.
     *
     * <p>The erased {@code process(Object, ...)} bridge that the decompiler had materialised is
     * intentionally not declared here: it has the same erasure as the typed override below and
     * the compiler emits it on its own.
     */
    public static class ReEmitAll extends ProcessWindowFunction<Record, Record, Integer, TimeWindow> {
        private ReEmitAll() {
        }

        /**
         * Emits all {@code iterable} elements to {@code collector}.
         *
         * @param num the window key (unused)
         * @param context the window context (unused)
         * @param iterable elements collected in the window
         * @param collector receives each element once
         */
        public void process(Integer num, ProcessWindowFunction<Record, Record, Integer, TimeWindow>.Context context, Iterable<Record> iterable, Collector<Record> collector) {
            for (Record element : iterable) {
                collector.collect(element);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase$Record.class */
    public static class Record implements Serializable {
        public int sourceId;
        public long value;
        public byte[] payload;

        public Record(int i, long j, int i2) {
            this.sourceId = i;
            this.payload = new byte[i2];
            this.value = j;
            for (int i3 = 0; i3 < this.payload.length; i3 += UnalignedCheckpointStressITCase.NORMAL_RECORD_SLEEP) {
                this.payload[i3] = payloadAt(i3);
            }
        }

        public int getSourceId() {
            return this.sourceId;
        }

        public long getValue() {
            return this.value;
        }

        public Record validate() {
            for (int i = 0; i < this.payload.length; i += UnalignedCheckpointStressITCase.NORMAL_RECORD_SLEEP) {
                Preconditions.checkState(this.payload[i] == payloadAt(i), "Expected %s at position %s, but found %s in %s", Byte.valueOf(payloadAt(i)), Integer.valueOf(i), Byte.valueOf(this.payload[i]), this);
            }
            return this;
        }

        private byte payloadAt(int i) {
            return (byte) ((this.value + i) % 128);
        }

        public String toString() {
            return String.format("Record(sourceId=%d, payload.length=%d, value=%d)", Integer.valueOf(this.sourceId), Integer.valueOf(this.payload.length), Long.valueOf(this.value));
        }
    }

    /**
     * Produces {@link Record}s for one source, with payload size and pacing chosen from the
     * source id via {@link SizeMode#valueOf(int)} and {@link SpeedMode#valueOf(int)}.
     */
    private static class RecordGenerator {
        private final int sourceId;
        private final SizeMode sizeMode;
        private final SpeedMode speedMode;

        /** @param sourceId id stamped on every record; also selects the size and speed modes */
        public RecordGenerator(int sourceId) {
            this.sourceId = sourceId;
            this.sizeMode = SizeMode.valueOf(sourceId);
            this.speedMode = SpeedMode.valueOf(sourceId);
        }

        /**
         * Optionally sleeps as dictated by the speed mode, then builds the record for {@code value}.
         *
         * @param value sequence number of the record
         * @return a new record with a payload size taken from the size mode
         * @throws InterruptedException if interrupted while sleeping
         */
        public Record next(long value) throws InterruptedException {
            final int pauseMillis = speedMode.getSleep();
            if (pauseMillis > 0) {
                Thread.sleep(pauseMillis);
            }
            final int payloadSize = sizeMode.getSize();
            return new Record(sourceId, value, payloadSize);
        }
    }

    /** Strategies for choosing the payload size of a generated record. */
    public enum SizeMode {
        /** Always {@code SMALL_RECORD_SIZE} bytes. */
        SMALL {
            @Override
            public int getSize() {
                return UnalignedCheckpointStressITCase.SMALL_RECORD_SIZE;
            }
        },
        /** Always {@code BUFFER_SIZE} bytes. */
        LARGE {
            @Override
            public int getSize() {
                return UnalignedCheckpointStressITCase.BUFFER_SIZE;
            }
        },
        /** A random multiple of {@code SMALL_RECORD_SIZE}, from 1x up to {@code BUFFER_TIME}x. */
        RANDOM {
            @Override
            public int getSize() {
                int extraChunks = ThreadLocalRandom.current().nextInt(UnalignedCheckpointStressITCase.BUFFER_TIME);
                return UnalignedCheckpointStressITCase.SMALL_RECORD_SIZE * (extraChunks + 1);
            }
        };

        /**
         * Picks a mode by index, wrapping around the number of modes.
         *
         * @param i non-negative selector
         * @return the mode at position {@code i % values().length}
         * @throws IllegalStateException if {@code i} is negative
         */
        public static SizeMode valueOf(int i) {
            Preconditions.checkState(i >= 0);
            final SizeMode[] modes = values();
            return modes[i % modes.length];
        }

        /** @return the payload size in bytes for the next record */
        public abstract int getSize();
    }

    /** Strategies for choosing how long a source sleeps before producing its next record. */
    public enum SpeedMode {
        /** Sleeps a random amount below {@code MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES} ms. */
        SLOW {
            @Override
            public int getSleep() {
                final int bound = UnalignedCheckpointStressITCase.MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES;
                return ThreadLocalRandom.current().nextInt(bound);
            }
        },
        /** Sleeps 0 or 1 ms with equal probability. */
        NORMAL {
            @Override
            public int getSleep() {
                return ThreadLocalRandom.current().nextInt(2);
            }
        },
        /** Usually does not sleep; on a 1-in-bound draw sleeps {@code NORMAL_RECORD_SLEEP} ms. */
        FAST {
            @Override
            public int getSleep() {
                final int bound = UnalignedCheckpointStressITCase.MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES;
                boolean pause = ThreadLocalRandom.current().nextInt(bound) == 0;
                return pause ? UnalignedCheckpointStressITCase.NORMAL_RECORD_SLEEP : 0;
            }
        },
        /** Almost never sleeps; on a 1-in-1000 draw sleeps 1000 times {@code NORMAL_RECORD_SLEEP} ms. */
        BURST {
            @Override
            public int getSleep() {
                boolean pause = ThreadLocalRandom.current().nextInt(1000) == 0;
                return pause ? 1000 * UnalignedCheckpointStressITCase.NORMAL_RECORD_SLEEP : 0;
            }
        };

        /** @return the number of milliseconds to sleep before producing the next record */
        public abstract int getSleep();

        /**
         * Picks a mode by index, wrapping around the number of modes.
         *
         * @param i non-negative selector
         * @return the mode at position {@code i % values().length}
         * @throws IllegalStateException if {@code i} is negative
         */
        public static SpeedMode valueOf(int i) {
            Preconditions.checkState(i >= 0);
            final SpeedMode[] modes = values();
            return modes[i % modes.length];
        }
    }

    /**
     * Validating mapper that occasionally sleeps for one millisecond.
     *
     * <p>For each record a random number in {@code [0, chance)} is drawn; the mapper sleeps only
     * when the draw is zero, i.e. on average once every {@code chance} records.
     */
    public static class ThrottlingMap implements MapFunction<Record, Record> {
        /** Exclusive upper bound of the random draw; must be positive for {@code nextInt}. */
        private final int chance;

        /** @param i exclusive upper bound of the random draw that decides whether to sleep */
        public ThrottlingMap(int i) {
            this.chance = i;
        }

        /**
         * Possibly sleeps, then returns the validated record.
         *
         * @throws IllegalStateException if the record payload fails validation
         */
        public Record map(Record record) throws Exception {
            final boolean throttleNow = ThreadLocalRandom.current().nextInt(chance) == 0;
            if (throttleNow) {
                Thread.sleep(1L);
            }
            return record.validate();
        }
    }

    /**
     * Starts a mini cluster whose checkpoints are written below {@link #temporaryFolder} and whose
     * changelog storage lives in a fresh folder of {@link #changelogFolder}.
     */
    @Before
    public void setup() throws Exception {
        final Configuration clusterConfig = new Configuration();

        final String checkpointDirUri = this.temporaryFolder.getRoot().toURI().toString();
        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDirUri);
        // The constant evaluates to 1.
        clusterConfig.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, Integer.valueOf(NORMAL_RECORD_SLEEP));

        final File changelogDir = this.changelogFolder.newFolder();
        FsStateChangelogStorageFactory.configure(clusterConfig, changelogDir, Duration.ofMinutes(1L), MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES);

        final MiniClusterResourceConfiguration resourceConfig =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(clusterConfig)
                        .setNumberTaskManagers(NUM_TASK_MANAGERS)
                        .setNumberSlotsPerTaskManager(2)
                        .build();
        this.cluster = new MiniClusterWithClientResource(resourceConfig);
        this.cluster.before();
    }

    /** Tears down the mini cluster if one was started; safe to call when none exists. */
    @After
    public void shutDownExistingCluster() {
        if (this.cluster == null) {
            return;
        }
        this.cluster.after();
        this.cluster = null;
    }

    /**
     * Repeatedly runs the job until the {@code TEST_DURATION} deadline expires, restoring each
     * run from the checkpoint retained by the previous one and pruning everything else from the
     * checkpoint directory in between.
     */
    @Test
    public void runStressTest() throws Exception {
        final Deadline deadline = Deadline.fromNow(Duration.ofMillis(TEST_DURATION));
        // null on the first iteration: the first run starts without restore settings.
        File retainedCheckpoint = null;
        while (deadline.hasTimeLeft()) {
            retainedCheckpoint = runAndTakeExternalCheckpoint(retainedCheckpoint);
            cleanDirectoryExcept(retainedCheckpoint);
        }
    }

    /**
     * Builds the job topology on {@code streamExecutionEnvironment}.
     *
     * <p>Four source groups with parallelism 6, 3, 2 and 1 are created; each group's source-id
     * offset is the sum of the preceding parallelisms so every source subtask gets a unique id.
     * Groups one and two are unioned, as are groups three and four; both unions are keyed by
     * source id, connected and validated. The result is rebalanced, validated again, keyed,
     * throttled, reinterpreted as a keyed stream, windowed (5 ms tumbling processing time),
     * re-emitted, throttled once more at parallelism 1 and finally passed through a
     * {@link FailingMapper} at parallelism 1.
     *
     * <p>The erased bridge methods the decompiler had added to the anonymous
     * {@code KeyedCoProcessFunction} are not declared: they clash with the typed overrides and
     * are generated by the compiler.
     *
     * @param streamExecutionEnvironment environment the topology is added to
     */
    private void testProgram(StreamExecutionEnvironment streamExecutionEnvironment) {
        // NOTE(review): the decompiled code expressed 3, 2 and 1 through same-valued but
        // unrelated constants; the numeric values are kept unchanged here.
        final int sourceCount1 = PARALLELISM;
        final int sourceCount2 = 3;
        final int sourceCount3 = 2;
        final int sourceCount4 = 1;
        final int totalSourceCount = sourceCount1 + sourceCount2 + sourceCount3 + sourceCount4;

        final int offset2 = sourceCount1;
        final int offset3 = offset2 + sourceCount2;
        final int offset4 = offset3 + sourceCount3;

        DataStream<Record> source1 = streamExecutionEnvironment.addSource(new LegacySourceFunction(0)).setParallelism(sourceCount1);
        DataStream<Record> source2 = streamExecutionEnvironment.addSource(new LegacySourceFunction(offset2)).setParallelism(sourceCount2);
        DataStream<Record> source3 = streamExecutionEnvironment.addSource(new LegacySourceFunction(offset3)).setParallelism(sourceCount3);
        DataStream<Record> source4 = streamExecutionEnvironment.addSource(new LegacySourceFunction(offset4)).setParallelism(sourceCount4);

        DataStream<Record> firstPair = source1.union(source2);
        DataStream<Record> secondPair = source3.union(source4);

        DataStream<Record> merged = firstPair
                .keyBy(Record::getSourceId)
                .connect(secondPair.keyBy(Record::getSourceId))
                .process(new KeyedCoProcessFunction<Integer, Record, Record, Record>() {
                    public void processElement1(Record record, KeyedCoProcessFunction<Integer, Record, Record, Record>.Context context, Collector<Record> collector) {
                        collector.collect(record.validate());
                    }

                    public void processElement2(Record record, KeyedCoProcessFunction<Integer, Record, Record, Record>.Context context, Collector<Record> collector) {
                        collector.collect(record.validate());
                    }
                });

        DataStream<Record> throttled = merged
                .rebalance()
                .map(Record::validate)
                .keyBy(Record::getSourceId)
                .map(new ThrottlingMap(100));

        DataStreamUtils.reinterpretAsKeyedStream(throttled, Record::getSourceId)
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(5L)))
                .process(new ReEmitAll())
                // Sleeps on average once every max(1, totalSourceCount - 2) records, at parallelism 1.
                .map(new ThrottlingMap(Math.max(1, totalSourceCount - 2)))
                .setParallelism(1)
                .map(new FailingMapper())
                .setParallelism(1);
    }

    /**
     * Deletes every direct child of the checkpoint root except the parent directory of
     * {@code file}.
     *
     * @param file checkpoint directory whose parent must be kept
     * @throws IOException if a deletion fails
     * @throws NullPointerException if the root directory cannot be listed
     */
    private void cleanDirectoryExcept(File file) throws IOException {
        final File directoryToKeep = file.getParentFile();
        final File[] children = Objects.requireNonNull(this.temporaryFolder.getRoot().listFiles());
        for (File child : children) {
            if (child.equals(directoryToKeep)) {
                continue;
            }
            FileUtils.deleteDirectory(child);
        }
    }

    /**
     * Builds the job, optionally restores it from {@code file}, runs it until it terminates and
     * returns the checkpoint directory found afterwards.
     *
     * <p>A job failure is tolerated only if its cause chain contains an
     * {@link ExpectedTestException}; any other exception is rethrown.
     *
     * @param file checkpoint directory to restore from, or {@code null} to start fresh
     * @return the checkpoint directory discovered after the run
     * @throws Exception if the job fails for an unexpected reason or no checkpoint is found
     */
    private File runAndTakeExternalCheckpoint(@Nullable File file) throws Exception {
        StreamExecutionEnvironment env = defineEnvironment();
        testProgram(env);
        StreamGraph streamGraph = env.getStreamGraph();
        if (file != null) {
            streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(file.toString()));
        }
        try {
            TestUtils.submitJobAndWaitForResult(this.cluster.getClusterClient(), streamGraph.getJobGraph(), getClass().getClassLoader());
        } catch (Exception e) {
            // Only the deliberately injected failure may end the job.
            if (!ExceptionUtils.findThrowable(e, ExpectedTestException.class).isPresent()) {
                throw e;
            }
        }
        return discoverRetainedCheckpoint();
    }

    /**
     * Extracts the trailing integer of {@code path}'s string form using {@code LAST_INT_PATTERN}.
     *
     * @param path path expected to end in a run of digits preceded by a non-digit
     * @return the trailing digits parsed as an {@code int}
     * @throws IllegalStateException if the path does not end in such a number
     * @throws NumberFormatException if the digits do not fit into an {@code int}
     */
    public static int getCheckpointNumberFromPath(Path path) {
        Matcher matcher = LAST_INT_PATTERN.matcher(path.toString());
        Preconditions.checkState(matcher.find(), "No trailing checkpoint number in path %s", path);
        // Group 1 is the pattern's only capture group: the trailing digits.
        return Integer.parseInt(matcher.group(1));
    }

    /**
     * Polls the checkpoint root until a directory containing a {@code _metadata} file shows up
     * and returns the one with the highest checkpoint number.
     *
     * <p>Up to 1001 attempts are made, each preceded by a 5 ms sleep.
     *
     * @return the checkpoint directory with the highest number
     * @throws IllegalStateException if no {@code _metadata} file appears; the message lists
     *     every path found below the root
     * @throws Exception if walking the directory tree fails or the thread is interrupted
     */
    private File discoverRetainedCheckpoint() throws Exception {
        final Path rootPath = Paths.get(this.temporaryFolder.getRoot().getPath());
        for (int attempt = 0; attempt <= 1000; attempt++) {
            Thread.sleep(5L);
            MaxCheckpointFileVisitor visitor = new MaxCheckpointFileVisitor();
            Files.walkFileTree(rootPath, visitor);
            Path checkpointDir = visitor.getMaxCheckpointDir();
            if (checkpointDir != null) {
                return checkpointDir.toFile();
            }
        }
        // The directory stream must be closed even though this branch always throws.
        try (Stream<Path> allPaths = Files.walk(rootPath)) {
            throw new IllegalStateException("Failed to find _metadata file among " + allPaths.collect(Collectors.toList()));
        }
    }

    /**
     * Creates the execution environment used for every run: parallelism {@code PARALLELISM},
     * checkpointing enabled with an argument of 20, unaligned checkpoints switched on, no
     * restart strategy, and externalized-checkpoint cleanup set to {@code DELETE_ON_CANCELLATION}.
     *
     * @return a freshly configured environment
     */
    private StreamExecutionEnvironment defineEnvironment() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(20L);
        env.setRestartStrategy(RestartStrategies.noRestart());

        final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableUnalignedCheckpoints();
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        return env;
    }

    /*
     * Lambda deserialization hook, marked synthetic in this listing.
     *
     * Dispatches on the implementation method name of the serialized lambda:
     *   - "getSourceId": returns a KeySelector-shaped lambda calling Record.getSourceId()
     *   - "validate":    returns a MapFunction-shaped lambda calling Record.validate()
     * Any other input ends in an IllegalArgumentException.
     *
     * NOTE(review): the body below initialises a boolean with -1 and switches on it, which is
     * not valid Java source; this looks like decompiler output. Such a method is normally
     * generated by the compiler for serializable lambdas, so it can presumably be deleted from
     * hand-maintained source — confirm before doing so.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First switch: hash lookup, confirmed with equals() to guard against hash collisions.
        switch (implMethodName.hashCode()) {
            case -1421272810:
                if (implMethodName.equals("validate")) {
                    z = NORMAL_RECORD_SLEEP;
                    break;
                }
                break;
            case -470739956:
                if (implMethodName.equals("getSourceId")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // The four checks below are textually identical; only the first can ever match.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase$Record") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getSourceId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase$Record") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getSourceId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase$Record") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getSourceId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase$Record") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getSourceId();
                    };
                }
                break;
            case NORMAL_RECORD_SLEEP /* 1 */:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase$Record") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase$Record;")) {
                    return (v0) -> {
                        return v0.validate();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
