package org.apache.storm.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.windowing.EvictionPolicy;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/storm/windowing/WindowManagerTest.class */
public class WindowManagerTest {
    private WindowManager<Integer> windowManager;
    private Listener listener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/windowing/WindowManagerTest$Listener.class */
    public static class Listener implements WindowLifecycleListener<Integer> {
        List<Integer> onExpiryEvents;
        List<Integer> onActivationEvents;
        List<Integer> onActivationNewEvents;
        List<Integer> onActivationExpiredEvents;
        List<List<Integer>> allOnExpiryEvents;
        List<List<Integer>> allOnActivationEvents;
        List<List<Integer>> allOnActivationNewEvents;
        List<List<Integer>> allOnActivationExpiredEvents;

        private Listener() {
            this.onExpiryEvents = Collections.emptyList();
            this.onActivationEvents = Collections.emptyList();
            this.onActivationNewEvents = Collections.emptyList();
            this.onActivationExpiredEvents = Collections.emptyList();
            this.allOnExpiryEvents = new ArrayList();
            this.allOnActivationEvents = new ArrayList();
            this.allOnActivationNewEvents = new ArrayList();
            this.allOnActivationExpiredEvents = new ArrayList();
        }

        public void onExpiry(List<Integer> list) {
            this.onExpiryEvents = list;
            this.allOnExpiryEvents.add(list);
        }

        public void onActivation(List<Integer> list, List<Integer> list2, List<Integer> list3, Long l) {
            this.onActivationEvents = list;
            this.allOnActivationEvents.add(list);
            this.onActivationNewEvents = list2;
            this.allOnActivationNewEvents.add(list2);
            this.onActivationExpiredEvents = list3;
            this.allOnActivationExpiredEvents.add(list3);
        }

        void clear() {
            this.onExpiryEvents = Collections.emptyList();
            this.onActivationEvents = Collections.emptyList();
            this.onActivationNewEvents = Collections.emptyList();
            this.onActivationExpiredEvents = Collections.emptyList();
            this.allOnExpiryEvents.clear();
            this.allOnActivationEvents.clear();
            this.allOnActivationNewEvents.clear();
            this.allOnActivationExpiredEvents.clear();
        }
    }

    @BeforeEach
    public void setUp() {
        this.listener = new Listener();
        this.windowManager = new WindowManager<>(this.listener);
    }

    @AfterEach
    public void tearDown() {
        this.windowManager.shutdown();
    }

    @Test
    public void testCountBasedWindow() {
        CountEvictionPolicy countEvictionPolicy = new CountEvictionPolicy(5);
        CountTriggerPolicy countTriggerPolicy = new CountTriggerPolicy(2, this.windowManager, countEvictionPolicy);
        countTriggerPolicy.start();
        this.windowManager.setEvictionPolicy(countEvictionPolicy);
        this.windowManager.setTriggerPolicy(countTriggerPolicy);
        this.windowManager.add(1);
        this.windowManager.add(2);
        Assertions.assertTrue(this.listener.onExpiryEvents.isEmpty());
        Assertions.assertEquals(seq(1, 2), this.listener.onActivationEvents);
        Assertions.assertEquals(seq(1, 2), this.listener.onActivationNewEvents);
        Assertions.assertTrue(this.listener.onActivationExpiredEvents.isEmpty());
        this.windowManager.add(3);
        this.windowManager.add(4);
        Assertions.assertTrue(this.listener.onExpiryEvents.isEmpty());
        Assertions.assertEquals(seq(1, 4), this.listener.onActivationEvents);
        Assertions.assertEquals(seq(3, 4), this.listener.onActivationNewEvents);
        Assertions.assertTrue(this.listener.onActivationExpiredEvents.isEmpty());
        this.windowManager.add(5);
        this.windowManager.add(6);
        Assertions.assertEquals(seq(1), this.listener.onExpiryEvents);
        Assertions.assertEquals(seq(2, 6), this.listener.onActivationEvents);
        Assertions.assertEquals(seq(5, 6), this.listener.onActivationNewEvents);
        Assertions.assertEquals(seq(1), this.listener.onActivationExpiredEvents);
        this.listener.clear();
        this.windowManager.add(7);
        Assertions.assertTrue(this.listener.onExpiryEvents.isEmpty());
        this.windowManager.add(8);
        Assertions.assertEquals(seq(2, 3), this.listener.onExpiryEvents);
        Assertions.assertEquals(seq(4, 8), this.listener.onActivationEvents);
        Assertions.assertEquals(seq(7, 8), this.listener.onActivationNewEvents);
        Assertions.assertEquals(seq(2, 3), this.listener.onActivationExpiredEvents);
    }

