package org.apache.phoenix.end2end;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.end2end.CDCBaseIT;
import org.apache.phoenix.filter.DistinctPrefixFilter;
import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Category({NeedsOwnMiniClusterTest.class})
/* loaded from: input_file:org/apache/phoenix/end2end/CDCQueryIT.class */
public class CDCQueryIT extends CDCBaseIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(CDCQueryIT.class);
    private static final int MAX_LOOKBACK_AGE = 10;
    private final boolean forView;
    private final PTable.QualifierEncodingScheme encodingScheme;
    private final boolean multitenant;
    private final Integer tableSaltBuckets;
    private final boolean withSchemaName;

    public CDCQueryIT(Boolean bool, PTable.QualifierEncodingScheme qualifierEncodingScheme, boolean z, Integer num, boolean z2) {
        this.forView = bool.booleanValue();
        this.encodingScheme = qualifierEncodingScheme;
        this.multitenant = z;
        this.tableSaltBuckets = num;
        this.withSchemaName = z2;
    }

    @Parameterized.Parameters(name = "forView={0}, encodingScheme={1}, multitenant={2}, tableSaltBuckets={3}, withSchemaName={4}")
    public static synchronized Collection<Object[]> data() {
        return Arrays.asList(new Object[]{Boolean.FALSE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE}, new Object[]{Boolean.FALSE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.TRUE}, new Object[]{Boolean.FALSE, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, Boolean.FALSE}, new Object[]{Boolean.FALSE, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, Boolean.TRUE, 2, Boolean.TRUE}, new Object[]{Boolean.FALSE, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE}, new Object[]{Boolean.TRUE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE});
    }

    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(1);
        newHashMapWithExpectedSize.put("phoenix.max.lookback.age.seconds", Integer.toString(10));
        setUpTestDriver(new ReadOnlyProps(newHashMapWithExpectedSize.entrySet().iterator()));
    }

    @Before
    public void beforeTest() {
        EnvironmentEdgeManager.reset();
        this.injectEdge = new ManualEnvironmentEdge();
        this.injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
    }

    private void cdcIndexShouldNotBeUsedForDataTableQueries(Connection connection, String str, String str2) throws Exception {
        Assert.assertFalse(QueryUtil.getExplainPlan(connection.createStatement().executeQuery("EXPLAIN SELECT * FROM " + str + " WHERE PHOENIX_ROW_TIMESTAMP() < CURRENT_TIME()")).contains(str2));
    }

    private boolean isDistinctPrefixFilterIncludedInFilterList(FilterList filterList) {
        for (Filter filter : filterList.getFilters()) {
            if (filter instanceof DistinctPrefixFilter) {
                return true;
            }
            if (filter instanceof FilterList) {
                return isDistinctPrefixFilterIncludedInFilterList((FilterList) filter);
            }
        }
        return false;
    }

    private boolean isDistinctPrefixFilterIncluded(Scan scan) {
        Filter filter = scan.getFilter();
        if (filter != null && (filter instanceof DistinctPrefixFilter)) {
            return true;
        }
        if (filter instanceof FilterList) {
            return isDistinctPrefixFilterIncludedInFilterList((FilterList) filter);
        }
        return false;
    }

    private void checkIndexPartitionIdCount(Connection connection, String str, String str2) throws Exception {
        int nonEmptySaltBucketCount = getNonEmptySaltBucketCount(connection, str);
        ResultSet executeQuery = connection.createStatement().executeQuery("SELECT PARTITION_ID() FROM " + str2 + " ORDER BY PARTITION_ID()");
        String[] strArr = new String[nonEmptySaltBucketCount];
        int[] iArr = new int[nonEmptySaltBucketCount];
        int i = 0;
        Assert.assertTrue(executeQuery.next());
        strArr[0] = executeQuery.getString(1);
        iArr[0] = iArr[0] + 1;
        LOGGER.info("PARTITION_ID[0] = " + strArr[0]);
        while (executeQuery.next()) {
            if (!strArr[i].equals(executeQuery.getString(1))) {
                i++;
                strArr[i] = executeQuery.getString(1);
                LOGGER.info("PARTITION_ID[" + i + "] = " + strArr[i]);
            }
            int i2 = i;
            iArr[i2] = iArr[i2] + 1;
        }
        Assert.assertEquals(nonEmptySaltBucketCount, i + 1);
        PhoenixResultSet executeQuery2 = connection.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " + str2);
        Assert.assertTrue(executeQuery2.next());
        int i3 = 0;
        strArr[0] = executeQuery2.getString(1);
        int i4 = 1;
        while (executeQuery2.next()) {
            if (!strArr[i3].equals(executeQuery2.getString(1))) {
                i3++;
                strArr[i3] = executeQuery2.getString(1);
                LOGGER.info("PARTITION_ID[" + i3 + "] = " + strArr[i3]);
            }
            i4++;
        }
        Assert.assertEquals(nonEmptySaltBucketCount, i3 + 1);
        Assert.assertEquals(nonEmptySaltBucketCount, i4);
        Assert.assertTrue(isDistinctPrefixFilterIncluded(executeQuery2.getContext().getScan()));
        PreparedStatement prepareStatement = connection.prepareStatement(getCDCQuery(str2, strArr));
        prepareStatement.setTimestamp(1, new Timestamp(1000L));
        prepareStatement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        PhoenixResultSet executeQuery3 = prepareStatement.executeQuery();
        int i5 = 0;
        while (executeQuery3.next()) {
            i5++;
            String string = executeQuery3.getString(1);
            int i6 = executeQuery3.getInt(2);
            boolean z = false;
            int i7 = 0;
            while (true) {
                if (i7 < nonEmptySaltBucketCount) {
                    if (strArr[i7].equals(string) && i6 == iArr[i7]) {
                        z = true;
                        break;
                    }
                    i7++;
                } else {
                    break;
                }
            }
            Assert.assertTrue(z);
        }
        Assert.assertTrue(executeQuery3.getUnderlyingIterator() instanceof RowKeyOrderedAggregateResultIterator);
        Assert.assertEquals(nonEmptySaltBucketCount, i5);
    }

    private static String getCDCQuery(String str, String[] strArr) {
        StringBuilder sb = new StringBuilder("SELECT PARTITION_ID(), Count(*) from ");
        sb.append(str);
        sb.append(" WHERE PARTITION_ID() IN (");
        for (int i = 0; i < strArr.length - 1; i++) {
            sb.append("'");
            sb.append(strArr[i]);
            sb.append("',");
        }
        sb.append("'");
        sb.append(strArr[strArr.length - 1]);
        sb.append("')");
        sb.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?");
        sb.append(" Group By PARTITION_ID()");
        return sb.toString();
    }

    private static String addPartitionInList(Connection connection, String str, String str2) throws SQLException {
        ResultSet executeQuery = connection.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " + str);
        ArrayList<String> arrayList = new ArrayList();
        while (executeQuery.next()) {
            arrayList.add(executeQuery.getString(1));
        }
        StringBuilder sb = new StringBuilder(str2);
        sb.append(" WHERE PARTITION_ID() IN (");
        boolean z = false;
        for (String str3 : arrayList) {
            if (z) {
                sb.append(",'");
            } else {
                sb.append("'");
                z = true;
            }
            sb.append(str3);
            sb.append("'");
        }
        sb.append(")");
        return sb.toString();
    }

    private static PreparedStatement getCDCQueryPreparedStatement(Connection connection, String str, String str2, long j, long j2) throws SQLException {
        PreparedStatement prepareStatement = connection.prepareStatement(addPartitionInList(connection, str, str2) + " AND PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?");
        prepareStatement.setTimestamp(1, new Timestamp(j));
        prepareStatement.setTimestamp(2, new Timestamp(j2));
        return prepareStatement;
    }

    @Test
    public void testSelectCDC() throws Exception {
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
        Connection newConnection = newConnection();
        try {
            createTable(newConnection, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
                createTable(newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = tableName2;
            }
            String generateUniqueName2 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + tableName, this.encodingScheme);
            if (newConnection != null) {
                newConnection.close();
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            long currentTimeMillis = System.currentTimeMillis();
            List<CDCBaseIT.ChangeRow> generateChanges = generateChanges(currentTimeMillis, strArr, tableName, null, this.COMMIT_SUCCESS);
            long currentTimeMillis2 = System.currentTimeMillis();
            long timestamp = generateChanges.get(generateChanges.size() - 1).getTimestamp() + 1;
            if (timestamp > currentTimeMillis2) {
                Thread.sleep(timestamp - currentTimeMillis2);
            }
            final String tableName3 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName2);
            Connection newConnection2 = newConnection(str);
            try {
                dumpCDCResults(newConnection2, generateUniqueName2, new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.1
                    {
                        put("K", "INTEGER");
                    }
                }, addPartitionInList(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K,\"CDC JSON\" FROM " + tableName3));
                ResultSet executeQuery = newConnection2.createStatement().executeQuery("SELECT  /*+ INDEX(" + tableName + " " + CDCUtil.getCDCIndexName(generateUniqueName2) + ") */ k, v1 FROM " + tableName);
                try {
                    Assert.assertTrue(executeQuery.next());
                    Assert.assertEquals(2L, executeQuery.getInt(1));
                    Assert.assertEquals(201L, executeQuery.getInt(2));
                    Assert.assertTrue(executeQuery.next());
                    Assert.assertEquals(3L, executeQuery.getInt(1));
                    Assert.assertEquals(300L, executeQuery.getInt(2));
                    Assert.assertFalse(executeQuery.next());
                    if (executeQuery != null) {
                        executeQuery.close();
                    }
                    ResultSet executeQuery2 = newConnection2.createStatement().executeQuery("SELECT   k, v1 FROM " + tableName);
                    try {
                        Assert.assertTrue(executeQuery2.next());
                        Assert.assertEquals(2L, executeQuery2.getInt(1));
                        Assert.assertEquals(201L, executeQuery2.getInt(2));
                        Assert.assertTrue(executeQuery2.next());
                        Assert.assertEquals(3L, executeQuery2.getInt(1));
                        Assert.assertEquals(300L, executeQuery2.getInt(2));
                        Assert.assertFalse(executeQuery2.next());
                        if (executeQuery2 != null) {
                            executeQuery2.close();
                        }
                        TreeMap<String, String> treeMap = new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.2
                            {
                                put("V1", "INTEGER");
                                put("V2", "INTEGER");
                                put("B.VB", "INTEGER");
                            }
                        };
                        verifyChangesViaSCN(str, getCDCQueryPreparedStatement(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + tableName3, currentTimeMillis, timestamp).executeQuery(), tableName, treeMap, generateChanges, CHANGE_IMG);
                        verifyChangesViaSCN(str, getCDCQueryPreparedStatement(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K,\"CDC JSON\" FROM " + tableName3, currentTimeMillis, timestamp).executeQuery(), tableName, treeMap, generateChanges, CHANGE_IMG);
                        verifyChangesViaSCN(str, getCDCQueryPreparedStatement(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + tableName3, currentTimeMillis, timestamp).executeQuery(), tableName, treeMap, generateChanges, PRE_POST_IMG);
                        verifyChangesViaSCN(str, getCDCQueryPreparedStatement(newConnection2, tableName3, "SELECT * FROM " + tableName3, currentTimeMillis, timestamp).executeQuery(), tableName, treeMap, generateChanges, new HashSet());
                        HashMap<String, int[]> hashMap = new HashMap<String, int[]>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.3
                            {
                                put("SELECT 'dummy', k, \"CDC JSON\" FROM " + tableName3 + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC, K ASC", new int[]{1, 2, 3, 1, 1, 1, 1, 2, 1, 1, 1, 1});
                                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + tableName3 + " ORDER BY k ASC", new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3});
                                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + tableName3 + " ORDER BY k DESC", new int[]{3, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1});
                            }
                        };
                        HashMap hashMap2 = new HashMap() { // from class: org.apache.phoenix.end2end.CDCQueryIT.4
                            {
                                put("event_type", "dummy");
                            }
                        };
                        for (Map.Entry<String, int[]> entry : hashMap.entrySet()) {
                            executeQuery = newConnection2.createStatement().executeQuery(entry.getKey());
                            for (int i = 0; i < entry.getValue().length; i++) {
                                try {
                                    int i2 = entry.getValue()[i];
                                    Assert.assertEquals(true, Boolean.valueOf(executeQuery.next()));
                                    Assert.assertEquals("Index: " + i + " for query: " + entry.getKey(), i2, executeQuery.getInt(2));
                                    Map map = (Map) mapper.reader(HashMap.class).readValue(executeQuery.getString(3));
                                    map.put("event_type", "dummy");
                                    Assert.assertEquals(hashMap2, map);
                                } finally {
                                }
                            }
                            Assert.assertEquals(false, Boolean.valueOf(executeQuery.next()));
                            if (executeQuery != null) {
                                executeQuery.close();
                            }
                        }
                        cdcIndexShouldNotBeUsedForDataTableQueries(newConnection2, tableName, generateUniqueName2);
                        if (newConnection2 != null) {
                            newConnection2.close();
                        }
                    } finally {
                    }
                } finally {
                    if (executeQuery != null) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th) {
                            th.addSuppressed(th);
                        }
                    }
                }
            } catch (Throwable th2) {
                if (newConnection2 != null) {
                    try {
                        newConnection2.close();
                    } catch (Throwable th3) {
                        th2.addSuppressed(th3);
                    }
                }
                throw th2;
            }
        } catch (Throwable th4) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th5) {
                    th4.addSuppressed(th5);
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSelectGeneric() throws Exception {
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
        TreeMap<String, String> treeMap = new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.5
            {
                put("K1", "INTEGER");
                put("K2", "INTEGER");
            }
        };
        TreeMap<String, String> treeMap2 = new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.6
            {
                put("V1", "INTEGER");
                put("V2", "VARCHAR");
                put("V3", "CHAR");
                put("V4", "DOUBLE");
                put("V5", "DATE");
                put("V6", "TIME");
                put("V7", "TIMESTAMP");
                put("V8", "VARBINARY");
                put("V9", "BINARY");
                put("V10", "VARCHAR ARRAY");
                put("V11", "JSON");
            }
        };
        Connection newConnection = newConnection();
        try {
            createTable(newConnection, tableName, treeMap, treeMap2, this.multitenant, this.encodingScheme, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
                createTable(newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = tableName2;
            }
            String generateUniqueName2 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + tableName + " INCLUDE (change)", this.encodingScheme);
            if (newConnection != null) {
                newConnection.close();
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            long currentTimeMillis = System.currentTimeMillis();
            HashMap hashMap = new HashMap(strArr.length);
            for (String str2 : strArr) {
                hashMap.put(str2, generateMutations(str, currentTimeMillis, treeMap, treeMap2, 20, 5));
                applyMutations(this.COMMIT_SUCCESS, generateUniqueName, tableName, tableName, str2, (List) hashMap.get(str2), generateUniqueName2);
            }
            String tableName3 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName2);
            Connection newConnection2 = newConnection(str);
            try {
                dumpCDCResults(newConnection2, generateUniqueName2, treeMap, addPartitionInList(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + tableName3));
                ArrayList arrayList = new ArrayList();
                Iterator it = ((List) hashMap.get(str)).iterator();
                while (it.hasNext()) {
                    arrayList.addAll((Set) it.next());
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                long timestamp = arrayList.get(arrayList.size() - 1).getTimestamp() + 1;
                if (timestamp > currentTimeMillis2) {
                    Thread.sleep(timestamp - currentTimeMillis2);
                }
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery(addPartitionInList(newConnection2, tableName3, "SELECT * FROM " + tableName3)), tableName, treeMap2, arrayList, CHANGE_IMG);
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery(addPartitionInList(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + tableName3)), tableName, treeMap2, arrayList, CHANGE_IMG);
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery(addPartitionInList(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + tableName3)), tableName, treeMap2, arrayList, PRE_POST_IMG);
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery(addPartitionInList(newConnection2, tableName3, "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + tableName3)), tableName, treeMap2, arrayList, ALL_IMG);
                cdcIndexShouldNotBeUsedForDataTableQueries(newConnection2, tableName, generateUniqueName2);
                checkIndexPartitionIdCount(newConnection2, tableName, tableName3);
                if (newConnection2 != null) {
                    newConnection2.close();
                }
            } catch (Throwable th) {
                if (newConnection2 != null) {
                    try {
                        newConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    private int getNonEmptySaltBucketCount(Connection connection, String str) throws SQLException {
        if (this.tableSaltBuckets == null) {
            return 1;
        }
        HashSet newHashSet = Sets.newHashSet();
        ResultSet executeQuery = connection.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ROWKEY_BYTES_STRING() FROM " + str);
        while (executeQuery.next()) {
            try {
                newHashSet.add(executeQuery.getString(1).substring(0, 4));
            } catch (Throwable th) {
                if (executeQuery != null) {
                    try {
                        executeQuery.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (executeQuery != null) {
            executeQuery.close();
        }
        return newHashSet.size();
    }

    private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme) throws Exception {
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
        Connection newConnection = newConnection();
        try {
            createTable(newConnection, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, true, immutableStorageScheme);
            if (this.forView) {
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
                createTable(newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = tableName2;
            }
            String generateUniqueName2 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + tableName, this.encodingScheme);
            if (newConnection != null) {
                newConnection.close();
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            List<CDCBaseIT.ChangeRow> generateChangesImmutableTable = generateChangesImmutableTable(System.currentTimeMillis(), strArr, generateUniqueName, tableName, tableName, this.COMMIT_SUCCESS, generateUniqueName2);
            String tableName3 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName2);
            TreeMap<String, String> treeMap = new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.7
                {
                    put("V1", "INTEGER");
                    put("V2", "INTEGER");
                }
            };
            Connection newConnection2 = newConnection(str);
            try {
                dumpCDCResults(newConnection2, generateUniqueName2, new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.8
                    {
                        put("K", "INTEGER");
                    }
                }, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K,\"CDC JSON\" FROM " + tableName3);
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + tableName3 + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), tableName, treeMap, generateChangesImmutableTable, PRE_POST_IMG);
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + tableName3 + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), tableName, treeMap, generateChangesImmutableTable, CHANGE_IMG);
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + tableName3 + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), tableName, treeMap, generateChangesImmutableTable, CHANGE_IMG);
                cdcIndexShouldNotBeUsedForDataTableQueries(newConnection2, tableName, generateUniqueName2);
                if (newConnection2 != null) {
                    newConnection2.close();
                }
            } catch (Throwable th) {
                if (newConnection2 != null) {
                    try {
                        newConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSelectCDCImmutableOneCellPerColumn() throws Exception {
        _testSelectCDCImmutable(PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
    }

    @Test
    public void testSelectCDCImmutableSingleCell() throws Exception {
        _testSelectCDCImmutable(PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
    }

    @Test
    public void testSelectWithTimeRange() throws Exception {
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
        TreeMap<String, String> treeMap = new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.9
            {
                put("K1", "INTEGER");
            }
        };
        TreeMap<String, String> treeMap2 = new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.10
            {
                put("V1", "INTEGER");
            }
        };
        Connection newConnection = newConnection();
        try {
            createTable(newConnection, tableName, treeMap, treeMap2, this.multitenant, this.encodingScheme, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
                createTable(newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = tableName2;
            }
            String generateUniqueName2 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + tableName + " INCLUDE (change)", this.encodingScheme);
            cdcIndexShouldNotBeUsedForDataTableQueries(newConnection, tableName, generateUniqueName2);
            if (newConnection != null) {
                newConnection.close();
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            long currentTimeMillis = System.currentTimeMillis();
            HashMap hashMap = new HashMap(strArr.length);
            for (String str2 : strArr) {
                hashMap.put(str2, generateMutations(str, currentTimeMillis, treeMap, treeMap2, 20, 5));
                applyMutations(this.COMMIT_SUCCESS, generateUniqueName, tableName, tableName, str2, (List) hashMap.get(str2), generateUniqueName2);
            }
            String tableName3 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName2);
            Connection newConnection2 = newConnection(str);
            try {
                dumpCDCResults(newConnection2, generateUniqueName2, treeMap, "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + tableName3);
                ArrayList arrayList = new ArrayList();
                Iterator it = ((List) hashMap.get(str)).iterator();
                while (it.hasNext()) {
                    arrayList.addAll((Set) it.next());
                }
                ArrayList arrayList2 = new ArrayList();
                Integer num = null;
                for (CDCBaseIT.ChangeRow changeRow : arrayList) {
                    if (arrayList2.size() == 0 || ((Long) arrayList2.get(arrayList2.size() - 1)).longValue() != changeRow.changeTS) {
                        arrayList2.add(Long.valueOf(changeRow.changeTS));
                    }
                    if (changeRow.change == null) {
                        num = Integer.valueOf(arrayList2.size() - 1);
                    }
                }
                Random random = new Random();
                int nextInt = random.nextInt(num.intValue() - 1);
                int nextInt2 = nextInt + 1 + random.nextInt(arrayList2.size() - (nextInt + 1));
                verifyChangesViaSCN(str, newConnection2, tableName3, treeMap, tableName, treeMap2, arrayList, 0L, System.currentTimeMillis());
                verifyChangesViaSCN(str, newConnection2, tableName3, treeMap, tableName, treeMap2, arrayList, nextInt, nextInt2);
                verifyChangesViaSCN(str, newConnection2, tableName3, treeMap, tableName, treeMap2, arrayList, nextInt, num.intValue());
                verifyChangesViaSCN(str, newConnection2, tableName3, treeMap, tableName, treeMap2, arrayList, num.intValue(), nextInt2);
                if (newConnection2 != null) {
                    newConnection2.close();
                }
            } catch (Throwable th) {
                if (newConnection2 != null) {
                    try {
                        newConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSelectCDCWithDDL() throws Exception {
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
        Connection newConnection = newConnection();
        try {
            createTable(newConnection, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v0 INTEGER, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
                createTable(newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = tableName2;
            }
            String generateUniqueName2 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + tableName, this.encodingScheme);
            newConnection.createStatement().execute("ALTER TABLE " + tableName + " DROP COLUMN v0");
            if (newConnection != null) {
                newConnection.close();
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            List<CDCBaseIT.ChangeRow> generateChanges = generateChanges(System.currentTimeMillis(), strArr, tableName, tableName, this.COMMIT_SUCCESS);
            TreeMap<String, String> treeMap = new TreeMap<String, String>() { // from class: org.apache.phoenix.end2end.CDCQueryIT.11
                {
                    put("V0", "INTEGER");
                    put("V1", "INTEGER");
                    put("V1V2", "INTEGER");
                    put("V2", "INTEGER");
                    put("B.VB", "INTEGER");
                    put("V3", "INTEGER");
                }
            };
            Connection newConnection2 = newConnection(str);
            try {
                verifyChangesViaSCN(str, newConnection2.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + SchemaUtil.getTableName(generateUniqueName, generateUniqueName2) + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), tableName, treeMap, generateChanges, CHANGE_IMG);
                cdcIndexShouldNotBeUsedForDataTableQueries(newConnection2, tableName, generateUniqueName2);
                if (newConnection2 != null) {
                    newConnection2.close();
                }
            } catch (Throwable th) {
                if (newConnection2 != null) {
                    try {
                        newConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testSelectCDCFailDataTableUpdate() throws Exception {
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
        Connection newConnection = newConnection();
        try {
            createTable(newConnection, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName());
                createTable(newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = tableName2;
            }
            String generateUniqueName2 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + tableName, this.encodingScheme);
            cdcIndexShouldNotBeUsedForDataTableQueries(newConnection, tableName, generateUniqueName2);
            if (newConnection != null) {
                newConnection.close();
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            generateChanges(System.currentTimeMillis(), strArr, tableName, null, this.COMMIT_FAILURE_EXPECTED);
            Connection newConnection2 = newConnection(str);
            try {
                Assert.assertEquals(false, Boolean.valueOf(newConnection2.createStatement().executeQuery("SELECT * FROM " + SchemaUtil.getTableName(generateUniqueName, generateUniqueName2)).next()));
                if (newConnection2 != null) {
                    newConnection2.close();
                }
            } catch (Throwable th) {
                if (newConnection2 != null) {
                    try {
                        newConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCDCIndexBuildAndVerification() throws Exception {
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String generateUniqueName2 = generateUniqueName();
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName2);
        PhoenixConnection newConnection = newConnection();
        try {
            createTable(newConnection, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String generateUniqueName3 = generateUniqueName();
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName3);
                createTable((Connection) newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                generateUniqueName2 = generateUniqueName3;
                tableName = tableName2;
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            List<CDCBaseIT.ChangeRow> generateChanges = generateChanges(System.currentTimeMillis(), strArr, tableName, tableName, this.COMMIT_SUCCESS, null, 0);
            long currentTimeMillis = System.currentTimeMillis();
            long timestamp = generateChanges.get(generateChanges.size() - 1).getTimestamp() + 1;
            if (timestamp > currentTimeMillis) {
                Thread.sleep(timestamp - currentTimeMillis);
            }
            String generateUniqueName4 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName4 + " ON " + tableName, this.encodingScheme);
            PTable tableNoCache = newConnection.getTableNoCache(SchemaUtil.getTableName(generateUniqueName, CDCUtil.getCDCIndexName(generateUniqueName4)));
            Assert.assertEquals(tableNoCache.getIndexState(), PIndexState.ACTIVE);
            TestUtil.assertRawRowCount(newConnection, TableName.valueOf(tableNoCache.getPhysicalName().getString()), 0);
            IndexToolIT.runIndexTool(false, generateUniqueName, generateUniqueName2, CDCUtil.getCDCIndexName(generateUniqueName4));
            TestUtil.assertRawRowCount(newConnection, TableName.valueOf(tableNoCache.getPhysicalName().getString()), 0);
            List<CDCBaseIT.ChangeRow> generateChanges2 = generateChanges(System.currentTimeMillis(), strArr, tableName, tableName, this.COMMIT_SUCCESS, null, 1);
            long currentTimeMillis2 = System.currentTimeMillis();
            long timestamp2 = generateChanges2.get(generateChanges2.size() - 1).getTimestamp() + 1;
            if (timestamp2 > currentTimeMillis2) {
                Thread.sleep(timestamp2 - currentTimeMillis2);
            }
            IndexTool runIndexTool = IndexToolIT.runIndexTool(false, generateUniqueName, generateUniqueName2, CDCUtil.getCDCIndexName(generateUniqueName4), null, 0, IndexTool.IndexVerifyType.ONLY, new String[0]);
            Assert.assertEquals(0L, runIndexTool.getJob().getCounters().findCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals(0L, runIndexTool.getJob().getCounters().findCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals(0L, runIndexTool.getJob().getCounters().findCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals(0L, runIndexTool.getJob().getCounters().findCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals(0L, runIndexTool.getJob().getCounters().findCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals(0L, runIndexTool.getJob().getCounters().findCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals(0L, runIndexTool.getJob().getCounters().findCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
            if (newConnection != null) {
                newConnection.close();
            }
        } catch (Throwable th) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCDCIndexTTLEqualsToMaxLookbackAge() throws Exception {
        if (this.forView) {
            return;
        }
        String generateUniqueName = this.withSchemaName ? generateUniqueName() : null;
        String generateUniqueName2 = generateUniqueName();
        String tableName = SchemaUtil.getTableName(generateUniqueName, generateUniqueName2);
        PhoenixConnection newConnection = newConnection();
        try {
            createTable(newConnection, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String generateUniqueName3 = generateUniqueName();
                String tableName2 = SchemaUtil.getTableName(generateUniqueName, generateUniqueName3);
                createTable((Connection) newConnection, "CREATE VIEW " + tableName2 + " AS SELECT * FROM " + tableName, this.encodingScheme);
                generateUniqueName2 = generateUniqueName3;
                tableName = tableName2;
            }
            String str = this.multitenant ? "1000" : null;
            String[] strArr = {str};
            if (this.multitenant) {
                strArr = new String[]{str, "2000"};
            }
            String generateUniqueName4 = generateUniqueName();
            createCDC(newConnection, "CREATE CDC " + generateUniqueName4 + " ON " + tableName, this.encodingScheme);
            List<CDCBaseIT.ChangeRow> generateChanges = generateChanges(System.currentTimeMillis(), strArr, tableName, tableName, this.COMMIT_SUCCESS, null, 0);
            String obj = newConnection.getTableNoCache(SchemaUtil.getTableName(generateUniqueName, CDCUtil.getCDCIndexName(generateUniqueName4))).getPhysicalName().toString();
            int rawRowCount = TestUtil.getRawRowCount(newConnection, TableName.valueOf(obj));
            long currentTimeMillis = System.currentTimeMillis();
            long timestamp = generateChanges.get(generateChanges.size() - 1).getTimestamp() + 10000 + 1;
            if (timestamp > currentTimeMillis) {
                Thread.sleep(timestamp - currentTimeMillis);
            }
            TestUtil.doMajorCompaction(newConnection, obj);
            TestUtil.assertRawRowCount(newConnection, TableName.valueOf(obj), 0);
            IndexToolIT.runIndexTool(false, generateUniqueName, generateUniqueName2, CDCUtil.getCDCIndexName(generateUniqueName4));
            TestUtil.assertRawRowCount(newConnection, TableName.valueOf(obj), 0);
            List<CDCBaseIT.ChangeRow> generateChanges2 = generateChanges(System.currentTimeMillis(), strArr, tableName, tableName, this.COMMIT_SUCCESS, null, 0);
            List<CDCBaseIT.ChangeRow> generateChanges3 = generateChanges(generateChanges2.get(generateChanges2.size() - 1).getTimestamp() + 10000 + 1, strArr, tableName, tableName, this.COMMIT_SUCCESS, null, 10);
            long timestamp2 = generateChanges3.get(generateChanges3.size() - 1).getTimestamp() + 1;
            long currentTimeMillis2 = System.currentTimeMillis();
            if (timestamp2 > currentTimeMillis2) {
                Thread.sleep(timestamp2 - currentTimeMillis2);
            }
            TestUtil.doMajorCompaction(newConnection, obj);
            TestUtil.assertRawRowCount(newConnection, TableName.valueOf(obj), rawRowCount);
            IndexToolIT.runIndexTool(false, generateUniqueName, generateUniqueName2, CDCUtil.getCDCIndexName(generateUniqueName4));
            TestUtil.assertRawRowCount(newConnection, TableName.valueOf(obj), rawRowCount);
            if (newConnection != null) {
                newConnection.close();
            }
        } catch (Throwable th) {
            if (newConnection != null) {
                try {
                    newConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
