package org.apache.flink.table.planner.runtime.harness;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.runtime.harness.HarnessTestBase;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.Symbol;
import scala.Symbol$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.MutableList;
import scala.reflect.ScalaSignature;

/* compiled from: OverAggregateHarnessTest.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u000194A!\u0001\u0002\u0001#\tArJ^3s\u0003\u001e<'/Z4bi\u0016D\u0015M\u001d8fgN$Vm\u001d;\u000b\u0005\r!\u0011a\u00025be:,7o\u001d\u0006\u0003\u000b\u0019\tqA];oi&lWM\u0003\u0002\b\u0011\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u0005\u000b\u0003\u0015!\u0018M\u00197f\u0015\tYA\"A\u0003gY&t7N\u0003\u0002\u000e\u001d\u00051\u0011\r]1dQ\u0016T\u0011aD\u0001\u0004_J<7\u0001A\n\u0003\u0001I\u0001\"a\u0005\u000b\u000e\u0003\tI!!\u0006\u0002\u0003\u001f!\u000b'O\\3tgR+7\u000f\u001e\"bg\u0016D\u0001b\u0006\u0001\u0003\u0002\u0003\u0006I\u0001G\u0001\u0005[>$W\r\u0005\u0002\u001a_9\u0011!\u0004\f\b\u00037)r!\u0001H\u0015\u000f\u0005uAcB\u0001\u0010(\u001d\tybE\u0004\u0002!K9\u0011\u0011\u0005J\u0007\u0002E)\u00111\u0005E\u0001\u0007yI|w\u000e\u001e \n\u0003=I!!\u0004\b\n\u0005-a\u0011BA\u0005\u000b\u0013\t9\u0001\"\u0003\u0002\u0006\r%\u00111\u0006B\u0001\u0006kRLGn]\u0005\u0003[9\n!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016T!a\u000b\u0003\n\u0005A\n$\u0001E*uCR,')Y2lK:$Wj\u001c3f\u0015\tic\u0006C\u00034\u0001\u0011\u0005A'\u0001\u0004=S:LGO\u0010\u000b\u0003kY\u0002\"a\u0005\u0001\t\u000b]\u0011\u0004\u0019\u0001\r\t\u000ba\u0002A\u0011I\u001d\u0002\r\t,gm\u001c:f)\u0005Q\u0004CA\u001e?\u001b\u0005a$\"A\u001f\u0002\u000bM\u001c\u0017\r\\1\n\u0005}b$\u0001B+oSRD#aN!\u0011\u0005\t+U\"A\"\u000b\u0005\u0011s\u0011!\u00026v]&$\u0018B\u0001$D\u0005\u0019\u0011UMZ8sK\")\u0001\n\u0001C\u0001s\u0005YB/Z:u!J|7\rV5nK\n{WO\u001c3fIJ{wo](wKJD#a\u0012&\u0011\u0005\t[\u0015B\u0001'D\u0005\u0011!Vm\u001d;\t\u000b9\u0003A\u0011A\u001d\u00029Q,7\u000f\u001e)s_\u000e$\u0016.\\3C_VtG-\u001a3SC:<Wm\u0014<fe\"\u0012QJ\u0013\u0005\u0006#\u0002!\t!O\u0001\u001ai\u0016\u001cH\u000f\u0015:pGRKW.Z+oE>,h\u000eZ3e\u001fZ,'\u000f\u000b\u0002Q\u0015\")A\u000b\u0001C\u0001s\u0005YB/Z:u%><H+[7f\u0005>,h\u000eZ3e%\u0006tw-Z(wKJD#a\u0015&\t\u000b]\u0003A\u0011A\u001d\u00025Q,7\u000f\u001e*poRKW.\u001a\"pk:$W\r\u001a*poN|e/\u001a:)\u0005YS\u0005\"\u0002.\
u0001\t\u0003I\u0014!\b;fgR\u0014vn\u001e+j[\u0016,fNY8v]\u0012,GMU1oO\u0016|e/\u001a:)\u0005eS\u0005\"B/\u0001\t\u0003I\u0014\u0001\b;fgR\u0014vn\u001e+j[\u0016,fNY8v]\u0012,GMU8xg>3XM\u001d\u0015\u00039*CC\u0001\u00011gOB\u0011\u0011\rZ\u0007\u0002E*\u00111mQ\u0001\u0007eVtg.\u001a:\n\u0005\u0015\u0014'a\u0002*v]^KG\u000f[\u0001\u0006m\u0006dW/Z\u0012\u0002QB\u0011\u0011\u000e\\\u0007\u0002U*\u00111nQ\u0001\beVtg.\u001a:t\u0013\ti'NA\u0007QCJ\fW.\u001a;fe&TX\r\u001a")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.class */
public class OverAggregateHarnessTest extends HarnessTestBase {
    // Scala Symbol instances naming the table columns the tests pass to toTable(...).
    // symbol$1..symbol$4 are the fields of the processing-time tables; symbol$4 is the
    // one that gets .proctime() applied.
    private static Symbol symbol$1 = Symbol$.MODULE$.apply("currtime");
    private static Symbol symbol$2 = Symbol$.MODULE$.apply("b");
    private static Symbol symbol$3 = Symbol$.MODULE$.apply("c");
    private static Symbol symbol$4 = Symbol$.MODULE$.apply("proctime");
    // Used with .rowtime() by the event-time test(s).
    private static Symbol symbol$5 = Symbol$.MODULE$.apply("rowtime");