    @Test
    public void testExpireThreshold() {
        this.windowManager.setEvictionPolicy(new CountEvictionPolicy(5));
        TimeTriggerPolicy timeTriggerPolicy = new TimeTriggerPolicy(new BaseWindowedBolt.Duration(1, TimeUnit.HOURS).value, this.windowManager);
        timeTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(timeTriggerPolicy);
        Iterator<Integer> it = seq(1, 5).iterator();
        while (it.hasNext()) {
            this.windowManager.add(Integer.valueOf(it.next().intValue()));
        }
        Assertions.assertTrue(this.listener.onExpiryEvents.isEmpty());
        Iterator<Integer> it2 = seq(6, 10).iterator();
        while (it2.hasNext()) {
            this.windowManager.add(Integer.valueOf(it2.next().intValue()));
        }
        Iterator<Integer> it3 = seq(11, 100).iterator();
        while (it3.hasNext()) {
            this.windowManager.add(Integer.valueOf(it3.next().intValue()));
        }
        Assertions.assertEquals(seq(1, 100 - 5), this.listener.onExpiryEvents);
    }

    private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy<Integer, ?> evictionPolicy, int i) {
        this.windowManager.setEvictionPolicy(evictionPolicy);
        WatermarkCountTriggerPolicy watermarkCountTriggerPolicy = new WatermarkCountTriggerPolicy(i, this.windowManager, evictionPolicy, this.windowManager);
        watermarkCountTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkCountTriggerPolicy);
        Iterator<Integer> it = seq(1, 100).iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            this.windowManager.add(Integer.valueOf(intValue), intValue);
        }
        MatcherAssert.assertThat("The watermark eviction policies should never evict events before the first watermark is received", this.listener.onExpiryEvents, CoreMatchers.is(Matchers.empty()));
        this.windowManager.add(new WaterMarkEvent(100));
        Assertions.assertEquals(seq(1, 100), this.listener.onActivationEvents);
        Iterator<Integer> it2 = seq(100 + 1, 100 * 2).iterator();
        while (it2.hasNext()) {
            int intValue2 = it2.next().intValue();
            this.windowManager.add(Integer.valueOf(intValue2), intValue2);
        }
        this.windowManager.add(new WaterMarkEvent(100 + i + 1));
        MatcherAssert.assertThat("All the events should be expired after the second watermark", this.listener.onExpiryEvents, CoreMatchers.equalTo(seq(1, 100)));
    }

    @Test
    public void testExpireThresholdWithWatermarkCountEvictionPolicy() {
        testEvictBeforeWatermarkForWatermarkEvictionPolicy(new WatermarkCountEvictionPolicy(100), 100);
    }

    @Test
    public void testExpireThresholdWithWatermarkTimeEvictionPolicy() {
        testEvictBeforeWatermarkForWatermarkEvictionPolicy(new WatermarkTimeEvictionPolicy(100), 100);
    }

    @Test
    public void testTimeBasedWindow() {
        TimeEvictionPolicy timeEvictionPolicy = new TimeEvictionPolicy(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS).value);
        this.windowManager.setEvictionPolicy(timeEvictionPolicy);
        TimeTriggerPolicy timeTriggerPolicy = new TimeTriggerPolicy(new BaseWindowedBolt.Duration(1, TimeUnit.DAYS).value, this.windowManager, timeEvictionPolicy);
        timeTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(timeTriggerPolicy);
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Integer> it = seq(1, 50).iterator();
        while (it.hasNext()) {
            this.windowManager.add(Integer.valueOf(it.next().intValue()), currentTimeMillis - 1000);
        }
        Iterator<Integer> it2 = seq(51, 100).iterator();
        while (it2.hasNext()) {
            this.windowManager.add(Integer.valueOf(it2.next().intValue()), currentTimeMillis);
        }
        Assertions.assertEquals(50, this.listener.onExpiryEvents.size());
        Iterator<Integer> it3 = seq(101, 200).iterator();
        while (it3.hasNext()) {
            this.windowManager.add(Integer.valueOf(it3.next().intValue()), currentTimeMillis - 1000);
        }
        timeEvictionPolicy.setContext(new DefaultEvictionContext(Long.valueOf(currentTimeMillis + 100)));
        this.windowManager.onTrigger();
        Assertions.assertEquals(100, this.listener.onExpiryEvents.size());
        Assertions.assertEquals(seq(101, 200), this.listener.onExpiryEvents);
        List<Integer> seq = seq(51, 100);
        Assertions.assertEquals(seq(51, 100), this.listener.onActivationEvents);
        Assertions.assertEquals(seq(51, 100), this.listener.onActivationNewEvents);
        List<Integer> seq2 = seq(1, 50);
        seq2.addAll(seq(101, 200));
        Assertions.assertEquals(seq2, this.listener.onActivationExpiredEvents);
        this.listener.clear();
        List<Integer> seq3 = seq(201, 300);
        Iterator<Integer> it4 = seq3.iterator();
        while (it4.hasNext()) {
            this.windowManager.add(Integer.valueOf(it4.next().intValue()), currentTimeMillis);
        }
        seq.addAll(seq3);
        timeEvictionPolicy.setContext(new DefaultEvictionContext(Long.valueOf(currentTimeMillis + 200)));
        this.windowManager.onTrigger();
        Assertions.assertTrue(this.listener.onExpiryEvents.isEmpty());
        Assertions.assertEquals(seq, this.listener.onActivationEvents);
        Assertions.assertEquals(seq3, this.listener.onActivationNewEvents);
    }

    @Test
    public void testTimeBasedWindowExpiry() {
        TimeEvictionPolicy timeEvictionPolicy = new TimeEvictionPolicy(new BaseWindowedBolt.Duration(100, TimeUnit.MILLISECONDS).value);
        this.windowManager.setEvictionPolicy(timeEvictionPolicy);
        TimeTriggerPolicy timeTriggerPolicy = new TimeTriggerPolicy(new BaseWindowedBolt.Duration(1, TimeUnit.DAYS).value, this.windowManager);
        timeTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(timeTriggerPolicy);
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Integer> it = seq(1, 10).iterator();
        while (it.hasNext()) {
            this.windowManager.add(Integer.valueOf(it.next().intValue()));
        }
        timeEvictionPolicy.setContext(new DefaultEvictionContext(Long.valueOf(currentTimeMillis + 60)));
        this.windowManager.onTrigger();
        Assertions.assertEquals(seq(1, 10), this.listener.onActivationEvents);
        Assertions.assertTrue(this.listener.onActivationExpiredEvents.isEmpty());
        this.listener.clear();
        timeEvictionPolicy.setContext(new DefaultEvictionContext(Long.valueOf(currentTimeMillis + 120)));
        this.windowManager.onTrigger();
        Assertions.assertEquals(seq(1, 10), this.listener.onExpiryEvents);
        Assertions.assertTrue(this.listener.onActivationEvents.isEmpty());
        this.listener.clear();
        timeEvictionPolicy.setContext(new DefaultEvictionContext(Long.valueOf(currentTimeMillis + 180)));
        this.windowManager.onTrigger();
        Assertions.assertTrue(this.listener.onActivationExpiredEvents.isEmpty());
        Assertions.assertTrue(this.listener.onActivationEvents.isEmpty());
    }

    @Test
    public void testTumblingWindow() {
        CountEvictionPolicy countEvictionPolicy = new CountEvictionPolicy(3);
        this.windowManager.setEvictionPolicy(countEvictionPolicy);
        CountTriggerPolicy countTriggerPolicy = new CountTriggerPolicy(3, this.windowManager, countEvictionPolicy);
        countTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(countTriggerPolicy);
        this.windowManager.add(1);
        this.windowManager.add(2);
        Assertions.assertTrue(this.listener.onExpiryEvents.isEmpty());
        this.windowManager.add(3);
        Assertions.assertTrue(this.listener.onExpiryEvents.isEmpty());
        Assertions.assertEquals(seq(1, 3), this.listener.onActivationEvents);
        Assertions.assertTrue(this.listener.onActivationExpiredEvents.isEmpty());
        Assertions.assertEquals(seq(1, 3), this.listener.onActivationNewEvents);
        this.listener.clear();
        this.windowManager.add(4);
        this.windowManager.add(5);
        this.windowManager.add(6);
        Assertions.assertEquals(seq(1, 3), this.listener.onExpiryEvents);
        Assertions.assertEquals(seq(4, 6), this.listener.onActivationEvents);
        Assertions.assertEquals(seq(1, 3), this.listener.onActivationExpiredEvents);
        Assertions.assertEquals(seq(4, 6), this.listener.onActivationNewEvents);
    }

    @Test
    public void testEventTimeBasedWindow() {
        WatermarkTimeEvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(20);
        this.windowManager.setEvictionPolicy(watermarkTimeEvictionPolicy);
        WatermarkTimeTriggerPolicy watermarkTimeTriggerPolicy = new WatermarkTimeTriggerPolicy(10L, this.windowManager, watermarkTimeEvictionPolicy, this.windowManager);
        watermarkTimeTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkTimeTriggerPolicy);
        this.windowManager.add(1, 603L);
        this.windowManager.add(2, 605L);
        this.windowManager.add(3, 607L);
        this.windowManager.add(new WaterMarkEvent(609L));
        Assertions.assertEquals(Collections.emptyList(), this.listener.allOnActivationEvents);
        this.windowManager.add(4, 618L);
        this.windowManager.add(5, 626L);
        this.windowManager.add(6, 636L);
        this.windowManager.add(new WaterMarkEvent(631L));
        Assertions.assertEquals(3, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(1, 3), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(1, 4), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(seq(4, 5), this.listener.allOnActivationEvents.get(2));
        Assertions.assertEquals(Collections.emptyList(), this.listener.allOnActivationExpiredEvents.get(0));
        Assertions.assertEquals(Collections.emptyList(), this.listener.allOnActivationExpiredEvents.get(1));
        Assertions.assertEquals(seq(1, 3), this.listener.allOnActivationExpiredEvents.get(2));
        Assertions.assertEquals(seq(1, 3), this.listener.allOnActivationNewEvents.get(0));
        Assertions.assertEquals(seq(4, 4), this.listener.allOnActivationNewEvents.get(1));
        Assertions.assertEquals(seq(5, 5), this.listener.allOnActivationNewEvents.get(2));
        Assertions.assertEquals(seq(1, 3), this.listener.allOnExpiryEvents.get(0));
        this.windowManager.add(7, 825L);
        this.windowManager.add(8, 826L);
        this.windowManager.add(9, 827L);
        this.windowManager.add(10, 839L);
        this.listener.clear();
        this.windowManager.add(new WaterMarkEvent(834L));
        Assertions.assertEquals(3, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(5, 6), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(6, 6), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(seq(7, 9), this.listener.allOnActivationEvents.get(2));
        Assertions.assertEquals(seq(4, 4), this.listener.allOnActivationExpiredEvents.get(0));
        Assertions.assertEquals(seq(5, 5), this.listener.allOnActivationExpiredEvents.get(1));
        Assertions.assertEquals(Collections.emptyList(), this.listener.allOnActivationExpiredEvents.get(2));
        Assertions.assertEquals(seq(6, 6), this.listener.allOnActivationNewEvents.get(0));
        Assertions.assertEquals(Collections.emptyList(), this.listener.allOnActivationNewEvents.get(1));
        Assertions.assertEquals(seq(7, 9), this.listener.allOnActivationNewEvents.get(2));
        Assertions.assertEquals(seq(4, 4), this.listener.allOnExpiryEvents.get(0));
        Assertions.assertEquals(seq(5, 5), this.listener.allOnExpiryEvents.get(1));
        Assertions.assertEquals(seq(6, 6), this.listener.allOnExpiryEvents.get(2));
    }

    @Test
    public void testCountBasedWindowWithEventTs() {
        WatermarkCountEvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(3);
        this.windowManager.setEvictionPolicy(watermarkCountEvictionPolicy);
        WatermarkTimeTriggerPolicy watermarkTimeTriggerPolicy = new WatermarkTimeTriggerPolicy(10L, this.windowManager, watermarkCountEvictionPolicy, this.windowManager);
        watermarkTimeTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkTimeTriggerPolicy);
        this.windowManager.add(1, 603L);
        this.windowManager.add(2, 605L);
        this.windowManager.add(3, 607L);
        this.windowManager.add(4, 618L);
        this.windowManager.add(5, 626L);
        this.windowManager.add(6, 636L);
        this.windowManager.add(new WaterMarkEvent(631L));
        Assertions.assertEquals(3, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(1, 3), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(2, 4), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(seq(3, 5), this.listener.allOnActivationEvents.get(2));
        this.windowManager.add(7, 665L);
        this.windowManager.add(8, 666L);
        this.windowManager.add(9, 667L);
        this.windowManager.add(10, 679L);
        this.listener.clear();
        this.windowManager.add(new WaterMarkEvent(674L));
        Assertions.assertEquals(4, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(4, 6), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(4, 6), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(seq(4, 6), this.listener.allOnActivationEvents.get(2));
        Assertions.assertEquals(seq(7, 9), this.listener.allOnActivationEvents.get(3));
    }

    @Test
    public void testCountBasedTriggerWithEventTs() {
        WatermarkTimeEvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(20);
        this.windowManager.setEvictionPolicy(watermarkTimeEvictionPolicy);
        WatermarkCountTriggerPolicy watermarkCountTriggerPolicy = new WatermarkCountTriggerPolicy(3, this.windowManager, watermarkTimeEvictionPolicy, this.windowManager);
        watermarkCountTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkCountTriggerPolicy);
        this.windowManager.add(1, 603L);
        this.windowManager.add(2, 605L);
        this.windowManager.add(3, 607L);
        this.windowManager.add(4, 618L);
        this.windowManager.add(5, 625L);
        this.windowManager.add(6, 626L);
        this.windowManager.add(7, 629L);
        this.windowManager.add(8, 636L);
        this.windowManager.add(new WaterMarkEvent(631L));
        Assertions.assertEquals(2, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(1, 3), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(3, 6), this.listener.allOnActivationEvents.get(1));
        this.windowManager.add(9, 665L);
        this.windowManager.add(10, 666L);
        this.windowManager.add(11, 667L);
        this.windowManager.add(12, 669L);
        this.windowManager.add(12, 679L);
        this.listener.clear();
        this.windowManager.add(new WaterMarkEvent(674L));
        Assertions.assertEquals(2, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(9), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(9, 12), this.listener.allOnActivationEvents.get(1));
    }

    @Test
    public void testCountBasedTumblingWithSameEventTs() {
        WatermarkCountEvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(2);
        this.windowManager.setEvictionPolicy(watermarkCountEvictionPolicy);
        WatermarkCountTriggerPolicy watermarkCountTriggerPolicy = new WatermarkCountTriggerPolicy(2, this.windowManager, watermarkCountEvictionPolicy, this.windowManager);
        watermarkCountTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkCountTriggerPolicy);
        this.windowManager.add(1, 10L);
        this.windowManager.add(2, 10L);
        this.windowManager.add(3, 11L);
        this.windowManager.add(4, 12L);
        this.windowManager.add(5, 12L);
        this.windowManager.add(6, 12L);
        this.windowManager.add(7, 12L);
        this.windowManager.add(8, 13L);
        this.windowManager.add(9, 14L);
        this.windowManager.add(10, 15L);
        this.windowManager.add(new WaterMarkEvent(20L));
        Assertions.assertEquals(5, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(1, 2), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(3, 4), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(seq(5, 6), this.listener.allOnActivationEvents.get(2));
        Assertions.assertEquals(seq(7, 8), this.listener.allOnActivationEvents.get(3));
        Assertions.assertEquals(seq(9, 10), this.listener.allOnActivationEvents.get(4));
    }

    @Test
    public void testCountBasedSlidingWithSameEventTs() {
        WatermarkCountEvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(5);
        this.windowManager.setEvictionPolicy(watermarkCountEvictionPolicy);
        WatermarkCountTriggerPolicy watermarkCountTriggerPolicy = new WatermarkCountTriggerPolicy(2, this.windowManager, watermarkCountEvictionPolicy, this.windowManager);
        watermarkCountTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkCountTriggerPolicy);
        this.windowManager.add(1, 10L);
        this.windowManager.add(2, 10L);
        this.windowManager.add(3, 11L);
        this.windowManager.add(4, 12L);
        this.windowManager.add(5, 12L);
        this.windowManager.add(6, 12L);
        this.windowManager.add(7, 12L);
        this.windowManager.add(8, 13L);
        this.windowManager.add(9, 14L);
        this.windowManager.add(10, 15L);
        this.windowManager.add(new WaterMarkEvent(20L));
        Assertions.assertEquals(5, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(1, 2), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(1, 4), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(seq(2, 6), this.listener.allOnActivationEvents.get(2));
        Assertions.assertEquals(seq(4, 8), this.listener.allOnActivationEvents.get(3));
        Assertions.assertEquals(seq(6, 10), this.listener.allOnActivationEvents.get(4));
    }

    @Test
    public void testEventTimeLag() {
        WatermarkTimeEvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(20, 5);
        this.windowManager.setEvictionPolicy(watermarkTimeEvictionPolicy);
        WatermarkTimeTriggerPolicy watermarkTimeTriggerPolicy = new WatermarkTimeTriggerPolicy(10L, this.windowManager, watermarkTimeEvictionPolicy, this.windowManager);
        watermarkTimeTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkTimeTriggerPolicy);
        this.windowManager.add(1, 603L);
        this.windowManager.add(2, 605L);
        this.windowManager.add(3, 607L);
        this.windowManager.add(4, 618L);
        this.windowManager.add(5, 626L);
        this.windowManager.add(6, 632L);
        this.windowManager.add(7, 629L);
        this.windowManager.add(8, 636L);
        this.windowManager.add(new WaterMarkEvent(631L));
        Assertions.assertEquals(3, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(1, 3), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(1, 4), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(Arrays.asList(4, 5, 7), this.listener.allOnActivationEvents.get(2));
    }

    @Test
    public void testScanStop() {
        final HashSet hashSet = new HashSet();
        WatermarkTimeEvictionPolicy<Integer> watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy<Integer>(20, 5) { // from class: org.apache.storm.windowing.WindowManagerTest.1
            public EvictionPolicy.Action evict(Event<Integer> event) {
                hashSet.add(event.get());
                return super.evict(event);
            }
        };
        this.windowManager.setEvictionPolicy(watermarkTimeEvictionPolicy);
        WatermarkTimeTriggerPolicy watermarkTimeTriggerPolicy = new WatermarkTimeTriggerPolicy(10L, this.windowManager, watermarkTimeEvictionPolicy, this.windowManager);
        watermarkTimeTriggerPolicy.start();
        this.windowManager.setTriggerPolicy(watermarkTimeTriggerPolicy);
        this.windowManager.add(1, 603L);
        this.windowManager.add(2, 605L);
        this.windowManager.add(3, 607L);
        this.windowManager.add(4, 618L);
        this.windowManager.add(5, 626L);
        this.windowManager.add(6, 629L);
        this.windowManager.add(7, 636L);
        this.windowManager.add(8, 637L);
        this.windowManager.add(9, 638L);
        this.windowManager.add(10, 639L);
        this.windowManager.add(new WaterMarkEvent(631L));
        Assertions.assertEquals(3, this.listener.allOnActivationEvents.size());
        Assertions.assertEquals(seq(1, 3), this.listener.allOnActivationEvents.get(0));
        Assertions.assertEquals(seq(1, 4), this.listener.allOnActivationEvents.get(1));
        Assertions.assertEquals(Arrays.asList(4, 5, 6), this.listener.allOnActivationEvents.get(2));
        Assertions.assertEquals(new HashSet(seq(1, 7)), hashSet);
    }

    private List<Integer> seq(int i) {
        return seq(i, i);
    }

    private List<Integer> seq(int i, int i2) {
        ArrayList arrayList = new ArrayList();
        for (int i3 = i; i3 <= i2; i3++) {
            arrayList.add(Integer.valueOf(i3));
        }
        return arrayList;
    }
}
