package org.apache.phoenix.end2end;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServicesTestImpl;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({NeedsOwnMiniClusterTest.class})
/* loaded from: input_file:org/apache/phoenix/end2end/CDCStreamIT.class */
public class CDCStreamIT extends CDCBaseIT {
    private static RegionCoprocessorEnvironment TaskRegionEnvironment;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/phoenix/end2end/CDCStreamIT$PartitionMetadata.class */
    public class PartitionMetadata {
        public String partitionId;
        public String parentPartitionId;
        public Long startTime;
        public Long endTime;
        public byte[] startKey;
        public byte[] endKey;

        public PartitionMetadata(ResultSet resultSet) throws SQLException {
            this.partitionId = resultSet.getString(3);
            this.parentPartitionId = resultSet.getString(4);
            this.startTime = Long.valueOf(resultSet.getLong(5));
            this.endTime = Long.valueOf(resultSet.getLong(6));
            this.startKey = resultSet.getBytes(7);
            this.endKey = resultSet.getBytes(8);
        }
    }

    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(1);
        newHashMapWithExpectedSize.put("phoenix.max.lookback.age.seconds", Integer.toString(IndexToolForNonTxGlobalIndexIT.MAX_LOOKBACK_AGE));
        newHashMapWithExpectedSize.put("phoenix.use.stats.parallelization", Boolean.toString(false));
        newHashMapWithExpectedSize.put("phoenix.task.handling.interval.ms", Long.toString(QueryServicesTestImpl.DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY));
        newHashMapWithExpectedSize.put("phoenix.task.handling.initial.delay.ms", Long.toString(QueryServicesTestImpl.DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY));
        newHashMapWithExpectedSize.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName());
        setUpTestDriver(new ReadOnlyProps(newHashMapWithExpectedSize.entrySet().iterator()));
        TaskRegionEnvironment = ((HRegion) getUtility().getRSForFirstRegionInTable(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME).getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME).get(0)).getCoprocessorHost().findCoprocessorEnvironment(TaskRegionObserver.class.getName());
    }

    @Test
    public void testStreamPartitionMetadataBootstrap() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        String generateUniqueName2 = generateUniqueName();
        newConnection.createStatement().execute("CREATE TABLE  " + generateUniqueName + " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)");
        createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + generateUniqueName, null);
        String streamName = getStreamName(newConnection, generateUniqueName, generateUniqueName2);
        assertStreamStatus(newConnection, generateUniqueName, streamName, CDCUtil.CdcStreamStatus.ENABLING);
        new TaskRegionObserver.SelfHealingTask(TaskRegionEnvironment, 1800000L).run();
        assertPartitionMetadata(newConnection, generateUniqueName, generateUniqueName2);
        assertStreamStatus(newConnection, generateUniqueName, streamName, CDCUtil.CdcStreamStatus.ENABLED);
    }

    @Test
    public void testOnlyOneStreamAllowed() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        String generateUniqueName2 = generateUniqueName();
        newConnection.createStatement().execute("CREATE TABLE  " + generateUniqueName + " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)");
        createCDC(newConnection, "CREATE CDC " + generateUniqueName2 + " ON " + generateUniqueName, null);
        String streamName = getStreamName(newConnection, generateUniqueName, generateUniqueName2);
        String str = "CREATE CDC " + generateUniqueName() + " ON " + generateUniqueName;
        try {
            createCDC(newConnection, str, null);
            Assert.fail("Only one CDC entity is allowed per table");
        } catch (SQLException e) {
            Assert.assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
            Assert.assertTrue(e.getMessage().contains(streamName));
        }
        new TaskRegionObserver.SelfHealingTask(TaskRegionEnvironment, 1800000L).run();
        try {
            createCDC(newConnection, str, null);
            Assert.fail("Only one CDC entity is allowed per table");
        } catch (SQLException e2) {
            Assert.assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e2.getErrorCode());
            Assert.assertTrue(e2.getMessage().contains(streamName));
        }
        dropCDC(newConnection, generateUniqueName2, generateUniqueName);
        assertStreamStatus(newConnection, generateUniqueName, streamName, CDCUtil.CdcStreamStatus.DISABLED);
        String generateUniqueName3 = generateUniqueName();
        createCDC(newConnection, "CREATE CDC " + generateUniqueName3 + " ON " + generateUniqueName, null);
        assertStreamStatus(newConnection, generateUniqueName, getStreamName(newConnection, generateUniqueName, generateUniqueName3), CDCUtil.CdcStreamStatus.ENABLING);
    }

    @Test
    public void testPartitionMetadataTableWithSingleRegionSplits() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        createTableAndEnableCDC(newConnection, generateUniqueName);
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("m"));
        ResultSet executeQuery = newConnection.createStatement().executeQuery("SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + generateUniqueName + "'");
        PartitionMetadata partitionMetadata = null;
        ArrayList arrayList = new ArrayList();
        while (executeQuery.next()) {
            PartitionMetadata partitionMetadata2 = new PartitionMetadata(executeQuery);
            if (partitionMetadata2.endTime.longValue() > 0) {
                partitionMetadata = partitionMetadata2;
            } else {
                arrayList.add(partitionMetadata2);
            }
        }
        Assert.assertNotNull(partitionMetadata);
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).startTime, partitionMetadata.endTime);
        Assert.assertEquals(((PartitionMetadata) arrayList.get(1)).startTime, partitionMetadata.endTime);
        Assert.assertEquals(partitionMetadata.partitionId, ((PartitionMetadata) arrayList.get(0)).parentPartitionId);
        Assert.assertEquals(partitionMetadata.partitionId, ((PartitionMetadata) arrayList.get(1)).parentPartitionId);
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata3 -> {
            return partitionMetadata3.startKey == null && partitionMetadata3.endKey != null && partitionMetadata3.endKey[0] == 109;
        }));
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata4 -> {
            return partitionMetadata4.endKey == null && partitionMetadata4.startKey != null && partitionMetadata4.startKey[0] == 109;
        }));
    }

    @Test
    public void testPartitionMetadataFirstRegionSplits() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        createTableAndEnableCDC(newConnection, generateUniqueName);
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("l"));
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes(TestUtil.D_VALUE));
        ResultSet executeQuery = newConnection.createStatement().executeQuery("SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + generateUniqueName + "'");
        PartitionMetadata partitionMetadata = null;
        PartitionMetadata partitionMetadata2 = null;
        PartitionMetadata partitionMetadata3 = null;
        ArrayList arrayList = new ArrayList();
        while (executeQuery.next()) {
            PartitionMetadata partitionMetadata4 = new PartitionMetadata(executeQuery);
            if (partitionMetadata4.endTime.longValue() > 0) {
                if (partitionMetadata4.startKey == null && partitionMetadata4.endKey == null) {
                    partitionMetadata = partitionMetadata4;
                } else {
                    partitionMetadata2 = partitionMetadata4;
                }
            } else if (partitionMetadata4.endKey == null) {
                partitionMetadata3 = partitionMetadata4;
            } else {
                arrayList.add(partitionMetadata4);
            }
        }
        Assert.assertNotNull(partitionMetadata);
        Assert.assertNotNull(partitionMetadata3);
        Assert.assertNotNull(partitionMetadata2);
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).startTime, partitionMetadata2.endTime);
        Assert.assertEquals(((PartitionMetadata) arrayList.get(1)).startTime, partitionMetadata2.endTime);
        Assert.assertEquals(partitionMetadata2.partitionId, ((PartitionMetadata) arrayList.get(0)).parentPartitionId);
        Assert.assertEquals(partitionMetadata2.partitionId, ((PartitionMetadata) arrayList.get(1)).parentPartitionId);
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata5 -> {
            return partitionMetadata5.startKey == null && partitionMetadata5.endKey != null && partitionMetadata5.endKey[0] == 100;
        }));
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata6 -> {
            return partitionMetadata6.startKey != null && partitionMetadata6.startKey[0] == 100 && partitionMetadata6.endKey[0] == 108;
        }));
    }

    @Test
    public void testPartitionMetadataLastRegionSplits() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        createTableAndEnableCDC(newConnection, generateUniqueName);
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("l"));
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("q"));
        ResultSet executeQuery = newConnection.createStatement().executeQuery("SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + generateUniqueName + "'");
        PartitionMetadata partitionMetadata = null;
        PartitionMetadata partitionMetadata2 = null;
        PartitionMetadata partitionMetadata3 = null;
        ArrayList arrayList = new ArrayList();
        while (executeQuery.next()) {
            PartitionMetadata partitionMetadata4 = new PartitionMetadata(executeQuery);
            if (partitionMetadata4.endTime.longValue() > 0) {
                if (partitionMetadata4.startKey == null && partitionMetadata4.endKey == null) {
                    partitionMetadata = partitionMetadata4;
                } else {
                    partitionMetadata2 = partitionMetadata4;
                }
            } else if (partitionMetadata4.startKey == null) {
                partitionMetadata3 = partitionMetadata4;
            } else {
                arrayList.add(partitionMetadata4);
            }
        }
        Assert.assertNotNull(partitionMetadata);
        Assert.assertNotNull(partitionMetadata3);
        Assert.assertNotNull(partitionMetadata2);
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).startTime, partitionMetadata2.endTime);
        Assert.assertEquals(((PartitionMetadata) arrayList.get(1)).startTime, partitionMetadata2.endTime);
        Assert.assertEquals(partitionMetadata2.partitionId, ((PartitionMetadata) arrayList.get(0)).parentPartitionId);
        Assert.assertEquals(partitionMetadata2.partitionId, ((PartitionMetadata) arrayList.get(1)).parentPartitionId);
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata5 -> {
            return partitionMetadata5.startKey[0] == 108 && partitionMetadata5.endKey[0] == 113;
        }));
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata6 -> {
            return partitionMetadata6.endKey == null && partitionMetadata6.startKey != null && partitionMetadata6.startKey[0] == 113;
        }));
    }

    @Test
    public void testPartitionMetadataMiddleRegionSplits() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        createTableAndEnableCDC(newConnection, generateUniqueName);
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes(TestUtil.D_VALUE));
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("q"));
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("j"));
        ResultSet executeQuery = newConnection.createStatement().executeQuery("SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + generateUniqueName + "'");
        PartitionMetadata partitionMetadata = null;
        ArrayList arrayList = new ArrayList();
        while (executeQuery.next()) {
            PartitionMetadata partitionMetadata2 = new PartitionMetadata(executeQuery);
            if (partitionMetadata2.startKey != null && partitionMetadata2.endKey != null) {
                if (partitionMetadata2.endTime.longValue() > 0) {
                    partitionMetadata = partitionMetadata2;
                } else {
                    arrayList.add(partitionMetadata2);
                }
            }
        }
        Assert.assertNotNull(partitionMetadata);
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).startTime, partitionMetadata.endTime);
        Assert.assertEquals(((PartitionMetadata) arrayList.get(1)).startTime, partitionMetadata.endTime);
        Assert.assertEquals(partitionMetadata.partitionId, ((PartitionMetadata) arrayList.get(0)).parentPartitionId);
        Assert.assertEquals(partitionMetadata.partitionId, ((PartitionMetadata) arrayList.get(1)).parentPartitionId);
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata3 -> {
            return partitionMetadata3.startKey[0] == 100 && partitionMetadata3.endKey[0] == 106;
        }));
        Assert.assertTrue(arrayList.stream().anyMatch(partitionMetadata4 -> {
            return partitionMetadata4.startKey[0] == 106 && partitionMetadata4.endKey[0] == 113;
        }));
    }

    @Test
    public void testPartitionMetadataMergedRegionSplits() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        createTableAndEnableCDC(newConnection, generateUniqueName);
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes(TestUtil.D_VALUE));
        TestUtil.mergeTableRegions(newConnection, generateUniqueName, (List) TestUtil.getAllTableRegions(newConnection, generateUniqueName).stream().map((v0) -> {
            return v0.getRegion();
        }).map((v0) -> {
            return v0.getEncodedName();
        }).collect(Collectors.toList()));
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("l"));
        ResultSet executeQuery = newConnection.createStatement().executeQuery("SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + generateUniqueName + "'");
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        while (executeQuery.next()) {
            PartitionMetadata partitionMetadata = new PartitionMetadata(executeQuery);
            if (partitionMetadata.startKey == null && partitionMetadata.endKey == null && partitionMetadata.parentPartitionId != null) {
                arrayList.add(partitionMetadata);
            }
            if (partitionMetadata.endTime.longValue() == 0) {
                arrayList2.add(partitionMetadata);
            }
        }
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(2L, arrayList2.size());
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).partitionId, ((PartitionMetadata) arrayList.get(1)).partitionId);
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).partitionId, ((PartitionMetadata) arrayList2.get(0)).parentPartitionId);
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).partitionId, ((PartitionMetadata) arrayList2.get(1)).parentPartitionId);
        Assert.assertEquals(((PartitionMetadata) arrayList2.get(0)).startTime, ((PartitionMetadata) arrayList2.get(1)).startTime);
        Assert.assertEquals(((PartitionMetadata) arrayList2.get(0)).startTime, ((PartitionMetadata) arrayList.get(0)).endTime);
        Assert.assertEquals(((PartitionMetadata) arrayList2.get(0)).startTime, ((PartitionMetadata) arrayList.get(1)).endTime);
    }

    @Test
    public void testPartitionMetadataSplitRegionsMerge() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        createTableAndEnableCDC(newConnection, generateUniqueName);
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("l"));
        TestUtil.mergeTableRegions(newConnection, generateUniqueName, (List) TestUtil.getAllTableRegions(newConnection, generateUniqueName).stream().map((v0) -> {
            return v0.getRegion();
        }).map((v0) -> {
            return v0.getEncodedName();
        }).collect(Collectors.toList()));
        ResultSet executeQuery = newConnection.createStatement().executeQuery("SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + generateUniqueName + "'");
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        while (executeQuery.next()) {
            PartitionMetadata partitionMetadata = new PartitionMetadata(executeQuery);
            if (partitionMetadata.startKey == null && partitionMetadata.endKey == null && partitionMetadata.endTime.longValue() == 0) {
                arrayList2.add(partitionMetadata);
            }
            if (partitionMetadata.startKey != null || partitionMetadata.endKey != null) {
                arrayList.add(partitionMetadata);
            }
        }
        Assert.assertEquals(2L, arrayList2.size());
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(((PartitionMetadata) arrayList2.get(0)).startTime, ((PartitionMetadata) arrayList2.get(1)).startTime);
        Assert.assertEquals(((PartitionMetadata) arrayList2.get(0)).endTime, ((PartitionMetadata) arrayList2.get(1)).endTime);
        Assert.assertEquals(((PartitionMetadata) arrayList2.get(0)).partitionId, ((PartitionMetadata) arrayList2.get(1)).partitionId);
        Assert.assertTrue(arrayList2.stream().anyMatch(partitionMetadata2 -> {
            return Objects.equals(partitionMetadata2.parentPartitionId, ((PartitionMetadata) arrayList.get(0)).partitionId);
        }));
        Assert.assertTrue(arrayList2.stream().anyMatch(partitionMetadata3 -> {
            return Objects.equals(partitionMetadata3.parentPartitionId, ((PartitionMetadata) arrayList.get(1)).partitionId);
        }));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(((PartitionMetadata) arrayList2.get(0)).startTime, ((PartitionMetadata) it.next()).endTime);
        }
    }

    @Test
    public void testPartitionMetadataMergedRegionsMerge() throws Exception {
        Connection newConnection = newConnection();
        String generateUniqueName = generateUniqueName();
        createTableAndEnableCDC(newConnection, generateUniqueName);
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("l"));
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes(TestUtil.D_VALUE));
        TestUtil.splitTable(newConnection, generateUniqueName, Bytes.toBytes("q"));
        List<HRegionLocation> allTableRegions = TestUtil.getAllTableRegions(newConnection, generateUniqueName);
        TestUtil.mergeTableRegions(newConnection, generateUniqueName, (List) allTableRegions.subList(0, 2).stream().map((v0) -> {
            return v0.getRegion();
        }).map((v0) -> {
            return v0.getEncodedName();
        }).collect(Collectors.toList()));
        TestUtil.mergeTableRegions(newConnection, generateUniqueName, (List) allTableRegions.subList(2, 4).stream().map((v0) -> {
            return v0.getRegion();
        }).map((v0) -> {
            return v0.getEncodedName();
        }).collect(Collectors.toList()));
        TestUtil.mergeTableRegions(newConnection, generateUniqueName, (List) TestUtil.getAllTableRegions(newConnection, generateUniqueName).stream().map((v0) -> {
            return v0.getRegion();
        }).map((v0) -> {
            return v0.getEncodedName();
        }).collect(Collectors.toList()));
        ResultSet executeQuery = newConnection.createStatement().executeQuery("SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + generateUniqueName + "'");
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        while (executeQuery.next()) {
            PartitionMetadata partitionMetadata = new PartitionMetadata(executeQuery);
            if (partitionMetadata.endTime.longValue() == 0) {
                arrayList.add(partitionMetadata);
            } else if (partitionMetadata.startKey == null || partitionMetadata.endKey == null) {
                arrayList2.add(partitionMetadata);
            }
        }
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(9L, arrayList2.size());
        Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).startTime, ((PartitionMetadata) arrayList.get(1)).startTime);
        Collections.sort(arrayList2, Comparator.comparing(partitionMetadata2 -> {
            return partitionMetadata2.endTime;
        }));
        Iterator it = arrayList2.subList(arrayList2.size() - 4, arrayList2.size()).iterator();
        while (it.hasNext()) {
            Assert.assertEquals(((PartitionMetadata) arrayList.get(0)).startTime, ((PartitionMetadata) it.next()).endTime);
        }
    }

    private String getStreamName(Connection connection, String str, String str2) throws SQLException {
        return String.format(CDCUtil.CDC_STREAM_NAME_FORMAT, str, str2, Long.valueOf(CDCUtil.getCDCCreationTimestamp(((PhoenixConnection) connection.unwrap(PhoenixConnection.class)).getTableNoCache(str))));
    }

    private void assertStreamStatus(Connection connection, String str, String str2, CDCUtil.CdcStreamStatus cdcStreamStatus) throws SQLException {
        ResultSet executeQuery = connection.createStatement().executeQuery("SELECT STREAM_STATUS FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" + str + "' AND STREAM_NAME='" + str2 + "'");
        Assert.assertTrue(executeQuery.next());
        Assert.assertEquals(cdcStreamStatus.getSerializedValue(), executeQuery.getString(1));
    }

    private void assertPartitionMetadata(Connection connection, String str, String str2) throws SQLException {
        String format = String.format(CDCUtil.CDC_STREAM_NAME_FORMAT, str, str2, Long.valueOf(CDCUtil.getCDCCreationTimestamp(((PhoenixConnection) connection.unwrap(PhoenixConnection.class)).getTableNoCache(str))));
        Iterator it = ((PhoenixConnection) connection.unwrap(PhoenixConnection.class)).getQueryServices().getAllTableRegions(str.getBytes()).iterator();
        while (it.hasNext()) {
            HRegionInfo regionInfo = ((HRegionLocation) it.next()).getRegionInfo();
            PreparedStatement prepareStatement = connection.prepareStatement("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID= ?");
            prepareStatement.setString(1, str);
            prepareStatement.setString(2, format);
            prepareStatement.setString(3, regionInfo.getEncodedName());
            Assert.assertTrue(prepareStatement.executeQuery().next());
        }
    }

    private void createTableAndEnableCDC(Connection connection, String str) throws Exception {
        String generateUniqueName = generateUniqueName();
        connection.createStatement().execute("CREATE TABLE  " + str + " ( k VARCHAR PRIMARY KEY, v1 INTEGER, v2 VARCHAR)");
        createCDC(connection, "CREATE CDC " + generateUniqueName + " ON " + str, null);
        String streamName = getStreamName(connection, str, generateUniqueName);
        new TaskRegionObserver.SelfHealingTask(TaskRegionEnvironment, 1800000L).run();
        assertStreamStatus(connection, str, streamName, CDCUtil.CdcStreamStatus.ENABLED);
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('a', 1, 'foo')");
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('b', 2, 'bar')");
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('e', 3, 'alice')");
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('j', 4, 'bob')");
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('m', 5, 'cat')");
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('p', 6, 'cat')");
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('t', 7, 'cat')");
        connection.createStatement().execute("UPSERT INTO " + str + " VALUES ('z', 8, 'cat')");
    }
}