    /**
     * Runs the parent setup, then replaces the table environment with a streaming-mode
     * one that is configured through {@link HarnessTestBase.TestTableConfig}.
     */
    @Override // org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    @Before
    public void before() {
        super.before();

        EnvironmentSettings streamingSettings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        HarnessTestBase.TestTableConfig harnessConfig = new HarnessTestBase.TestTableConfig(this);

        tEnv_$eq(StreamTableEnvironmentImpl$.MODULE$.create(env(), streamingSettings, harnessConfig));
    }

    /**
     * Checks MIN/MAX over a processing-time {@code ROWS BETWEEN 1 PRECEDING AND CURRENT ROW}
     * window partitioned by {@code b}, with idle-state retention set to 2s (min) / 4s (max).
     *
     * <p>Input rows are fed at processing times 1, 1100, 3001 and 6002; the operator output is
     * then compared with the expected rows through
     * {@link RowDataHarnessAssertor#assertOutputEqualsSorted}.
     */
    @Test
    public void testProcTimeBoundedRowsOver() {
        org.apache.flink.table.api.package$ tableApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] columns = {
            tableApi.symbol2FieldExpression(symbol$1),
            tableApi.symbol2FieldExpression(symbol$2),
            tableApi.symbol2FieldExpression(symbol$3),
            (Expression) tableApi.UnresolvedFieldExpression(symbol$4).proctime()
        };
        tEnv().registerTable(
                "T",
                package$.MODULE$
                        .dataStreamConversions(
                                env().fromCollection(new MutableList(), new OverAggregateHarnessTest$$anon$8(this)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        String sql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT currtime, b, c,\n"
                        + "        | min(c) OVER\n"
                        + "        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),\n"
                        + "        | max(c) OVER\n"
                        + "        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)\n"
                        + "        |FROM T\n"
                        + "      ")).stripMargin();
        Table query = tEnv().sqlQuery(sql);

        // Must be configured before the harness is created from the query.
        tEnv().getConfig().setIdleStateRetentionTime(Time.seconds(2L), Time.seconds(4L));

        // One type per output column: currtime, b, c, proctime, min(c), max(c).
        LogicalType bigint = DataTypes.BIGINT().getLogicalType();
        LogicalType string = DataTypes.STRING().getLogicalType();
        RowDataHarnessAssertor assertor =
                new RowDataHarnessAssertor(new LogicalType[] {bigint, string, bigint, bigint, bigint, bigint});

        // try-with-resources: the harness is closed even when feeding input or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarnessTester(
                        package$.MODULE$.tableConversions(query).toAppendStream(TypeExtractor.createTypeInfo(Row.class)),
                        "OverAggregate")) {
            harness.open();

            harness.setProcessingTime(1L);
            for (Object[] fields : new Object[][] {
                {1L, "aaa", 1L, null},
                {1L, "bbb", 10L, null},
                {1L, "aaa", 2L, null},
                {1L, "aaa", 3L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            harness.setProcessingTime(1100L);
            for (Object[] fields : new Object[][] {
                {1L, "bbb", 20L, null},
                {1L, "aaa", 4L, null},
                {1L, "aaa", 5L, null},
                {1L, "aaa", 6L, null},
                {1L, "bbb", 30L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            harness.setProcessingTime(3001L);
            for (Object[] fields : new Object[][] {
                {2L, "aaa", 7L, null},
                {2L, "aaa", 8L, null},
                {2L, "aaa", 9L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            harness.setProcessingTime(6002L);
            for (Object[] fields : new Object[][] {
                {2L, "aaa", 10L, null},
                {2L, "bbb", 40L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            // Columns: currtime, b, c, proctime (null), min(c), max(c).
            // NOTE(review): the final "bbb" row expects min == max == 40, i.e. no preceding
            // "bbb" row in its window — presumably that key's state was cleaned up by then
            // while "aaa" was retained; confirm against the retention settings above.
            ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
            for (Object[] fields : new Object[][] {
                {1L, "aaa", 1L, null, 1L, 1L},
                {1L, "bbb", 10L, null, 10L, 10L},
                {1L, "aaa", 2L, null, 1L, 2L},
                {1L, "aaa", 3L, null, 2L, 3L},
                {1L, "bbb", 20L, null, 10L, 20L},
                {1L, "aaa", 4L, null, 3L, 4L},
                {1L, "aaa", 5L, null, 4L, 5L},
                {1L, "aaa", 6L, null, 5L, 6L},
                {1L, "bbb", 30L, null, 20L, 30L},
                {2L, "aaa", 7L, null, 6L, 7L},
                {2L, "aaa", 8L, null, 7L, 8L},
                {2L, "aaa", 9L, null, 8L, 9L},
                {2L, "aaa", 10L, null, 9L, 10L},
                {2L, "bbb", 40L, null, 40L, 40L}
            }) {
                expected.add(new StreamRecord<>(StreamRecordUtils.row(fields)));
            }

            assertor.assertOutputEqualsSorted("result mismatch", expected, harness.getOutput());
        }
    }

    /**
     * Checks MIN/MAX over a processing-time
     * {@code RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW} window partitioned
     * by {@code b}.
     *
     * <p>Input rows are fed while the harness clock is moved through 3, 4, 3003, 5, 6002, 7002,
     * 11002, 11004 and finally 11006; the operator output is then compared with the expected rows
     * through {@link RowDataHarnessAssertor#assertOutputEqualsSorted}.
     */
    @Test
    public void testProcTimeBoundedRangeOver() {
        org.apache.flink.table.api.package$ tableApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] columns = {
            tableApi.symbol2FieldExpression(symbol$1),
            tableApi.symbol2FieldExpression(symbol$2),
            tableApi.symbol2FieldExpression(symbol$3),
            (Expression) tableApi.UnresolvedFieldExpression(symbol$4).proctime()
        };
        tEnv().registerTable(
                "T",
                package$.MODULE$
                        .dataStreamConversions(
                                env().fromCollection(new MutableList(), new OverAggregateHarnessTest$$anon$9(this)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        String sql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT currtime, b, c,\n"
                        + "        | min(c) OVER\n"
                        + "        |   (PARTITION BY b ORDER BY proctime\n"
                        + "        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),\n"
                        + "        | max(c) OVER\n"
                        + "        |   (PARTITION BY b ORDER BY proctime\n"
                        + "        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)\n"
                        + "        |FROM T\n"
                        + "      ")).stripMargin();
        Table query = tEnv().sqlQuery(sql);

        // One type per output column: currtime, b, c, proctime, min(c), max(c).
        LogicalType bigint = DataTypes.BIGINT().getLogicalType();
        LogicalType string = DataTypes.STRING().getLogicalType();
        RowDataHarnessAssertor assertor =
                new RowDataHarnessAssertor(new LogicalType[] {bigint, string, bigint, bigint, bigint, bigint});

        // try-with-resources: the harness is closed even when feeding input or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarnessTester(
                        package$.MODULE$.tableConversions(query).toAppendStream(TypeExtractor.createTypeInfo(Row.class)),
                        "OverAggregate")) {
            harness.open();

            harness.setProcessingTime(3L);
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 1L, null},
                {0L, "bbb", 10L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            harness.setProcessingTime(4L);
            harness.processElement(
                    new StreamRecord<RowData>(StreamRecordUtils.binaryrow(new Object[] {0L, "aaa", 2L, null})));

            harness.setProcessingTime(3003L);
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 3L, null},
                {0L, "bbb", 20L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            // The harness clock is deliberately set back to 5 before this record is fed.
            harness.setProcessingTime(5L);
            harness.processElement(
                    new StreamRecord<RowData>(StreamRecordUtils.binaryrow(new Object[] {0L, "aaa", 4L, null})));

            // Two consecutive clock advances with no input in between.
            harness.setProcessingTime(6002L);
            harness.setProcessingTime(7002L);
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 5L, null},
                {0L, "aaa", 6L, null},
                {0L, "bbb", 30L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            harness.setProcessingTime(11002L);
            harness.processElement(
                    new StreamRecord<RowData>(StreamRecordUtils.binaryrow(new Object[] {0L, "aaa", 7L, null})));

            harness.setProcessingTime(11004L);
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 8L, null},
                {0L, "aaa", 9L, null},
                {0L, "aaa", 10L, null},
                {0L, "bbb", 40L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            // Final clock advance after the last input, before the output is read.
            harness.setProcessingTime(11006L);

            // Columns: currtime, b, c, proctime (null), min(c), max(c).
            // Rows fed at the same clock value and key expect identical min/max values.
            ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 1L, null, 1L, 1L},
                {0L, "bbb", 10L, null, 10L, 10L},
                {0L, "aaa", 2L, null, 1L, 2L},
                {0L, "aaa", 3L, null, 1L, 4L},
                {0L, "bbb", 20L, null, 10L, 20L},
                {0L, "aaa", 4L, null, 1L, 4L},
                {0L, "aaa", 5L, null, 3L, 6L},
                {0L, "aaa", 6L, null, 3L, 6L},
                {0L, "bbb", 30L, null, 20L, 30L},
                {0L, "aaa", 7L, null, 5L, 7L},
                {0L, "aaa", 8L, null, 7L, 10L},
                {0L, "aaa", 9L, null, 7L, 10L},
                {0L, "aaa", 10L, null, 7L, 10L},
                {0L, "bbb", 40L, null, 40L, 40L}
            }) {
                expected.add(new StreamRecord<>(StreamRecordUtils.row(fields)));
            }

            assertor.assertOutputEqualsSorted("result mismatch", expected, harness.getOutput());
        }
    }

    /**
     * Checks MIN/MAX over a processing-time
     * {@code ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW} window partitioned by {@code b},
     * with idle-state retention set to 2s (min) / 4s (max).
     *
     * <p>Eleven rows are fed at processing time 1003 and three more at 5003; the operator output
     * is then compared with the expected rows through
     * {@link RowDataHarnessAssertor#assertOutputEqualsSorted}.
     */
    @Test
    public void testProcTimeUnboundedOver() {
        org.apache.flink.table.api.package$ tableApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] columns = {
            tableApi.symbol2FieldExpression(symbol$1),
            tableApi.symbol2FieldExpression(symbol$2),
            tableApi.symbol2FieldExpression(symbol$3),
            (Expression) tableApi.UnresolvedFieldExpression(symbol$4).proctime()
        };
        tEnv().registerTable(
                "T",
                package$.MODULE$
                        .dataStreamConversions(
                                env().fromCollection(new MutableList(), new OverAggregateHarnessTest$$anon$10(this)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        String sql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT currtime, b, c,\n"
                        + "        | min(c) OVER\n"
                        + "        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),\n"
                        + "        | max(c) OVER\n"
                        + "        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)\n"
                        + "        |FROM T\n"
                        + "      ")).stripMargin();
        Table query = tEnv().sqlQuery(sql);

        // Must be configured before the harness is created from the query.
        tEnv().getConfig().setIdleStateRetentionTime(Time.seconds(2L), Time.seconds(4L));

        // One type per output column: currtime, b, c, proctime, min(c), max(c).
        LogicalType bigint = DataTypes.BIGINT().getLogicalType();
        LogicalType string = DataTypes.STRING().getLogicalType();
        RowDataHarnessAssertor assertor =
                new RowDataHarnessAssertor(new LogicalType[] {bigint, string, bigint, bigint, bigint, bigint});

        // try-with-resources: the harness is closed even when feeding input or the assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarnessTester(
                        package$.MODULE$.tableConversions(query).toAppendStream(TypeExtractor.createTypeInfo(Row.class)),
                        "OverAggregate")) {
            harness.open();

            harness.setProcessingTime(1003L);
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 1L, null},
                {0L, "bbb", 10L, null},
                {0L, "aaa", 2L, null},
                {0L, "aaa", 3L, null},
                {0L, "bbb", 20L, null},
                {0L, "aaa", 4L, null},
                {0L, "aaa", 5L, null},
                {0L, "aaa", 6L, null},
                {0L, "bbb", 30L, null},
                {0L, "aaa", 7L, null},
                {0L, "aaa", 8L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            harness.setProcessingTime(5003L);
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 9L, null},
                {0L, "aaa", 10L, null},
                {0L, "bbb", 40L, null}
            }) {
                harness.processElement(new StreamRecord<RowData>(StreamRecordUtils.binaryrow(fields)));
            }

            // Columns: currtime, b, c, proctime (null), min(c), max(c).
            // NOTE(review): rows fed at 5003 expect aggregates that restart (min 9 for "aaa",
            // 40 for "bbb") — presumably the per-key state was cleaned up when the clock
            // advanced; confirm against the retention settings above.
            ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
            for (Object[] fields : new Object[][] {
                {0L, "aaa", 1L, null, 1L, 1L},
                {0L, "bbb", 10L, null, 10L, 10L},
                {0L, "aaa", 2L, null, 1L, 2L},
                {0L, "aaa", 3L, null, 1L, 3L},
                {0L, "bbb", 20L, null, 10L, 20L},
                {0L, "aaa", 4L, null, 1L, 4L},
                {0L, "aaa", 5L, null, 1L, 5L},
                {0L, "aaa", 6L, null, 1L, 6L},
                {0L, "bbb", 30L, null, 10L, 30L},
                {0L, "aaa", 7L, null, 1L, 7L},
                {0L, "aaa", 8L, null, 1L, 8L},
                {0L, "aaa", 9L, null, 9L, 9L},
                {0L, "aaa", 10L, null, 9L, 10L},
                {0L, "bbb", 40L, null, 40L, 40L}
            }) {
                expected.add(new StreamRecord<>(StreamRecordUtils.row(fields)));
            }

            assertor.assertOutputEqualsSorted("result mismatch", expected, harness.getOutput());
        }
    }

    @Test
    public void testRowTimeBoundedRangeOver() {
        tEnv().registerTable("T", package$.MODULE$.dataStreamConversions(env().fromCollection(new MutableList(), new OverAggregateHarnessTest$$anon$11(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{(Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime(), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)})));
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarnessTester = createHarnessTester(package$.MODULE$.tableConversions(tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT rowtime, b, c,\n        | min(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),\n        | max(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)\n        |FROM T\n      ")).stripMargin())).toAppendStream(TypeExtractor.createTypeInfo(Row.class)), "OverAggregate");
        RowDataHarnessAssertor rowDataHarnessAssertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.BIGINT().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.BIGINT().getLogicalType()});
        createHarnessTester.open();
        createHarnessTester.processWatermark(1L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(1L)})));
        createHarnessTester.processWatermark(2L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(3L), "bbb", Predef$.MODULE$.long2Long(10L)})));
        createHarnessTester.processWatermark(4000L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L)})));
        createHarnessTester.processWatermark(4001L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(4002L), "aaa", Predef$.MODULE$.long2Long(3L)})));
        createHarnessTester.processWatermark(4002L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(4003L), "aaa", Predef$.MODULE$.long2Long(4L)})));
        createHarnessTester.processWatermark(4800L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(4801L), "bbb", Predef$.MODULE$.long2Long(25L)})));
        createHarnessTester.processWatermark(6500L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L)})));
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L)})));
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L)})));
        createHarnessTester.processWatermark(7000L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L)})));
        createHarnessTester.processWatermark(8000L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L)})));
        createHarnessTester.processWatermark(12000L);
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L)})));
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L)})));
        createHarnessTester.processElement(new StreamRecord(StreamRecordUtils.binaryrow(new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L)})));
        createHarnessTester.processWatermark(19000L);
        Collection<Object> dropWatermarks = dropWatermarks(createHarnessTester.getOutput().toArray());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(3L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(4002L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(4003L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(4L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(4801L), "bbb", Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(25L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(6L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(6L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(7L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(8L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(30L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(10L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(10L)})));
        concurrentLinkedQueue.add(new StreamRecord(StreamRecordUtils.row(new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L)})));
        rowDataHarnessAssertor.assertOutputEqualsSorted("result mismatch", concurrentLinkedQueue, dropWatermarks);
        createHarnessTester.close();
    }

    /**
     * Drives the "OverAggregate" operator for a row-time bounded ROWS window
     * ({@code ROWS BETWEEN 2 PRECEDING AND CURRENT ROW}) computing {@code min(c)} and
     * {@code max(c)} per partition {@code b}, and compares the emitted rows (watermarks removed)
     * with the expected rows, ignoring order.
     *
     * <p>Idle state retention is configured to 1s (min) / 2s (max). After the main input, the
     * test advances processing time between records of key {@code "ccc"}; the expected output
     * for the last {@code "ccc"} record has min == max == its own value.
     *
     * @throws Exception if the harness fails to open, process an element/watermark, or close
     */
    @Test
    public void testRowTimeBoundedRowsOver() throws Exception {
        tEnv().registerTable("T", package$.MODULE$.dataStreamConversions(env().fromCollection(new MutableList(), new OverAggregateHarnessTest$$anon$12(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{(Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime(), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)})));
        Table sqlQuery = tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT rowtime, b, c,\n        | min(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),\n        | max(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)\n        |FROM T\n      ")).stripMargin());
        tEnv().getConfig().setIdleStateRetentionTime(Time.seconds(1L), Time.seconds(2L));

        // The harness is closed on every exit path, including a failing step or assertion,
        // so operator and state-backend resources do not leak into later tests.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarnessTester(package$.MODULE$.tableConversions(sqlQuery).toAppendStream(TypeExtractor.createTypeInfo(Row.class)), "OverAggregate")) {
            // Output schema: rowtime, b, c, min(c), max(c).
            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[] {
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.STRING().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.BIGINT().getLogicalType()
            });

            harness.open();

            // Input rows are (rowtime, b, c).
            harness.processWatermark(800L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {801L, "aaa", 1L})));
            harness.processWatermark(2500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {2501L, "bbb", 10L})));
            harness.processWatermark(4000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "aaa", 2L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "aaa", 3L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "bbb", 20L})));
            harness.processWatermark(4800L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4801L, "aaa", 4L})));
            harness.processWatermark(6500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "aaa", 5L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "aaa", 6L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "bbb", 30L})));
            harness.processWatermark(7000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {7001L, "aaa", 7L})));
            harness.processWatermark(8000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {8001L, "aaa", 8L})));
            harness.processWatermark(12000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "aaa", 9L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "aaa", 10L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "bbb", 40L})));
            harness.processWatermark(19000L);

            // Processing-time phase for key "ccc".
            // NOTE(review): presumably exercises idle-state cleanup (1s/2s retention) — confirm.
            harness.setProcessingTime(1000L);
            harness.processWatermark(20000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20001L, "ccc", 1L})));
            harness.setProcessingTime(2500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20002L, "ccc", 2L})));
            harness.processWatermark(20010L);
            harness.setProcessingTime(4499L);
            harness.setProcessingTime(4500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20011L, "ccc", 3L})));
            harness.setProcessingTime(6500L);
            harness.processWatermark(20020L);
            harness.setProcessingTime(8499L);
            harness.setProcessingTime(8500L);

            Collection<Object> actual = dropWatermarks(harness.getOutput().toArray());

            // Expected rows are (rowtime, b, c, min(c), max(c)).
            Collection<Object> expected = new ConcurrentLinkedQueue<>();
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {801L, "aaa", 1L, 1L, 1L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {2501L, "bbb", 10L, 10L, 10L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "aaa", 2L, 1L, 2L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "aaa", 3L, 1L, 3L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "bbb", 20L, 10L, 20L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4801L, "aaa", 4L, 2L, 4L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "aaa", 5L, 3L, 5L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "aaa", 6L, 4L, 6L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "bbb", 30L, 10L, 30L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {7001L, "aaa", 7L, 5L, 7L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {8001L, "aaa", 8L, 6L, 8L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "aaa", 9L, 7L, 9L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "aaa", 10L, 8L, 10L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "bbb", 40L, 20L, 40L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {20001L, "ccc", 1L, 1L, 1L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {20002L, "ccc", 2L, 1L, 2L})));
            // Min is 3 (not 1): the two earlier "ccc" rows no longer contribute to this window.
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {20011L, "ccc", 3L, 3L, 3L})));

            assertor.assertOutputEqualsSorted("result mismatch", expected, actual);
        }
    }

    /**
     * Drives the "OverAggregate" operator for a row-time unbounded RANGE window
     * ({@code RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW}) computing {@code min(c)} and
     * {@code max(c)} per partition {@code b}, and compares the emitted rows (watermarks removed)
     * with the expected rows, ignoring order.
     *
     * <p>Rows of one key that share a rowtime get the same aggregate values in the expected
     * output (e.g. both {@code "aaa"} rows at 4001 expect max 3). Idle state retention is
     * configured to 1s (min) / 2s (max). The {@code "ccc"} record with rowtime 20000 is sent
     * after watermark 20000 and has no expected output row.
     *
     * @throws Exception if the harness fails to open, process an element/watermark, or close
     */
    @Test
    public void testRowTimeUnboundedRangeOver() throws Exception {
        tEnv().registerTable("T", package$.MODULE$.dataStreamConversions(env().fromCollection(new MutableList(), new OverAggregateHarnessTest$$anon$13(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{(Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime(), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)})));
        Table sqlQuery = tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT rowtime, b, c,\n        | min(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW),\n        | max(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW)\n        |FROM T\n      ")).stripMargin());
        tEnv().getConfig().setIdleStateRetentionTime(Time.seconds(1L), Time.seconds(2L));

        // The harness is closed on every exit path, including a failing step or assertion,
        // so operator and state-backend resources do not leak into later tests.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarnessTester(package$.MODULE$.tableConversions(sqlQuery).toAppendStream(TypeExtractor.createTypeInfo(Row.class)), "OverAggregate")) {
            // Output schema: rowtime, b, c, min(c), max(c).
            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[] {
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.STRING().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.BIGINT().getLogicalType()
            });

            harness.open();
            harness.setProcessingTime(1000L);

            // Input rows are (rowtime, b, c).
            harness.processWatermark(800L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {801L, "aaa", 1L})));
            harness.processWatermark(2500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {2501L, "bbb", 10L})));
            harness.processWatermark(4000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "aaa", 2L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "aaa", 3L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "bbb", 20L})));
            harness.processWatermark(4800L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4801L, "aaa", 4L})));
            harness.processWatermark(6500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "aaa", 5L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "aaa", 6L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "bbb", 30L})));
            harness.processWatermark(7000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {7001L, "aaa", 7L})));
            harness.processWatermark(8000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {8001L, "aaa", 8L})));
            harness.processWatermark(12000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "aaa", 9L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "aaa", 10L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "bbb", 40L})));
            harness.processWatermark(19000L);

            // Processing-time phase.
            // NOTE(review): presumably exercises idle-state cleanup timers — confirm.
            harness.setProcessingTime(2999L);
            harness.setProcessingTime(3000L);
            harness.processWatermark(20000L);
            // Rowtime 20000 is not ahead of watermark 20000; no output row is expected for it.
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20000L, "ccc", 1L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20001L, "ccc", 1L})));
            // NOTE(review): processing time steps back from 3000 to 2500 here; kept as in the
            // original sequence — confirm this is intentional.
            harness.setProcessingTime(2500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20002L, "ccc", 2L})));
            harness.setProcessingTime(5000L);
            harness.processWatermark(20010L);
            harness.setProcessingTime(6999L);
            harness.setProcessingTime(7000L);

            Collection<Object> actual = dropWatermarks(harness.getOutput().toArray());

            // Expected rows are (rowtime, b, c, min(c), max(c)).
            Collection<Object> expected = new ConcurrentLinkedQueue<>();
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {801L, "aaa", 1L, 1L, 1L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {2501L, "bbb", 10L, 10L, 10L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "aaa", 2L, 1L, 3L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "aaa", 3L, 1L, 3L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "bbb", 20L, 10L, 20L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4801L, "aaa", 4L, 1L, 4L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "aaa", 5L, 1L, 6L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "aaa", 6L, 1L, 6L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "bbb", 30L, 10L, 30L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {7001L, "aaa", 7L, 1L, 7L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {8001L, "aaa", 8L, 1L, 8L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "aaa", 9L, 1L, 10L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "aaa", 10L, 1L, 10L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "bbb", 40L, 10L, 40L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {20001L, "ccc", 1L, 1L, 1L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {20002L, "ccc", 2L, 1L, 2L})));

            assertor.assertOutputEqualsSorted("result mismatch", expected, actual);
        }
    }

    /**
     * Drives the "OverAggregate" operator for a row-time unbounded ROWS window
     * ({@code ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW}) computing {@code min(c)} and
     * {@code max(c)} per partition {@code b}, and compares the emitted rows (watermarks removed)
     * with the expected rows, ignoring order.
     *
     * <p>Unlike the RANGE variant, rows of one key sharing a rowtime expect distinct running
     * aggregates (e.g. the {@code "aaa"} rows at 4001 expect max 2 and max 3). Idle state
     * retention is configured to 1s (min) / 2s (max). The {@code "ccc"} record with rowtime
     * 20000 is sent after watermark 20000 and has no expected output row.
     *
     * @throws Exception if the harness fails to open, process an element/watermark, or close
     */
    @Test
    public void testRowTimeUnboundedRowsOver() throws Exception {
        tEnv().registerTable("T", package$.MODULE$.dataStreamConversions(env().fromCollection(new MutableList(), new OverAggregateHarnessTest$$anon$14(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{(Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime(), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)})));
        Table sqlQuery = tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT rowtime, b, c,\n        | min(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),\n        | max(c) OVER\n        |   (PARTITION BY b ORDER BY rowtime\n        |   ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)\n        |FROM T\n      ")).stripMargin());
        tEnv().getConfig().setIdleStateRetentionTime(Time.seconds(1L), Time.seconds(2L));

        // The harness is closed on every exit path, including a failing step or assertion,
        // so operator and state-backend resources do not leak into later tests.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarnessTester(package$.MODULE$.tableConversions(sqlQuery).toAppendStream(TypeExtractor.createTypeInfo(Row.class)), "OverAggregate")) {
            // Output schema: rowtime, b, c, min(c), max(c).
            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[] {
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.STRING().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.BIGINT().getLogicalType(),
                DataTypes.BIGINT().getLogicalType()
            });

            harness.open();
            harness.setProcessingTime(1000L);

            // Input rows are (rowtime, b, c).
            harness.processWatermark(800L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {801L, "aaa", 1L})));
            harness.processWatermark(2500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {2501L, "bbb", 10L})));
            harness.processWatermark(4000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "aaa", 2L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "aaa", 3L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4001L, "bbb", 20L})));
            harness.processWatermark(4800L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {4801L, "aaa", 4L})));
            harness.processWatermark(6500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "aaa", 5L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "aaa", 6L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {6501L, "bbb", 30L})));
            harness.processWatermark(7000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {7001L, "aaa", 7L})));
            harness.processWatermark(8000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {8001L, "aaa", 8L})));
            harness.processWatermark(12000L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "aaa", 9L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "aaa", 10L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {12001L, "bbb", 40L})));
            harness.processWatermark(19000L);

            // Processing-time phase.
            // NOTE(review): presumably exercises idle-state cleanup timers — confirm.
            harness.setProcessingTime(2999L);
            harness.setProcessingTime(3000L);
            harness.processWatermark(20000L);
            // Rowtime 20000 is not ahead of watermark 20000; no output row is expected for it.
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20000L, "ccc", 2L})));
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20001L, "ccc", 1L})));
            // NOTE(review): processing time steps back from 3000 to 2500 here; kept as in the
            // original sequence — confirm this is intentional.
            harness.setProcessingTime(2500L);
            harness.processElement(new StreamRecord<>(StreamRecordUtils.binaryrow(new Object[] {20002L, "ccc", 2L})));
            harness.setProcessingTime(5000L);
            harness.processWatermark(20010L);
            harness.setProcessingTime(6999L);
            harness.setProcessingTime(7000L);

            Collection<Object> actual = dropWatermarks(harness.getOutput().toArray());

            // Expected rows are (rowtime, b, c, min(c), max(c)).
            Collection<Object> expected = new ConcurrentLinkedQueue<>();
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {801L, "aaa", 1L, 1L, 1L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {2501L, "bbb", 10L, 10L, 10L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "aaa", 2L, 1L, 2L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "aaa", 3L, 1L, 3L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4001L, "bbb", 20L, 10L, 20L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {4801L, "aaa", 4L, 1L, 4L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "aaa", 5L, 1L, 5L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "aaa", 6L, 1L, 6L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {6501L, "bbb", 30L, 10L, 30L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {7001L, "aaa", 7L, 1L, 7L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {8001L, "aaa", 8L, 1L, 8L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "aaa", 9L, 1L, 9L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "aaa", 10L, 1L, 10L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {12001L, "bbb", 40L, 10L, 40L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {20001L, "ccc", 1L, 1L, 1L})));
            expected.add(new StreamRecord<>(StreamRecordUtils.row(new Object[] {20002L, "ccc", 2L, 1L, 2L})));

            assertor.assertOutputEqualsSorted("result mismatch", expected, actual);
        }
    }

    /**
     * Creates the test instance for one state backend mode.
     *
     * <p>The class is run with the JUnit {@code Parameterized} runner, so the mode is presumably
     * supplied by that runner's parameter source (not visible here — verify in the base class).
     *
     * @param stateBackendMode the state backend mode, passed unchanged to the superclass constructor
     */
    public OverAggregateHarnessTest(StreamingWithStateTestBase.StateBackendMode stateBackendMode) {
        super(stateBackendMode);
    }
}
