package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.class */
/**
 * Unit tests for {@link SeekableStreamSupervisorStateManager}.
 *
 * <p>Each test drives a fresh state manager through a sequence of {@code maybeSetState},
 * {@code recordThrowableEvent}, {@code recordCompletedTaskState} and {@code markRunFinished}
 * calls and asserts on the supervisor state and the recorded exception events afterwards.
 */
public class SeekableStreamSupervisorStateManagerTest {
    private SeekableStreamSupervisorStateManager stateManager;
    private SupervisorStateManagerConfig config;
    private ObjectMapper defaultMapper;

    /** Builds a fresh config, state manager and JSON mapper before every test. */
    @Before
    public void setupTest() {
        config = new SupervisorStateManagerConfig(10);
        stateManager = new SeekableStreamSupervisorStateManager(config, false);
        defaultMapper = new DefaultObjectMapper();
    }

    /**
     * Walks through the startup states of a first run without any failure, then verifies that
     * once that run has finished the supervisor stays in {@code RUNNING} even when the same
     * startup states are offered again.
     */
    @Test
    public void testHappyPath() {
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState().getBasicState());

        // First run: every offered startup state is adopted and maps to the RUNNING basic state.
        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());

        stateManager.markRunFinished();
        assertRunning();

        // Second run: the same states are offered again but the reported state must not change.
        stateManager.maybeSetState(SupervisorStateManager.BasicState.PENDING);
        assertRunning();

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
        assertRunning();

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
        assertRunning();

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
        assertRunning();

        stateManager.markRunFinished();
        assertRunning();
    }

    /**
     * After one finished run, recording a {@link StreamException} on each of
     * {@code getUnhealthinessThreshold()} consecutive runs must end in
     * {@code LOST_CONTACT_WITH_STREAM}.
     */
    @Test
    public void testStreamFailureLostContact() {
        stateManager.markRunFinished();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());

        for (int run = 0; run < config.getUnhealthinessThreshold(); run++) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new StreamException(new IllegalStateException("DOH!")));
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        assertExceptionEvents(IllegalStateException.class, true);
    }

    /**
     * Starting from {@code CONNECTING_TO_STREAM} (no run has finished yet), recording a
     * {@link StreamException} on each of {@code getUnhealthinessThreshold()} consecutive runs
     * must end in {@code UNABLE_TO_CONNECT_TO_STREAM}.
     */
    @Test
    public void testStreamFailureUnableToConnect() {
        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);

        for (int run = 0; run < config.getUnhealthinessThreshold(); run++) {
            Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new StreamException(new IllegalStateException("DOH!")));
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        assertExceptionEvents(IllegalStateException.class, true);
    }

    /**
     * Repeated exceptions that are not stream exceptions must end in the plain
     * {@code UNHEALTHY_SUPERVISOR} state, and none of the recorded events may be flagged as a
     * stream exception.
     */
    @Test
    public void testNonStreamUnhealthiness() {
        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);

        for (int run = 0; run < config.getUnhealthinessThreshold(); run++) {
            Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS, stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new NullPointerException("oof"));
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        assertExceptionEvents(NullPointerException.class, false);
    }

    /**
     * Failure streaks that stop one run short of {@code getUnhealthinessThreshold()} and are
     * followed by a clean run must leave the supervisor {@code RUNNING}, while every recorded
     * exception event is still retained.
     */
    @Test
    public void testTransientUnhealthiness() {
        stateManager.markRunFinished();

        for (int round = 1; round < 3; round++) {
            // One failing run fewer than the threshold, so the supervisor must stay healthy.
            for (int failure = 0; failure < config.getUnhealthinessThreshold() - 1; failure++) {
                stateManager.recordThrowableEvent(new NullPointerException("oof"));
                stateManager.markRunFinished();
                Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            }

            // A run without any recorded exception interrupts the streak.
            stateManager.markRunFinished();
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
            Assert.assertEquals(round * (config.getUnhealthinessThreshold() - 1), stateManager.getExceptionEvents().size());
        }
    }

    /**
     * A failed task recorded on each of {@code getTaskUnhealthinessThreshold()} consecutive runs
     * must end in {@code UNHEALTHY_TASKS} without producing any exception event.
     */
    @Test
    public void testNonTransientTaskUnhealthiness() {
        stateManager.markRunFinished();

        for (int run = 0; run < config.getTaskUnhealthinessThreshold(); run++) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(0L, stateManager.getExceptionEvents().size());
    }

    /**
     * When every failed task is followed by a successful one, the supervisor must stay
     * {@code RUNNING} even after more runs than {@code getTaskUnhealthinessThreshold()}.
     */
    @Test
    public void testTransientTaskUnhealthiness() {
        stateManager.markRunFinished();

        for (int run = 0; run < config.getTaskUnhealthinessThreshold() + 3; run++) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.recordCompletedTaskState(TaskState.SUCCESS);
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(0L, stateManager.getExceptionEvents().size());
    }

    /**
     * Drives the supervisor into {@code UNHEALTHY_SUPERVISOR} and then checks that
     * {@code getHealthinessThreshold()} clean runs bring it back to {@code RUNNING} while the
     * earlier exception events remain available.
     */
    @Test
    public void testSupervisorRecoveryWithHealthinessThreshold() {
        for (int run = 0; run < config.getUnhealthinessThreshold(); run++) {
            Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new Exception("Except the inevitable"));
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());

        for (int run = 0; run < config.getHealthinessThreshold(); run++) {
            Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());

        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        assertExceptionEvents(Exception.class, false);
    }

    /**
     * Drives the supervisor into {@code UNHEALTHY_TASKS} and then checks that
     * {@code getTaskHealthinessThreshold()} runs with a successful task bring it back to
     * {@code RUNNING}.
     */
    @Test
    public void testTaskRecoveryWithHealthinessThreshold() {
        stateManager.markRunFinished();

        for (int run = 0; run < config.getTaskUnhealthinessThreshold(); run++) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());

        for (int run = 0; run < config.getTaskHealthinessThreshold(); run++) {
            Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.SUCCESS);
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
    }

    /**
     * When both exceptions and failed tasks are recorded on enough runs to cross either
     * threshold, the reported state must be {@code UNHEALTHY_SUPERVISOR}.
     */
    @Test
    public void testTwoUnhealthyStates() {
        stateManager.markRunFinished();

        int runs = Math.max(config.getTaskUnhealthinessThreshold(), config.getUnhealthinessThreshold());
        for (int run = 0; run < runs; run++) {
            stateManager.recordThrowableEvent(new NullPointerException("somebody goofed"));
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
    }

    /**
     * Records four differently nested exceptions and checks, in order, the exception class name
     * and the stream-exception flag reported for each resulting event.
     */
    @Test
    public void testGetThrowableEvents() {
        ImmutableList<Exception> exceptions = ImmutableList.of(
            new StreamException(new UnsupportedOperationException("oof")),
            new NullPointerException("oof"),
            new RuntimeException(new StreamException(new Exception("oof"))),
            new RuntimeException(new IllegalArgumentException("oof"))
        );
        for (Exception exception : exceptions) {
            stateManager.recordThrowableEvent(exception);
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());

        // Expected (exception class name, is stream exception) for each recorded exception above.
        ImmutableList<Pair<String, Boolean>> expected = ImmutableList.of(
            Pair.of("java.lang.UnsupportedOperationException", true),
            Pair.of("java.lang.NullPointerException", false),
            Pair.of("java.lang.Exception", true),
            Pair.of("java.lang.IllegalArgumentException", false)
        );

        Iterator<? extends SupervisorStateManager.ExceptionEvent> events = stateManager.getExceptionEvents().iterator();
        for (Pair<String, Boolean> pair : expected) {
            // isStreamException() is only declared on the seekable-stream event subtype.
            SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent event =
                (SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) events.next();
            Assert.assertNotNull(event.getMessage());
            Assert.assertEquals(pair.lhs, event.getExceptionClass());
            Assert.assertEquals(pair.rhs, Boolean.valueOf(event.isStreamException()));
        }

        // No events beyond the expected ones.
        Assert.assertFalse(events.hasNext());
    }

    /**
     * Serializes a {@code SupervisorStateManager.ExceptionEvent} to JSON, reads it back as a
     * string map and checks the resulting fields.
     *
     * @throws IOException if JSON serialization or deserialization fails
     */
    @Test
    public void testExceptionEventSerde() throws IOException {
        SupervisorStateManager.ExceptionEvent event =
            new SupervisorStateManager.ExceptionEvent(new NullPointerException("msg"), true);
        String serialized = defaultMapper.writeValueAsString(event);

        Map<String, String> deserialized = defaultMapper.readValue(
            serialized,
            new TypeReference<Map<String, String>>() {
            }
        );

        Assert.assertNotNull(deserialized.get("timestamp"));
        Assert.assertEquals("java.lang.NullPointerException", deserialized.get("exceptionClass"));
        // parseBoolean inspects the serialized value (a missing key, i.e. null, parses as false).
        // Boolean.getBoolean would instead look up a JVM system property with that name and
        // therefore could never make this assertion fail.
        Assert.assertFalse(Boolean.parseBoolean(deserialized.get("streamException")));
        Assert.assertNotNull(deserialized.get("message"));
    }

    /** Asserts that both the supervisor state and its basic state are {@code RUNNING}. */
    private void assertRunning() {
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
    }

    /**
     * Asserts that every recorded exception event reports the given exception class and
     * stream-exception flag.
     *
     * @param expectedClass exception type whose name each event must report
     * @param expectedStreamException expected value of {@code isStreamException()} for each event
     */
    private void assertExceptionEvents(Class<? extends Throwable> expectedClass, boolean expectedStreamException) {
        for (SupervisorStateManager.ExceptionEvent event : stateManager.getExceptionEvents()) {
            SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent streamEvent =
                (SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) event;
            Assert.assertEquals(expectedStreamException, streamEvent.isStreamException());
            Assert.assertEquals(expectedClass.getName(), event.getExceptionClass());
        }
    }
}
