package org.apache.storm.spout;

import java.util.HashMap;
import java.util.List;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/spout/CheckpointSpoutTest.class */
public class CheckpointSpoutTest {
    CheckpointSpout spout = new CheckpointSpout();
    TopologyContext mockTopologyContext;
    SpoutOutputCollector mockOutputCollector;

    @BeforeEach
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext) Mockito.mock(TopologyContext.class);
        Mockito.when(this.mockTopologyContext.getThisComponentId()).thenReturn("test");
        Mockito.when(Integer.valueOf(this.mockTopologyContext.getThisTaskId())).thenReturn(1);
        this.mockOutputCollector = (SpoutOutputCollector) Mockito.mock(SpoutOutputCollector.class);
    }

    @Test
    public void testInitState() throws Exception {
        this.spout.open(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
        this.spout.nextTuple();
        Values values = new Values(new Object[]{-1L, CheckPointState.Action.INITSTATE});
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector)).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values, forClass2.getValue());
        Assertions.assertEquals(-1L, forClass3.getValue());
        this.spout.ack(-1L);
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector)).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Values values2 = new Values(new Object[]{-1L, CheckPointState.Action.INITSTATE});
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values2, forClass2.getValue());
        Assertions.assertEquals(-1L, forClass3.getValue());
    }

    @Test
    public void testPrepare() {
        this.spout.open(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack(-1L);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(2))).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Values values = new Values(new Object[]{0L, CheckPointState.Action.PREPARE});
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values, forClass2.getValue());
        Assertions.assertEquals(0L, forClass3.getValue());
    }

    @Test
    public void testPrepareWithFail() {
        KeyValueState state = StateFactory.getState("__state", new HashMap(), this.mockTopologyContext);
        state.put("__state", new CheckPointState(-1L, CheckPointState.State.COMMITTED));
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack(-1L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        this.spout.ack(0L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        this.spout.ack(0L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        this.spout.fail(1L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        this.spout.fail(1L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        this.spout.ack(1L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        this.spout.ack(0L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(8))).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Values values = new Values(new Object[]{1L, CheckPointState.Action.PREPARE});
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values, forClass2.getValue());
        Assertions.assertEquals(1L, forClass3.getValue());
    }

    @Test
    public void testCommit() {
        HashMap hashMap = new HashMap();
        hashMap.put("topology.state.checkpoint.interval.ms", 0);
        this.spout.open(hashMap, this.mockTopologyContext, this.mockOutputCollector);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack(-1L);
        this.spout.nextTuple();
        this.spout.ack(0L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        this.spout.fail(0L);
        Utils.sleep(10L);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(4))).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Values values = new Values(new Object[]{0L, CheckPointState.Action.COMMIT});
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values, forClass2.getValue());
        Assertions.assertEquals(0L, forClass3.getValue());
    }

    @Test
    public void testRecoveryRollback() {
        KeyValueState state = StateFactory.getState("test-1", new HashMap(), this.mockTopologyContext);
        state.put("__state", new CheckPointState(100L, CheckPointState.State.PREPARING));
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Values values = new Values(new Object[]{100L, CheckPointState.Action.ROLLBACK});
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values, forClass2.getValue());
        Assertions.assertEquals(100L, forClass3.getValue());
    }

    @Test
    public void testRecoveryRollbackAck() {
        KeyValueState state = StateFactory.getState("test-1", new HashMap(), this.mockTopologyContext);
        state.put("__state", new CheckPointState(100L, CheckPointState.State.PREPARING));
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        this.spout.ack(100L);
        this.spout.nextTuple();
        this.spout.ack(99L);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(3))).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Values values = new Values(new Object[]{100L, CheckPointState.Action.PREPARE});
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values, forClass2.getValue());
        Assertions.assertEquals(100L, forClass3.getValue());
    }

    @Test
    public void testRecoveryCommit() {
        KeyValueState state = StateFactory.getState("test-1", new HashMap(), this.mockTopologyContext);
        state.put("__state", new CheckPointState(100L, CheckPointState.State.COMMITTING));
        this.spout.open(this.mockTopologyContext, this.mockOutputCollector, 0, state);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).emit((String) forClass.capture(), (List) forClass2.capture(), forClass3.capture());
        Values values = new Values(new Object[]{100L, CheckPointState.Action.COMMIT});
        Assertions.assertEquals("$checkpoint", forClass.getValue());
        Assertions.assertEquals(values, forClass2.getValue());
        Assertions.assertEquals(100L, forClass3.getValue());
    }
}
