package org.apache.flink.runtime.jobmaster;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolServiceFactory;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolServiceBuilder;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest.class */
public class JobMasterTest extends TestLogger {
    // Interval value handed to the "fast" HeartbeatServices in setupClass(); its companion
    // fastHeartbeatTimeout is declared further down in this block.
    private static final long fastHeartbeatInterval = 1;
    // Values handed to the regular HeartbeatServices in setupClass().
    private static final long heartbeatInterval = 1000;
    private static final long heartbeatTimeout = 5000000;
    // Shared by all tests: created in setupClass(), stopped and nulled in teardownClass().
    private static TestingRpcService rpcService;
    // Both heartbeat service instances are created once in setupClass().
    private static HeartbeatServices fastHeartbeatServices;
    private static HeartbeatServices heartbeatServices;
    // Per-test state; every field below is re-created in setup().
    private Configuration configuration;
    private ResourceID jmResourceId;
    private JobMasterId jobMasterId;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService rmLeaderRetrievalService;
    // Checked in teardown() via rethrowError().
    private TestingFatalErrorHandler testingFatalErrorHandler;
    // Zero-length array passed to List.toArray(..) by TestingInputSplitSource.
    private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];

    // Class-level temporary folder; setup() creates a new sub-folder in it for each test.
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final long fastHeartbeatTimeout = 10;
    // Time.seconds(10): the numeric value of fastHeartbeatTimeout is reused here as a number of seconds.
    // NOTE(review): reusing the heartbeat constant looks like a decompilation artifact — confirm the
    // timeout is meant to be independent of the heartbeat settings.
    private static final Time testingTimeout = Time.seconds(fastHeartbeatTimeout);
    // Single job graph instance shared by the tests that do not build their own.
    private static final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$DummyCheckpointStorageLocation.class */
    /**
     * Placeholder {@link CompletedCheckpointStorageLocation}: both getters return {@code null} and
     * disposal does nothing. Used where a storage location argument is required but, as far as this
     * file shows, never inspected.
     */
    private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation {
        private static final long serialVersionUID = 164095949572620688L;

        private DummyCheckpointStorageLocation() {
        }

        /** Always returns {@code null}. */
        public String getExternalPointer() {
            return null;
        }

        /** Always returns {@code null}. */
        public StreamStateHandle getMetadataHandle() {
            return null;
        }

        /** Intentionally a no-op; there is nothing to dispose. */
        public void disposeStorageLocation() throws IOException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingInputSplit.class */
    /**
     * Minimal {@link InputSplit} for tests. A split is identified solely by its split number, which
     * is also the only input to {@link #equals(Object)} and {@link #hashCode()}.
     */
    public static final class TestingInputSplit implements InputSplit {
        private static final long serialVersionUID = -5404803705463116083L;
        private final int splitNumber;

        TestingInputSplit(int number) {
            splitNumber = number;
        }

        /** Returns the number this split was created with. */
        public int getSplitNumber() {
            return splitNumber;
        }

        /** Two splits are equal when they have the same runtime class and the same split number. */
        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other == null || other.getClass() != getClass()) {
                return false;
            }
            final TestingInputSplit that = (TestingInputSplit) other;
            return that.splitNumber == splitNumber;
        }

        /** Hash derived from the split number only, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hash(splitNumber);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingInputSplitSource.class */
    /**
     * {@link InputSplitSource} backed by a fixed list of {@link TestingInputSplit}s supplied at
     * construction time.
     */
    public static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> {
        private static final long serialVersionUID = -2344684048759139086L;
        private final List<TestingInputSplit> inputSplits;

        private TestingInputSplitSource(List<TestingInputSplit> splits) {
            inputSplits = splits;
        }

        /**
         * Returns the configured splits as an array; the requested count is ignored.
         */
        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public TestingInputSplit[] m188createInputSplits(int unused) {
            final TestingInputSplit[] splitArray = inputSplits.toArray(JobMasterTest.EMPTY_TESTING_INPUT_SPLITS);
            return splitArray;
        }

        /** Wraps the given splits in a {@link DefaultInputSplitAssigner}. */
        public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] splits) {
            final InputSplitAssigner assigner = new DefaultInputSplitAssigner(splits);
            return assigner;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingSlotPool.class */
    /**
     * Bare-bones slot pool for tests. It only tracks which slots were offered per registered task
     * manager and fires a latch on the first slot offer; every operation it does not need throws
     * {@link UnsupportedOperationException}.
     */
    private static final class TestingSlotPool implements SlotPool, SlotPoolService {
        private static final String UNSUPPORTED_MESSAGE = "TestingSlotPool does not support this operation.";

        private final JobID jobId;
        private final OneShotLatch hasReceivedSlotOffers;
        /** Slots accepted so far, keyed by the task manager that offered them. */
        private final Map<ResourceID, Collection<SlotInfo>> registeredSlots;

        private TestingSlotPool(JobID jobID, OneShotLatch oneShotLatch) {
            this.jobId = jobID;
            this.hasReceivedSlotOffers = oneShotLatch;
            this.registeredSlots = new HashMap<>(16);
        }

        /** No-op; this pool needs no start-up work. */
        public void start(JobMasterId jobMasterId, String str, ComponentMainThreadExecutor componentMainThreadExecutor) {
        }

        /** Forgets all registered task managers and their slots. */
        public void close() {
            clear();
        }

        private void clear() {
            registeredSlots.clear();
        }

        public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        public void disconnectResourceManager() {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        /** Registers the task manager (keeping any slots already recorded for it); always returns {@code true}. */
        public boolean registerTaskManager(ResourceID resourceID) {
            registeredSlots.computeIfAbsent(resourceID, ignored -> new ArrayList<>(16));
            return true;
        }

        /** Drops the task manager together with its recorded slots; always returns {@code true}. */
        public boolean releaseTaskManager(ResourceID resourceID, Exception exc) {
            registeredSlots.remove(resourceID);
            return true;
        }

        /**
         * Accepts every offered slot, recording one slot context per offer, and triggers the latch.
         *
         * @return the offers that were passed in (all are accepted)
         * @throws FlinkRuntimeException if the offering task manager has not been registered
         */
        public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> collection) {
            // The latch fires before the registration check, i.e. even when the offer is rejected below.
            hasReceivedSlotOffers.trigger();
            final Collection<SlotInfo> slotInfos = Optional.ofNullable(registeredSlots.get(taskManagerLocation.getResourceID()))
                    .orElseThrow(() -> new FlinkRuntimeException("TaskManager not registered."));
            // Physical slot numbers continue from the number of slots already recorded.
            int slotIndex = slotInfos.size();
            for (SlotOffer offer : collection) {
                slotInfos.add(new SimpleSlotContext(offer.getAllocationId(), taskManagerLocation, slotIndex, taskManagerGateway));
                slotIndex++;
            }
            return collection;
        }

        public Optional<ResourceID> failAllocation(@Nullable ResourceID resourceID, AllocationID allocationID, Exception exc) {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        public Optional<ResourceID> failAllocation(AllocationID allocationID, Exception exc) {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        /** Returns an unmodifiable snapshot of all recorded slots, each reported with utilization 0. */
        @Nonnull
        public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
            final List<SlotInfoWithUtilization> allSlots = registeredSlots.values().stream()
                    .flatMap(Collection::stream)
                    .map(slotInfo -> SlotInfoWithUtilization.from(slotInfo, 0.0d))
                    .collect(Collectors.toList());
            return Collections.unmodifiableCollection(allSlots);
        }

        /** Always empty. */
        public Collection<SlotInfo> getAllocatedSlotsInformation() {
            return Collections.emptyList();
        }

        public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nonnull ResourceProfile resourceProfile) {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        /** Returns a fresh future that this pool never completes. */
        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nullable Time time) {
            return new CompletableFuture<>();
        }

        /** Returns a fresh future that this pool never completes. */
        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
            return new CompletableFuture<>();
        }

        /** No-op. */
        public void disableBatchSlotRequestTimeoutCheck() {
        }

        /** Builds a report of the slots recorded for the given task manager (empty if it is unknown). */
        public AllocatedSlotReport createAllocatedSlotReport(ResourceID resourceID) {
            final Collection<SlotInfo> slotInfos = registeredSlots.getOrDefault(resourceID, Collections.emptyList());
            final List<AllocatedSlotInfo> allocatedSlotInfos = slotInfos.stream()
                    .map(slotInfo -> new AllocatedSlotInfo(slotInfo.getPhysicalSlotNumber(), slotInfo.getAllocationId()))
                    .collect(Collectors.toList());
            return new AllocatedSlotReport(jobId, allocatedSlotInfos);
        }

        public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable th) {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingSlotPoolFactory.class */
    /**
     * Factory that hands out {@link TestingSlotPool} instances, all sharing the latch given at
     * construction time.
     */
    private static final class TestingSlotPoolFactory implements SlotPoolServiceFactory {
        // Passed on to every TestingSlotPool this factory creates.
        private final OneShotLatch hasReceivedSlotOffers;

        public TestingSlotPoolFactory(OneShotLatch oneShotLatch) {
            this.hasReceivedSlotOffers = oneShotLatch;
        }

        /** Creates a new {@link TestingSlotPool} for the given job. */
        @Nonnull
        public SlotPoolService createSlotPoolService(@Nonnull JobID jobID) {
            return new TestingSlotPool(jobID, this.hasReceivedSlotOffers);
        }
    }

    /**
     * Creates the RPC service and the two heartbeat service instances shared by all tests in this
     * class.
     */
    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
        // Built from fastHeartbeatInterval (1) and fastHeartbeatTimeout (10).
        fastHeartbeatServices = new HeartbeatServices(fastHeartbeatInterval, fastHeartbeatTimeout);
        // Built from heartbeatInterval (1000) and heartbeatTimeout (5000000).
        heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
    }

    /**
     * Re-creates all per-test state: configuration, high-availability services (with a standalone
     * checkpoint recovery factory and a settable resource manager leader retriever), fresh ids, a
     * fatal error handler, and a new temporary blob storage directory.
     *
     * @throws IOException if the temporary blob storage directory cannot be created
     */
    @Before
    public void setup() throws IOException {
        final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
        highAvailabilityServices.setResourceManagerLeaderRetriever(leaderRetrievalService);

        haServices = highAvailabilityServices;
        rmLeaderRetrievalService = leaderRetrievalService;
        jobMasterId = JobMasterId.generate();
        jmResourceId = ResourceID.generate();
        testingFatalErrorHandler = new TestingFatalErrorHandler();
        configuration = new Configuration();

        // Creating the folder is the only step that can fail with an IOException; it stays last so
        // that all fields are already populated at that point.
        final File blobStorageDirectory = temporaryFolder.newFolder();
        configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, blobStorageDirectory.getAbsolutePath());
    }

    /**
     * Surfaces any error captured by the fatal error handler and removes all gateways registered
     * at the shared RPC service.
     *
     * <p>The gateways are cleared in a {@code finally} block so that a rethrown fatal error cannot
     * leave gateways of a failed test registered for the tests that run afterwards.
     */
    @After
    public void teardown() throws Exception {
        try {
            if (this.testingFatalErrorHandler != null) {
                this.testingFatalErrorHandler.rethrowError();
            }
        } finally {
            rpcService.clearGateways();
        }
    }

    /**
     * Stops the shared RPC service, if one was created, and drops the reference to it.
     */
    @AfterClass
    public static void teardownClass() {
        if (rpcService == null) {
            return;
        }
        rpcService.stopService();
        rpcService = null;
    }

    /**
     * Registers a task executor at a JobMaster that uses the fast heartbeat services and expects
     * the task executor to be told to disconnect from the job within {@code testingTimeout}.
     *
     * <p>A heartbeat request may or may not have arrived by then; if one did, it must carry the
     * JobMaster's resource id.
     */
    @Test
    public void testHeartbeatTimeoutWithTaskManager() throws Exception {
        final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
        final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
        final LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setHeartbeatJobManagerConsumer((resourceId, ignoredSlotReport) -> {
                    heartbeatResourceIdFuture.complete(resourceId);
                })
                .setDisconnectJobManagerConsumer((jobId, ignoredCause) -> {
                    disconnectedJobManagerFuture.complete(jobId);
                })
                .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withResourceId(this.jmResourceId)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(fastHeartbeatServices)
                .createJobMaster();
        jobMaster.start();
        try {
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            // Wait for the registration to complete before checking for the disconnect.
            jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), unresolvedTaskManagerLocation, jobGraph.getJobID(), testingTimeout).get();

            final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));

            // getNow(null): no heartbeat at all is acceptable, a heartbeat from someone else is not.
            final ResourceID heartbeatResourceId = heartbeatResourceIdFuture.getNow(null);
            MatcherAssert.assertThat(heartbeatResourceId, Matchers.anyOf(Matchers.nullValue(), Matchers.equalTo(this.jmResourceId)));
        } finally {
            // Single cleanup path; it runs exactly once whether or not the assertions pass.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks the allocated slot reports sent with heartbeats to a task executor: before the slot
     * pool has seen a slot offer every report must be empty, afterwards every report must contain
     * exactly one slot.
     *
     * <p>Assertion failures raised inside the heartbeat callback are forwarded to the test thread
     * through {@code assertionFuture}.
     */
    @Test
    public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
        final CompletableFuture<Void> assertionFuture = new CompletableFuture<>();
        final LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        final AtomicBoolean terminateHeartbeatVerification = new AtomicBoolean(false);
        final OneShotLatch hasReceivedSlotOffers = new OneShotLatch();
        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setHeartbeatJobManagerConsumer((ignoredResourceId, allocatedSlotReport) -> {
                    try {
                        if (hasReceivedSlotOffers.isTriggered()) {
                            MatcherAssert.assertThat(allocatedSlotReport.getAllocatedSlotInfos(), Matchers.hasSize(1));
                        } else {
                            MatcherAssert.assertThat(allocatedSlotReport.getAllocatedSlotInfos(), Matchers.empty());
                        }
                    } catch (AssertionError e) {
                        assertionFuture.completeExceptionally(e);
                    }
                    // Only finish successfully once the test thread has made its slot offer.
                    if (terminateHeartbeatVerification.get()) {
                        assertionFuture.complete(null);
                    }
                })
                .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        final JobGraph singleNoOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        // The second HeartbeatServices argument is the numeric value 1000 (heartbeatInterval).
        final JobMaster jobMaster = new JobMasterBuilder(singleNoOpJobGraph, rpcService)
                .withHeartbeatServices(new HeartbeatServices(5L, heartbeatInterval))
                .withSlotPoolServiceSchedulerFactory(DefaultSlotPoolServiceSchedulerFactory.create(new TestingSlotPoolFactory(hasReceivedSlotOffers), new DefaultSchedulerFactory()))
                .createJobMaster();
        jobMaster.start();
        try {
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), unresolvedTaskManagerLocation, singleNoOpJobGraph.getJobID(), testingTimeout).get();

            final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
            final Collection<SlotOffer> acceptedOffers = jobMasterGateway.offerSlots(unresolvedTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get();
            MatcherAssert.assertThat(acceptedOffers, Matchers.containsInAnyOrder(slotOffer));

            terminateHeartbeatVerification.set(true);
            // Rethrows an assertion failure recorded by the heartbeat callback, if any.
            assertionFuture.get();
        } finally {
            // Single cleanup path; it runs exactly once whether or not the assertions pass.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
            jobManagerSharedServices.shutdown();
        }
    }

    /**
     * Lets a JobMaster with the fast heartbeat services register at a testing resource manager
     * and verifies the registration data, that the resource manager is told to disconnect the job,
     * and that the registration function ends up being invoked twice.
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        final String resourceManagerAddress = "rm";
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        // Declared with its concrete type: the setters and getJobMasterRegistrationSuccess() used
        // below are called on the testing gateway itself.
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(resourceManagerId, new ResourceID(resourceManagerAddress), resourceManagerAddress, "localhost");
        final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> jobManagerRegistrationFuture = new CompletableFuture<>();
        final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
        // Counts invocations of the registration function; the test waits for two of them.
        final CountDownLatch registrationAttempts = new CountDownLatch(2);
        resourceManagerGateway.setRegisterJobManagerFunction((registeredJobMasterId, registeredResourceId, ignoredAddress, registeredJobId) -> {
            jobManagerRegistrationFuture.complete(Tuple3.of(registeredJobMasterId, registeredResourceId, registeredJobId));
            registrationAttempts.countDown();
            return CompletableFuture.completedFuture(resourceManagerGateway.getJobMasterRegistrationSuccess());
        });
        resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> {
            disconnectedJobManagerFuture.complete(tuple.f0);
        });
        rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway);

        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withJobMasterId(this.jobMasterId)
                .withResourceId(this.jmResourceId)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(fastHeartbeatServices)
                .createJobMaster();
        jobMaster.start();
        try {
            // Announce the resource manager as leader, which makes the JobMaster register at it.
            this.rmLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerId.toUUID());

            final Tuple3<JobMasterId, ResourceID, JobID> registrationInformation = jobManagerRegistrationFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(registrationInformation.f0, Matchers.equalTo(this.jobMasterId));
            MatcherAssert.assertThat(registrationInformation.f1, Matchers.equalTo(this.jmResourceId));
            MatcherAssert.assertThat(registrationInformation.f2, Matchers.equalTo(jobGraph.getJobID()));

            final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));

            registrationAttempts.await();
        } finally {
            // Single cleanup path; it runs exactly once whether or not the assertions pass.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Starts a JobMaster for a job whose restore settings point to a savepoint with id 42 and,
     * once the first task has been submitted, expects the completed checkpoint store to hold a
     * checkpoint with that id.
     */
    @Test
    public void testRestoringFromSavepoint() throws Exception {
        final long savepointId = 42L;
        final String savepointPath = createSavepoint(savepointId).getAbsolutePath();
        final JobGraph jobGraphWithCheckpointing = createJobGraphWithCheckpointing(SavepointRestoreSettings.forPath(savepointPath, true));

        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        this.haServices.setCheckpointRecoveryFactory(PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(completedCheckpointStore, new StandaloneCheckpointIDCounter()));

        final JobMaster jobMaster = new JobMasterBuilder(jobGraphWithCheckpointing, rpcService)
                .withHighAvailabilityServices(this.haServices)
                .createJobMaster();
        try {
            jobMaster.start();

            // Triggered by the first task submission that reaches the task executor.
            final OneShotLatch taskSubmitLatch = new OneShotLatch();
            registerSlotsAtJobMaster(
                    1,
                    jobMaster.getSelfGateway(JobMasterGateway.class),
                    jobGraphWithCheckpointing.getJobID(),
                    new TestingTaskExecutorGatewayBuilder()
                            .setSubmitTaskConsumer((ignoredDescriptor, ignoredJobMasterId) -> {
                                taskSubmitLatch.trigger();
                                return CompletableFuture.completedFuture(Acknowledge.get());
                            })
                            .createTestingTaskExecutorGateway(),
                    new LocalUnresolvedTaskManagerLocation());
            taskSubmitLatch.await();

            final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
            MatcherAssert.assertThat(savepointCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(savepointId));
        } finally {
            // Single cleanup path; it runs exactly once whether or not the assertions pass.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Puts a completed checkpoint with id 1 into the checkpoint store, creates a JobMaster for a
     * job whose restore settings point to a savepoint with id 42, and expects the store's latest
     * checkpoint to still be the one with id 1.
     */
    @Test
    public void testCheckpointPrecedesSavepointRecovery() throws Exception {
        final long savepointId = 42L;
        // Expected by the getCheckpointID() assertion at the end of the test.
        final long checkpointId = 1L;
        final String savepointPath = createSavepoint(savepointId).getAbsolutePath();
        final JobGraph jobGraphWithCheckpointing = createJobGraphWithCheckpointing(SavepointRestoreSettings.forPath(savepointPath, true));

        // The three numeric arguments all carry the value 1; the first is presumably the
        // checkpoint id (see the assertion below) — verify against the CompletedCheckpoint constructor.
        final CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
                jobGraphWithCheckpointing.getJobID(),
                checkpointId,
                1L,
                1L,
                Collections.emptyMap(),
                (Collection) null,
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                new DummyCheckpointStorageLocation());
        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        completedCheckpointStore.addCheckpoint(completedCheckpoint, new CheckpointsCleaner(), () -> {
        });
        this.haServices.setCheckpointRecoveryFactory(PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(completedCheckpointStore, new StandaloneCheckpointIDCounter()));

        // NOTE(review): the builder is not given this.haServices here, so whether the JobMaster
        // ever sees completedCheckpointStore depends on JobMasterBuilder defaults — confirm.
        final JobMaster jobMaster = new JobMasterBuilder(jobGraphWithCheckpointing, rpcService).createJobMaster();
        try {
            final CompletedCheckpoint latestCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
            MatcherAssert.assertThat(latestCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(latestCheckpoint.getCheckpointID(), Matchers.is(checkpointId));
        } finally {
            // Single cleanup path; it runs exactly once whether or not the assertions pass.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Announces two resource manager leaders one after the other and expects the JobMaster to
     * attempt a registration at each of them.
     */
    @Test
    public void testCloseUnestablishedResourceManagerConnection() throws Exception {
        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .createJobMaster();
        try {
            jobMaster.start();

            final TestingResourceManagerGateway firstResourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
            final TestingResourceManagerGateway secondResourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
            final OneShotLatch firstJobManagerRegistration = new OneShotLatch();
            final OneShotLatch secondJobManagerRegistration = new OneShotLatch();

            firstResourceManagerGateway.setRegisterJobManagerFunction((ignoredJobMasterId, ignoredResourceId, ignoredAddress, ignoredJobId) -> {
                firstJobManagerRegistration.trigger();
                return CompletableFuture.completedFuture(firstResourceManagerGateway.getJobMasterRegistrationSuccess());
            });
            secondResourceManagerGateway.setRegisterJobManagerFunction((ignoredJobMasterId, ignoredResourceId, ignoredAddress, ignoredJobId) -> {
                secondJobManagerRegistration.trigger();
                return CompletableFuture.completedFuture(secondResourceManagerGateway.getJobMasterRegistrationSuccess());
            });

            // The second leader is only announced after the first registration attempt was seen.
            notifyResourceManagerLeaderListeners(firstResourceManagerGateway);
            firstJobManagerRegistration.await();
            notifyResourceManagerLeaderListeners(secondResourceManagerGateway);
            secondJobManagerRegistration.await();
        } finally {
            // Single cleanup path; it runs exactly once whether or not the test body succeeds.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Verifies that the JobMaster registers at the resource manager again after it has been told
     * to disconnect from it.
     */
    @Test
    public void testReconnectionAfterDisconnect() throws Exception {
        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withJobMasterId(this.jobMasterId)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster();
        jobMaster.start();
        final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            final TestingResourceManagerGateway resourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
            // Capacity 1: offer() drops further registrations while one is still unconsumed.
            final ArrayBlockingQueue<JobMasterId> registrationsQueue = new ArrayBlockingQueue<>(1);
            resourceManagerGateway.setRegisterJobManagerFunction((registeredJobMasterId, ignoredResourceId, ignoredAddress, ignoredJobId) -> {
                registrationsQueue.offer(registeredJobMasterId);
                return CompletableFuture.completedFuture(resourceManagerGateway.getJobMasterRegistrationSuccess());
            });
            // The decompiler-generated accessor name is kept as it appears elsewhere in this code base.
            final ResourceManagerId resourceManagerId = resourceManagerGateway.m340getFencingToken();
            notifyResourceManagerLeaderListeners(resourceManagerGateway);

            // First registration, triggered by the leader notification.
            final JobMasterId firstRegistrationAttempt = registrationsQueue.take();
            MatcherAssert.assertThat(firstRegistrationAttempt, Matchers.equalTo(this.jobMasterId));
            MatcherAssert.assertThat(registrationsQueue.isEmpty(), Matchers.is(true));

            jobMasterGateway.disconnectResourceManager(resourceManagerId, new FlinkException("Test exception"));

            // Second registration, expected after the disconnect.
            MatcherAssert.assertThat(registrationsQueue.take(), Matchers.equalTo(this.jobMasterId));
        } finally {
            // Single cleanup path; it runs exactly once whether or not the assertions pass.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that a ResourceManager leader announced before {@code start()} is registered at
     * once the JobMaster has been started.
     */
    @Test
    public void testResourceManagerConnectionAfterStart() throws Exception {
        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withJobMasterId(this.jobMasterId)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster();
        try {
            final TestingResourceManagerGateway resourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
            // Records the JobMasterId of every registration attempt seen by the ResourceManager.
            final ArrayBlockingQueue<JobMasterId> registrations = new ArrayBlockingQueue<>(1);
            resourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID, str, jobID) -> {
                registrations.offer(jobMasterId);
                return CompletableFuture.completedFuture(resourceManagerGateway.getJobMasterRegistrationSuccess());
            });

            // The leader is deliberately announced before the JobMaster is started.
            notifyResourceManagerLeaderListeners(resourceManagerGateway);
            jobMaster.start();

            MatcherAssert.assertThat(registrations.take(), Matchers.equalTo(this.jobMasterId));
        } finally {
            // Single cleanup path: runs once, whether the assertion passed or failed.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Runs the input split test with the {@code "region"} failover strategy. The splits expected
     * after the failure are the ones initially handed to the first execution.
     */
    @Test
    @Category({FailsWithAdaptiveScheduler.class})
    public void testRequestNextInputSplitWithLocalFailover() throws Exception {
        this.configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        runRequestNextInputSplitTest(splitsPerExecution -> splitsPerExecution.get(0));
    }

    /**
     * Runs the input split test with the {@code "full"} failover strategy and a single fixed-delay
     * restart attempt without delay. The splits expected after the failure are all splits that
     * were initially handed out.
     */
    @Test
    public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
        this.configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
        this.configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        runRequestNextInputSplitTest(splitsPerExecution -> flattenCollection(splitsPerExecution));
    }

    /**
     * Runs a job with a single source vertex (parallelism 2, four input splits), lets every
     * execution request two splits, fails the first execution and then verifies which splits are
     * served to the first execution of the vertex afterwards.
     *
     * @param function maps the per-execution lists of initially requested splits to the splits
     *     that are expected to be handed out again after the failure
     * @throws Exception if the job cannot be set up or an assertion fails
     */
    private void runRequestNextInputSplitTest(Function<List<List<InputSplit>>, Collection<InputSplit>> function) throws Exception {
        final int parallelism = 2;
        final int splitsPerTask = 2;
        final int totalSplits = parallelism * splitsPerTask;

        final List<TestingInputSplit> expectedInputSplits = new ArrayList<>(totalSplits);
        for (int splitNumber = 0; splitNumber < totalSplits; splitNumber++) {
            expectedInputSplits.add(new TestingInputSplit(splitNumber));
        }

        final JobVertex source = new JobVertex("source");
        source.setParallelism(parallelism);
        source.setInputSplitSource(new TestingInputSplitSource(expectedInputSplits));
        source.setInvokableClass(AbstractInvokable.class);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0L));
        final JobGraph inputSplitJobGraph = JobGraphBuilder.newStreamingJobGraphBuilder()
                .addJobVertex(source)
                .setExecutionConfig(executionConfig)
                .build();

        final JobMaster jobMaster = new JobMasterBuilder(inputSplitJobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster();
        jobMaster.start();
        try {
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, inputSplitJobGraph.getJobID(), parallelism);
            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);

            final JobVertexID sourceId = source.getID();
            final List<AccessExecution> executions = getExecutions(jobMasterGateway, sourceId);
            final ExecutionAttemptID initialAttemptId = executions.get(0).getAttemptId();

            // Drain the split source: each execution requests its share of the splits.
            final List<List<InputSplit>> inputSplitsPerTask = new ArrayList<>(parallelism);
            for (AccessExecution execution : executions) {
                inputSplitsPerTask.add(
                        getInputSplits(splitsPerTask, getInputSplitSupplier(sourceId, jobMasterGateway, execution.getAttemptId())));
            }
            MatcherAssert.assertThat(
                    flattenCollection(inputSplitsPerTask),
                    Matchers.containsInAnyOrder(expectedInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));

            // Fail the first execution and wait for the executions to be scheduled again.
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(initialAttemptId, ExecutionState.FAILED)).get();
            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);

            final ExecutionAttemptID restartedAttemptId = getFirstExecution(jobMasterGateway, sourceId).getAttemptId();
            final List<InputSplit> remainingInputSplits =
                    getRemainingInputSplits(getInputSplitSupplier(sourceId, jobMasterGateway, restartedAttemptId));
            final Collection<InputSplit> expectedRemainingInputSplits = function.apply(inputSplitsPerTask);
            MatcherAssert.assertThat(
                    remainingInputSplits,
                    Matchers.containsInAnyOrder(expectedRemainingInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));
        } finally {
            // Single cleanup path: runs once, whether the assertions passed or failed.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Concatenates all inner lists into a single list, preserving encounter order.
     *
     * @param list the nested lists to flatten
     * @return a new list holding the elements of every inner list
     */
    @Nonnull
    private List<InputSplit> flattenCollection(List<List<InputSplit>> list) {
        return list.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    /**
     * Creates a supplier that requests the next input split of the given vertex for the given
     * execution attempt on every call.
     *
     * @param jobVertexID the vertex whose splits are requested
     * @param jobMasterGateway the gateway the requests are sent to
     * @param executionAttemptID the execution attempt the splits are requested for
     * @return a supplier delegating to {@code getInputSplit}
     */
    @Nonnull
    private Supplier<SerializedInputSplit> getInputSplitSupplier(JobVertexID jobVertexID, JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        return () -> getInputSplit(jobMasterGateway, jobVertexID, executionAttemptID);
    }

    /**
     * Waits, bounded by {@code testingTimeout}, until the job has at least one execution and all
     * of its executions are in state {@code SCHEDULED} or {@code DEPLOYING}.
     *
     * @param jobMasterGateway the gateway used to query the current executions
     */
    private void waitUntilAllExecutionsAreScheduledOrDeployed(JobMasterGateway jobMasterGateway) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> {
                    final Collection<AccessExecution> current = getExecutions(jobMasterGateway);
                    if (current.isEmpty()) {
                        return false;
                    }
                    return current.stream().allMatch(execution ->
                            execution.getState() == ExecutionState.SCHEDULED
                                    || execution.getState() == ExecutionState.DEPLOYING);
                },
                Deadline.fromNow(Duration.ofMillis(testingTimeout.toMilliseconds())));
    }

    /**
     * Returns the current execution attempt of the first subtask of the given vertex, asserting
     * that at least one execution exists.
     *
     * @param jobMasterGateway the gateway used to query the executions
     * @param jobVertexID the vertex to look up
     * @return the first execution of the vertex
     */
    private static AccessExecution getFirstExecution(JobMasterGateway jobMasterGateway, JobVertexID jobVertexID) {
        final List<AccessExecution> vertexExecutions = getExecutions(jobMasterGateway, jobVertexID);
        MatcherAssert.assertThat(vertexExecutions, Matchers.hasSize(Matchers.greaterThanOrEqualTo(1)));
        return vertexExecutions.get(0);
    }

    /**
     * Collects the current execution attempt of every task vertex of every job vertex of the
     * job.
     *
     * @param jobMasterGateway the gateway used to request the execution graph
     * @return the current execution attempts of all subtasks
     */
    private static Collection<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway) {
        return requestExecutionGraph(jobMasterGateway)
                .getArchivedExecutionGraph()
                .getAllVertices()
                .values()
                .stream()
                .flatMap(jobVertex -> Arrays.stream(jobVertex.getTaskVertices()))
                .map(taskVertex -> taskVertex.getCurrentExecutionAttempt())
                .collect(Collectors.toList());
    }

    /**
     * Collects the current execution attempt of every task vertex of the given job vertex.
     *
     * @param jobMasterGateway the gateway used to request the execution graph
     * @param jobVertexID the vertex whose executions are requested
     * @return the current execution attempts of the vertex, or an empty list if the execution
     *     graph does not contain the vertex
     */
    private static List<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway, JobVertexID jobVertexID) {
        return Optional.ofNullable(
                        requestExecutionGraph(jobMasterGateway)
                                .getArchivedExecutionGraph()
                                .getAllVertices()
                                .get(jobVertexID))
                .map(jobVertex -> Arrays.asList(jobVertex.getTaskVertices()))
                .orElse(Collections.emptyList())
                .stream()
                .map(taskVertex -> taskVertex.getCurrentExecutionAttempt())
                .collect(Collectors.toList());
    }

    /**
     * Synchronously requests the job's {@code ExecutionGraphInfo} from the JobMaster.
     *
     * @param jobMasterGateway the gateway the request is sent to
     * @return the execution graph information of the job
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *     {@link ExecutionException} raised while waiting for the response
     */
    private static ExecutionGraphInfo requestExecutionGraph(JobMasterGateway jobMasterGateway) {
        try {
            return jobMasterGateway.requestJob(testingTimeout).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Requests exactly {@code i} input splits from the supplier and deserializes them, asserting
     * that none of the responses is empty.
     *
     * @param i the number of splits to request
     * @param supplier the source of the serialized splits
     * @return the deserialized splits in request order
     * @throws Exception if a split cannot be deserialized
     */
    @Nonnull
    private static List<InputSplit> getInputSplits(int i, Supplier<SerializedInputSplit> supplier) throws Exception {
        final List<InputSplit> splits = new ArrayList<>(i);
        for (int requested = 0; requested < i; requested++) {
            final SerializedInputSplit serialized = supplier.get();
            MatcherAssert.assertThat(serialized.isEmpty(), Matchers.is(false));
            final InputSplit split =
                    InstantiationUtil.deserializeObject(serialized.getInputSplitData(), ClassLoader.getSystemClassLoader());
            splits.add(split);
        }
        return splits;
    }

    /**
     * Keeps requesting input splits until the supplier returns an empty response or a split
     * deserializes to {@code null}, and returns everything received before that.
     *
     * @param supplier the source of the serialized splits
     * @return the deserialized splits in request order
     * @throws Exception if a split cannot be deserialized
     */
    private List<InputSplit> getRemainingInputSplits(Supplier<SerializedInputSplit> supplier) throws Exception {
        final List<InputSplit> remaining = new ArrayList<>(16);
        while (true) {
            final SerializedInputSplit serialized = supplier.get();
            if (serialized.isEmpty()) {
                return remaining;
            }
            final InputSplit split =
                    InstantiationUtil.deserializeObject(serialized.getInputSplitData(), ClassLoader.getSystemClassLoader());
            if (split == null) {
                return remaining;
            }
            remaining.add(split);
        }
    }

    /**
     * Synchronously requests the next input split of the given vertex for the given execution
     * attempt.
     *
     * @param jobMasterGateway the gateway the request is sent to
     * @param jobVertexID the vertex whose split is requested
     * @param executionAttemptID the execution attempt the split is requested for
     * @return the serialized input split returned by the JobMaster
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *     {@link ExecutionException} raised while waiting for the response
     */
    public static SerializedInputSplit getInputSplit(JobMasterGateway jobMasterGateway, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) {
        try {
            return jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deploys the producer of a producer/consumer job, marks it as finished and checks the
     * answers of {@code requestPartitionState} for a valid request as well as for an unknown
     * partition id, an unknown result id and an unknown producer execution attempt.
     */
    @Test
    public void testRequestPartitionState() throws Exception {
        final JobGraph producerConsumerGraph = producerConsumerJobGraph();
        final JobMaster jobMaster = new JobMasterBuilder(producerConsumerGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster();
        jobMaster.start();
        try {
            // Completed with the descriptor of the first task submitted to the task executor.
            final CompletableFuture<TaskDeploymentDescriptor> submittedTask = new CompletableFuture<>();
            final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                    .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                        submittedTask.complete(taskDeploymentDescriptor);
                        return CompletableFuture.completedFuture(Acknowledge.get());
                    })
                    .createTestingTaskExecutorGateway();
            final LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);

            final Collection<SlotOffer> acceptedOffers = registerSlotsAtJobMaster(
                    1, jobMasterGateway, producerConsumerGraph.getJobID(), taskExecutorGateway, taskManagerLocation);
            MatcherAssert.assertThat(acceptedOffers, Matchers.hasSize(1));

            final TaskDeploymentDescriptor deployedTask = submittedTask.get();
            MatcherAssert.assertThat(deployedTask.getProducedPartitions(), Matchers.hasSize(1));
            final ResultPartitionDeploymentDescriptor partition = deployedTask.getProducedPartitions().iterator().next();

            final ExecutionAttemptID producerAttemptId = deployedTask.getExecutionAttemptId();
            final ExecutionAttemptID copiedAttemptId = new ExecutionAttemptID(producerAttemptId);
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(producerAttemptId, ExecutionState.FINISHED)).get();

            // Valid request: the producer is reported as finished.
            final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), copiedAttemptId);
            MatcherAssert.assertThat(
                    jobMasterGateway.requestPartitionState(partition.getResultId(), partitionId).get(),
                    Matchers.equalTo(ExecutionState.FINISHED));

            // Unknown partition id.
            try {
                jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID()).get();
                Assert.fail("Expected failure.");
            } catch (ExecutionException unknownPartition) {
                MatcherAssert.assertThat(
                        ExceptionUtils.findThrowable(unknownPartition, IllegalArgumentException.class).isPresent(),
                        Matchers.is(true));
            }

            // Unknown intermediate result id.
            try {
                jobMasterGateway.requestPartitionState(new IntermediateDataSetID(), partitionId).get();
                Assert.fail("Expected failure.");
            } catch (ExecutionException unknownResult) {
                MatcherAssert.assertThat(
                        ExceptionUtils.findThrowable(unknownResult, IllegalArgumentException.class).isPresent(),
                        Matchers.is(true));
            }

            // Known partition, but a producer execution attempt the JobMaster does not know.
            try {
                final ResultPartitionID unknownProducer =
                        new ResultPartitionID(partition.getPartitionId(), new ExecutionAttemptID());
                jobMasterGateway.requestPartitionState(partition.getResultId(), unknownProducer).get();
                Assert.fail("Expected failure.");
            } catch (ExecutionException disposedProducer) {
                MatcherAssert.assertThat(
                        ExceptionUtils.findThrowable(disposedProducer, PartitionProducerDisposedException.class).isPresent(),
                        Matchers.is(true));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Announces the given gateway as ResourceManager leader by passing its address and the UUID
     * of its fencing token to the leader retrieval service.
     *
     * @param testingResourceManagerGateway the gateway to announce as leader
     */
    private void notifyResourceManagerLeaderListeners(TestingResourceManagerGateway testingResourceManagerGateway) {
        final String leaderAddress = testingResourceManagerGateway.getAddress();
        this.rmLeaderRetrievalService.notifyListener(
                leaderAddress,
                testingResourceManagerGateway.m340getFencingToken().toUUID());
    }

    /**
     * Checks that a savepoint request honours its RPC timeout. The scheduler never completes
     * the savepoint, so the request with a short timeout must fail with a
     * {@link TimeoutException} while the request with {@code RpcUtils.INF_TIMEOUT} stays pending.
     */
    @Test
    public void testTriggerSavepointTimeout() throws Exception {
        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withFatalErrorHandler(this.testingFatalErrorHandler)
                .withSlotPoolServiceSchedulerFactory(
                        DefaultSlotPoolServiceSchedulerFactory.create(
                                TestingSlotPoolServiceBuilder.newBuilder(),
                                new TestingSchedulerNGFactory(
                                        TestingSchedulerNG.newBuilder()
                                                // Never completed: the savepoint hangs forever.
                                                .setTriggerSavepointFunction((str, bool) -> new CompletableFuture<>())
                                                .build())))
                .createJobMaster();
        try {
            jobMaster.start();
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final CompletableFuture<?> savepointWithTimeout =
                    jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(fastHeartbeatInterval));
            final CompletableFuture<?> savepointWithoutTimeout =
                    jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
            try {
                savepointWithTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
                Assert.fail();
            } catch (ExecutionException e) {
                MatcherAssert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(TimeoutException.class));
            }
            MatcherAssert.assertThat(savepointWithoutTimeout.isDone(), Matchers.is(Matchers.equalTo(false)));
        } finally {
            // Single cleanup path: runs once, whether the assertions passed or failed.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that failing the only slot offered by a task executor frees that slot on the task
     * executor and disconnects the JobMaster from it.
     */
    @Test
    public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
        final JobGraph restartingJobGraph = createSingleVertexJobWithRestartStrategy();
        final JobMaster jobMaster = new JobMasterBuilder(restartingJobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster();

        // Completed by the testing gateway when the corresponding call arrives.
        final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
        final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>();
        final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setFreeSlotFunction((allocationID, cause) -> {
                    freedSlotFuture.complete(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .setDisconnectJobManagerConsumer((jobID, cause) -> disconnectedJobManagerFuture.complete(jobID))
                .createTestingTaskExecutorGateway();
        final LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        try {
            jobMaster.start();
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final Collection<SlotOffer> acceptedOffers = registerSlotsAtJobMaster(
                    1, jobMasterGateway, restartingJobGraph.getJobID(), taskExecutorGateway, taskManagerLocation);
            MatcherAssert.assertThat(acceptedOffers, Matchers.hasSize(1));

            final AllocationID allocationId = acceptedOffers.iterator().next().getAllocationId();
            jobMasterGateway.failSlot(
                    taskManagerLocation.getResourceID(), allocationId, new FlinkException("Fail allocation test exception"));

            MatcherAssert.assertThat(freedSlotFuture.get(), Matchers.equalTo(allocationId));
            MatcherAssert.assertThat(disconnectedJobManagerFuture.get(), Matchers.equalTo(restartingJobGraph.getJobID()));
        } finally {
            // Single cleanup path: runs once, whether the assertions passed or failed.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that the JobMaster does not disconnect from a task executor whose only slot failed
     * while the partition tracker still reports tracked partitions for that task executor.
     */
    @Test
    public void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception {
        final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        final JobGraph noOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        final LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();

        // The tracker claims to track partitions for every task executor.
        final AtomicBoolean isTrackingPartitions = new AtomicBoolean(true);
        final TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        partitionTracker.setIsTrackingPartitionsForFunction(resourceID -> isTrackingPartitions.get());

        final JobMaster jobMaster = new JobMasterBuilder(noOpJobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withJobManagerSharedServices(jobManagerSharedServices)
                .withHeartbeatServices(heartbeatServices)
                .withPartitionTrackerFactory(taskExecutorGatewayLookup -> partitionTracker)
                .createJobMaster();

        // Completed by the testing gateway when the corresponding call arrives.
        final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
        final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>();
        final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setFreeSlotFunction((allocationID, cause) -> {
                    freedSlotFuture.complete(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .setDisconnectJobManagerConsumer((jobID, cause) -> disconnectedJobManagerFuture.complete(jobID))
                .createTestingTaskExecutorGateway();
        try {
            jobMaster.start();
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final Collection<SlotOffer> acceptedOffers = registerSlotsAtJobMaster(
                    1, jobMasterGateway, noOpJobGraph.getJobID(), taskExecutorGateway, taskManagerLocation);
            MatcherAssert.assertThat(acceptedOffers, Matchers.hasSize(1));

            final AllocationID allocationId = acceptedOffers.iterator().next().getAllocationId();
            jobMasterGateway.failSlot(
                    taskManagerLocation.getResourceID(), allocationId, new FlinkException("Fail allocation test exception"));
            MatcherAssert.assertThat(freedSlotFuture.get(), Matchers.equalTo(allocationId));

            // Round trip through the JobMaster before checking that no disconnect was sent.
            jobMasterGateway.requestJobStatus(Time.seconds(5L)).get();
            MatcherAssert.assertThat(disconnectedJobManagerFuture.isDone(), Matchers.is(false));
        } finally {
            // Single cleanup path: runs once, whether the assertions passed or failed.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that {@code updateGlobalAggregate} keeps a separate running sum per aggregate name
     * when fed with the serialized summing function from {@code createAggregateFunction()}.
     */
    @Test
    public void testJobMasterAggregatesValuesCorrectly() throws Exception {
        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster();
        jobMaster.start();
        final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            final AggregateFunction<Integer, Integer, Integer> aggregateFunction = createAggregateFunction();
            ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            final byte[] serializedFunction = InstantiationUtil.serializeObject(aggregateFunction);

            // "agg1" accumulates 1 + 2 + 3 + 4.
            MatcherAssert.assertThat(jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedFunction).get(), Matchers.equalTo(1));
            MatcherAssert.assertThat(jobMasterGateway.updateGlobalAggregate("agg1", 2, serializedFunction).get(), Matchers.equalTo(3));
            MatcherAssert.assertThat(jobMasterGateway.updateGlobalAggregate("agg1", 3, serializedFunction).get(), Matchers.equalTo(6));
            MatcherAssert.assertThat(jobMasterGateway.updateGlobalAggregate("agg1", 4, serializedFunction).get(), Matchers.equalTo(10));

            // "agg2" starts from scratch and is independent of "agg1".
            MatcherAssert.assertThat(jobMasterGateway.updateGlobalAggregate("agg2", 10, serializedFunction).get(), Matchers.equalTo(10));
            MatcherAssert.assertThat(jobMasterGateway.updateGlobalAggregate("agg2", 23, serializedFunction).get(), Matchers.equalTo(33));
        } finally {
            // Single cleanup path: runs once, whether the assertions passed or failed.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Creates an aggregate function that sums up integers: the accumulator starts at 0, every
     * added value is added to it, and the result is the accumulator itself.
     *
     * @return the summing aggregate function
     */
    private AggregateFunction<Integer, Integer, Integer> createAggregateFunction() {
        return new AggregateFunction<Integer, Integer, Integer>() {
            // Restored to the interface's method name; the decompiler had renamed it, which left
            // createAccumulator() unimplemented.
            @Override
            public Integer createAccumulator() {
                return 0;
            }

            @Override
            public Integer add(Integer value, Integer accumulator) {
                return accumulator + value;
            }

            @Override
            public Integer getResult(Integer accumulator) {
                return accumulator;
            }

            @Override
            public Integer merge(Integer a, Integer b) {
                return add(a, b);
            }
        };
    }

    /**
     * Creates a new {@code TestingResourceManagerGateway} and registers it at the RPC service
     * under its own address.
     *
     * @return the registered gateway
     */
    @Nonnull
    private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() {
        // Declared with the concrete type: a local typed as RpcGateway cannot be returned as a
        // TestingResourceManagerGateway without a cast.
        final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        return testingResourceManagerGateway;
    }

    /**
     * Runs the task-executor-termination test with the termination caused by an explicit
     * {@code disconnectTaskManager} call; heartbeats sent to the task executor are ignored.
     */
    @Test
    public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
        runJobFailureWhenTaskExecutorTerminatesTest(
                heartbeatServices,
                (taskManagerLocation, jobMasterGateway) ->
                        jobMasterGateway.disconnectTaskManager(
                                taskManagerLocation.getResourceID(),
                                new FlinkException("Test disconnectTaskManager exception.")),
                // The inner lambda's parameters get their own names: a lambda parameter must not
                // redeclare a parameter of the enclosing lambda.
                (jobMasterGateway, taskManagerResourceId) ->
                        (ignoredResourceId, ignoredSlotReport) -> {
                        });
    }

    /**
     * Runs the task-executor-termination test with the termination caused by a manually
     * triggered heartbeat timeout between the JobMaster and the task executor. Heartbeat
     * requests sent to the task executor are answered with an empty payload.
     */
    @Test
    public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
        final TestingHeartbeatServices testingHeartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
        runJobFailureWhenTaskExecutorTerminatesTest(
                testingHeartbeatServices,
                (taskManagerLocation, jobMasterGateway) ->
                        testingHeartbeatServices.triggerHeartbeatTimeout(this.jmResourceId, taskManagerLocation.getResourceID()),
                // The inner lambda's parameters get their own names: a lambda parameter must not
                // redeclare a parameter of the enclosing lambda. The heartbeat is answered with
                // the id handed to the outer lambda, which the test runner fills with the task
                // manager's resource id.
                (jobMasterGateway, taskManagerResourceId) ->
                        (ignoredResourceId, ignoredSlotReport) ->
                                jobMasterGateway.heartbeatFromTaskManager(
                                        taskManagerResourceId, TaskExecutorToJobManagerHeartbeatPayload.empty()));
    }

    /**
     * Checks that registering a task manager with a freshly created {@link JobID}, rather than
     * the id of the job the JobMaster was built for, is answered with a
     * {@code JMTMRegistrationRejection}.
     */
    @Test
    public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster();
        try {
            jobMaster.start();
            final CompletableFuture<?> registration = jobMaster.registerTaskManager(
                    "foobar", new LocalUnresolvedTaskManagerLocation(), new JobID(), testingTimeout);
            MatcherAssert.assertThat(registration.get(), Matchers.instanceOf(JMTMRegistrationRejection.class));
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Checks that closing the JobMaster does not complete while the scheduler's close future is
     * still pending, and completes once that future has been completed.
     */
    @Test
    public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception {
        final CompletableFuture<Void> schedulerTerminationFuture = new CompletableFuture<>();
        final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withSlotPoolServiceSchedulerFactory(
                        DefaultSlotPoolServiceSchedulerFactory.create(
                                TestingSlotPoolServiceBuilder.newBuilder(),
                                new TestingSchedulerNGFactory(
                                        TestingSchedulerNG.newBuilder()
                                                .setCloseAsyncSupplier(() -> schedulerTerminationFuture)
                                                .build())))
                .createJobMaster();
        jobMaster.start();

        final CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync();
        try {
            jobMasterTerminationFuture.get(fastHeartbeatTimeout, TimeUnit.MILLISECONDS);
            Assert.fail("Expected TimeoutException because the JobMaster should not terminate.");
        } catch (TimeoutException expected) {
            // Expected: the scheduler has not been closed yet.
        }

        schedulerTerminationFuture.complete(null);
        jobMasterTerminationFuture.get();
    }

    /**
     * Starts a single-vertex no-op job on one task executor, moves the deployed task to
     * {@code RUNNING}, lets the given action terminate the task executor connection and asserts
     * that the job reaches the globally terminal state {@code FAILED}.
     *
     * @param heartbeatServices2 the heartbeat services the JobMaster is built with
     * @param biConsumer the action terminating the task executor; it receives the task manager
     *     location and the JobMaster gateway
     * @param biFunction creates the handler for heartbeat requests sent to the task executor
     *     from the JobMaster gateway and the task manager's resource id
     * @throws Exception if the job cannot be set up or an assertion fails
     */
    private void runJobFailureWhenTaskExecutorTerminatesTest(HeartbeatServices heartbeatServices2, BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> biConsumer, BiFunction<JobMasterGateway, ResourceID, BiConsumer<ResourceID, AllocatedSlotReport>> biFunction) throws Exception {
        final JobGraph noOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        final JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
        final JobMaster jobMaster = new JobMasterBuilder(noOpJobGraph, rpcService)
                .withResourceId(this.jmResourceId)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices2)
                .withOnCompletionActions(onCompletionActions)
                .createJobMaster();
        try {
            jobMaster.start();
            final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();

            // Completed with the attempt id of the first task submitted to the task executor.
            final CompletableFuture<ExecutionAttemptID> submittedAttemptFuture = new CompletableFuture<>();
            final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                    .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                        submittedAttemptFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
                        return CompletableFuture.completedFuture(Acknowledge.get());
                    })
                    .setHeartbeatJobManagerConsumer(biFunction.apply(jobMasterGateway, taskManagerLocation.getResourceID()))
                    .createTestingTaskExecutorGateway();

            final Collection<SlotOffer> acceptedOffers = registerSlotsAtJobMaster(
                    1, jobMasterGateway, noOpJobGraph.getJobID(), taskExecutorGateway, taskManagerLocation);
            MatcherAssert.assertThat(acceptedOffers, Matchers.hasSize(1));

            final ExecutionAttemptID attemptId = submittedAttemptFuture.get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.INITIALIZING)).get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.RUNNING)).get();

            biConsumer.accept(taskManagerLocation, jobMasterGateway);

            MatcherAssert.assertThat(
                    onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getArchivedExecutionGraph().getState(),
                    Matchers.is(JobStatus.FAILED));
        } finally {
            // Single cleanup path: runs once, whether the assertions passed or failed.
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }

    /**
     * Registers the task executor gateway at the RPC service and at the JobMaster, then offers
     * {@code i} slots with slot indices {@code 0..i-1}, fresh allocation ids and
     * {@code ResourceProfile.ANY}.
     *
     * @param i the number of slots to offer
     * @param jobMasterGateway the JobMaster to register at
     * @param jobID the job id used for the task manager registration
     * @param taskExecutorGateway the task executor gateway to register
     * @param unresolvedTaskManagerLocation the location of the task manager offering the slots
     * @return the slot offers returned by the JobMaster's {@code offerSlots} call
     */
    private Collection<SlotOffer> registerSlotsAtJobMaster(int i, JobMasterGateway jobMasterGateway, JobID jobID, TaskExecutorGateway taskExecutorGateway, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws ExecutionException, InterruptedException {
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
        jobMasterGateway
                .registerTaskManager(taskExecutorGateway.getAddress(), unresolvedTaskManagerLocation, jobID, testingTimeout)
                .get();

        final Collection<SlotOffer> slotOffers = IntStream.range(0, i)
                .mapToObj(slotIndex -> new SlotOffer(new AllocationID(), slotIndex, ResourceProfile.ANY))
                .collect(Collectors.toList());
        return jobMasterGateway
                .offerSlots(unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout)
                .get();
    }

    /**
     * Builds a batch job graph with a "Producer" and a "Consumer" vertex, both running
     * {@code NoOpInvokable}, connected pointwise through a blocking result partition.
     *
     * @return the producer/consumer job graph
     */
    private JobGraph producerConsumerJobGraph() {
        final JobVertex producer = new JobVertex("Producer");
        producer.setInvokableClass(NoOpInvokable.class);

        final JobVertex consumer = new JobVertex("Consumer");
        consumer.setInvokableClass(NoOpInvokable.class);
        consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        return JobGraphTestUtils.batchJobGraph(producer, consumer);
    }

    /**
     * Creates a savepoint with operator state in a new file of the temporary folder.
     *
     * @param j forwarded to {@code TestUtils.createSavepointWithOperatorState} as its second
     *     argument
     * @return the file returned by {@code TestUtils.createSavepointWithOperatorState}
     * @throws IOException if the file cannot be created or written
     */
    private File createSavepoint(long j) throws IOException {
        final File savepointFile = temporaryFolder.newFile();
        return TestUtils.createSavepointWithOperatorState(savepointFile, j, new OperatorID[0]);
    }

    /**
     * Builds a checkpointing-enabled job graph with a single "source" vertex of parallelism 1
     * running {@code NoOpInvokable}.
     *
     * @param savepointRestoreSettings the savepoint restore settings passed on to the job graph
     *     factory
     * @return the job graph created by {@code TestUtils}
     */
    @Nonnull
    private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) {
        final JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setParallelism(1);
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, sourceVertex);
    }

    /**
     * Builds a single-vertex no-op job graph whose execution config uses a fixed-delay restart
     * strategy with {@code Integer.MAX_VALUE} attempts and no delay.
     *
     * @return the configured job graph
     */
    private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
        final ExecutionConfig restartingConfig = new ExecutionConfig();
        restartingConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

        final JobGraph noOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        noOpJobGraph.setExecutionConfig(restartingConfig);
        return noOpJobGraph;
    }

    /**
     * Registers a testing task executor and offers {@code i} slots to the JobMaster. The task
     * executor answers every cancel request by reporting the task as {@code CANCELED} to the
     * JobMaster and acknowledging the call.
     *
     * @param jobMasterGateway the JobMaster to offer the slots to
     * @param jobID the id of the job the slots are offered for
     * @param i the number of slots to offer
     */
    private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway, JobID jobID, int i) throws ExecutionException, InterruptedException {
        final TestingTaskExecutorGateway cancellingTaskExecutor = new TestingTaskExecutorGatewayBuilder()
                .setCancelTaskFunction(executionAttemptID -> {
                    jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.CANCELED));
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();
        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
                rpcService, jobMasterGateway, jobID, i, cancellingTaskExecutor, testingTimeout);
    }
}
