package org.apache.flink.runtime.operators.lifecycle.validation;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.event.InputEndedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/runtime/operators/lifecycle/validation/DrainingValidator.class */
/**
 * Lifecycle validator that checks how the inputs of an operator were drained.
 *
 * <p>For every validated attempt it asserts that:
 *
 * <ul>
 *   <li>no input received the max watermark more than once,
 *   <li>every input that ended had received the max watermark beforehand,
 *   <li>no input ended more than once, and
 *   <li>the number of ended inputs equals the number of inputs of the operator.
 * </ul>
 */
public class DrainingValidator implements TestOperatorLifecycleValidator {

    /**
     * Groups the given events by attempt number and validates the relevant attempts.
     *
     * <p>An attempt is validated when it is the highest attempt number present in {@code list},
     * or when {@code list} contains an acknowledgement of {@link TestCommand#FINISH_SOURCES} for
     * that attempt. All other attempts are skipped.
     *
     * @param testJobWithDescription job description used to look up the operator's input count
     * @param str operator identifier, used for the input-count lookup and in failure messages
     * @param i index printed in failure messages as {@code operator[i]} (presumably the subtask
     *     index)
     * @param list events to validate; an empty list results in no validation at all
     */
    @Override
    public void validateOperatorLifecycle(TestJobWithDescription testJobWithDescription, String str, int i, List<TestEvent> list) {
        Map<Integer, List<TestEvent>> eventsByAttempt = new HashMap<>();
        Set<Integer> finishAckedAttempts = new HashSet<>();
        int lastAttempt = Integer.MIN_VALUE;
        for (TestEvent event : list) {
            eventsByAttempt.computeIfAbsent(event.attemptNumber, ignored -> new ArrayList<>()).add(event);
            if (isFinishAck(event)) {
                finishAckedAttempts.add(event.attemptNumber);
            }
            lastAttempt = Math.max(lastAttempt, event.attemptNumber);
        }

        for (Map.Entry<Integer, List<TestEvent>> entry : eventsByAttempt.entrySet()) {
            // Unbox explicitly so the comparison below is numeric, never a reference comparison.
            int attempt = entry.getKey().intValue();
            // NOTE(review): earlier attempts without a FINISH_SOURCES ack are skipped, presumably
            // because they were cut short by a failure - confirm against the test harness.
            if (attempt == lastAttempt || finishAckedAttempts.contains(entry.getKey())) {
                validateSubtaskAttempt(testJobWithDescription, str, i, entry.getValue());
            }
        }
    }

    /**
     * Validates the events of a single attempt, in the order they appear in {@code attemptEvents}.
     *
     * @param job job description used to look up the expected number of inputs
     * @param operatorId operator identifier, used for the lookup and in failure messages
     * @param index index printed in failure messages
     * @param attemptEvents events belonging to one attempt
     */
    private static void validateSubtaskAttempt(TestJobWithDescription job, String operatorId, int index, List<TestEvent> attemptEvents) {
        // Both sets are indexed by input id.
        BitSet endedInputs = new BitSet();
        BitSet inputsWithMaxWatermark = new BitSet();
        for (TestEvent event : attemptEvents) {
            if (event instanceof WatermarkReceivedEvent) {
                WatermarkReceivedEvent watermark = (WatermarkReceivedEvent) event;
                // Only the max watermark is tracked; all other watermarks are ignored.
                if (watermark.ts == Watermark.MAX_WATERMARK.getTimestamp()) {
                    Assert.assertFalse(
                            String.format("Max Watermark received twice by %s/%d/%d", watermark.operatorId, watermark.subtaskIndex, watermark.inputId),
                            inputsWithMaxWatermark.get(watermark.inputId));
                    inputsWithMaxWatermark.set(watermark.inputId);
                }
            } else if (event instanceof InputEndedEvent) {
                InputEndedEvent inputEnded = (InputEndedEvent) event;
                Assert.assertTrue(
                        String.format("Input %d ended before receiving max watermark by %s[%d]#%d", inputEnded.inputId, operatorId, index, inputEnded.attemptNumber),
                        inputsWithMaxWatermark.get(inputEnded.inputId));
                Assert.assertFalse(
                        String.format("Input %d ended twice for %s[%d]#%d", inputEnded.inputId, operatorId, index, inputEnded.attemptNumber),
                        endedInputs.get(inputEnded.inputId));
                endedInputs.set(inputEnded.inputId);
            }
        }
        Assert.assertEquals(
                String.format("Incorrect number of ended inputs for %s[%d]", operatorId, index),
                getNumInputs(job, operatorId),
                endedInputs.cardinality());
    }

    /**
     * Returns whether the event is an acknowledgement of the {@link TestCommand#FINISH_SOURCES}
     * command.
     */
    private static boolean isFinishAck(TestEvent event) {
        return (event instanceof TestCommandAckEvent) && ((TestCommandAckEvent) event).getCommand() == TestCommand.FINISH_SOURCES;
    }

    /**
     * Resolves the number of inputs of the given operator.
     *
     * <p>An entry in {@code operatorsNumberOfInputs} takes precedence. Otherwise the job graph is
     * searched for a vertex containing an operator whose user-defined id (or generated id, when no
     * user-defined one is present) has a string form equal to {@code operatorId}, and the input
     * count of that vertex is returned.
     *
     * @throws NoSuchElementException if neither lookup finds the operator
     */
    private static int getNumInputs(TestJobWithDescription job, String operatorId) {
        Integer explicitNumInputs = job.operatorsNumberOfInputs.get(operatorId);
        if (explicitNumInputs != null) {
            return explicitNumInputs.intValue();
        }
        for (JobVertex vertex : job.jobGraph.getVertices()) {
            for (OperatorIDPair idPair : vertex.getOperatorIDs()) {
                String effectiveId = idPair.getUserDefinedOperatorID().orElse(idPair.getGeneratedOperatorID()).toString();
                if (effectiveId.equals(operatorId)) {
                    return vertex.getNumberOfInputs();
                }
            }
        }
        throw new NoSuchElementException(operatorId);
    }
}
