package org.apache.phoenix.end2end.index;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.phoenix.coprocessor.ReplicationSinkEndpoint;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({NeedsOwnMiniClusterTest.class})
/* loaded from: input_file:org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.class */
public class ReplicationWithWALAnnotationIT extends BaseTest {
    // Pause, in milliseconds, between successive checks for replicated rows on the sink cluster.
    private static final long REPLICATION_WAIT_TIME_MILLIS = 10000;
    // Connection URLs of the two mini clusters; assigned in setUpBeforeClass() from utility1 / utility2.
    private static String url1;
    private static String url2;
    // Configuration of the second cluster; created in setupConfigsAndStartCluster() as a copy of conf1.
    protected static Configuration conf2;
    // ZooKeeper watchers created for each cluster during setup.
    protected static ZKWatcher zkw1;
    protected static ZKWatcher zkw2;
    // Testing utilities that own the two mini clusters (utility1 is built from conf1, utility2 from conf2).
    protected static HBaseTestingUtility utility1;
    protected static HBaseTestingUtility utility2;
    // Retry budget when waiting for replication; matches the loop bound used by assertReplicatedData().
    protected static final int REPLICATION_RETRIES = 10;
    // Driver properties populated in setUpBeforeClass() and passed to every test driver registration.
    private static Map<String, String> props;
    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationWithWALAnnotationIT.class);
    // Names are generated once per class load; both tests operate on the same data table name.
    private static final String SCHEMA_NAME = generateUniqueName();
    private static final String DATA_TABLE_NAME = generateUniqueName();
    private static final String INDEX_TABLE_NAME = "IDX_" + DATA_TABLE_NAME;
    private static final String TENANT_VIEW_NAME = generateUniqueName();
    private static final String TENANT_VIEW_INDEX_NAME = "IDX_" + TENANT_VIEW_NAME;
    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, DATA_TABLE_NAME);
    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, INDEX_TABLE_NAME);
    // Base configuration of the first cluster; reassigned to utility1's configuration during setup.
    protected static Configuration conf1 = HBaseConfiguration.create();
    // NOTE(review): tableName and row do not appear to be used by the code visible in this file — confirm before removing.
    protected static final byte[] tableName = Bytes.toBytes("test");
    protected static final byte[] row = Bytes.toBytes("row");

    /* loaded from: input_file:org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT$TestCoprocessorForWALAnnotationAtSink.class */
    public static class TestCoprocessorForWALAnnotationAtSink implements RegionCoprocessor, RegionObserver {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> observerContext, MiniBatchOperationInProgress<Mutation> miniBatchOperationInProgress) throws IOException {
            String bytes = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()));
            String bytes2 = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString()));
            String bytes3 = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString()));
            String bytes4 = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()));
            ReplicationWithWALAnnotationIT.LOGGER.info("TestCoprocessorForWALAnnotationAtSink preBatchMutate: tenantId: {}, schemaName: {}, logicalTableName: {}, tableType: {}", new Object[]{bytes, bytes2, bytes3, bytes4});
            if (bytes != null || !ReplicationWithWALAnnotationIT.SCHEMA_NAME.equals(bytes2) || !ReplicationWithWALAnnotationIT.DATA_TABLE_NAME.equals(bytes3) || !PTableType.TABLE.getValue().toString().equals(bytes4)) {
                throw new IOException("Replication Sink mutation attributes are not matching. Abort the mutation.");
            }
        }
    }

    /* loaded from: input_file:org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT$TestTenantCoprocessorForWALAnnotationAtSink.class */
    public static class TestTenantCoprocessorForWALAnnotationAtSink implements RegionCoprocessor, RegionObserver {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> observerContext, MiniBatchOperationInProgress<Mutation> miniBatchOperationInProgress) throws IOException {
            String bytes = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()));
            String bytes2 = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString()));
            String bytes3 = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString()));
            String bytes4 = Bytes.toString(((Mutation) miniBatchOperationInProgress.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()));
            ReplicationWithWALAnnotationIT.LOGGER.info("TestCoprocessorForWALAnnotationAtSink preBatchMutate: tenantId: {}, schemaName: {}, logicalTableName: {}, tableType: {}", new Object[]{bytes, bytes2, bytes3, bytes4});
            if (!"tenant01".equals(bytes) || !"".equals(bytes2) || !ReplicationWithWALAnnotationIT.TENANT_VIEW_NAME.equals(bytes3) || !PTableType.VIEW.getValue().toString().equals(bytes4)) {
                throw new IOException("Replication Sink mutation attributes are not matching. Abort the mutation.");
            }
        }
    }

    /**
     * One-time fixture: starts both mini clusters, prepares the driver properties shared by
     * every test, and records the connection URL of each cluster.
     *
     * @throws Exception if cluster start-up fails
     */
    @BeforeClass
    public static synchronized void setUpBeforeClass() throws Exception {
        setupConfigsAndStartCluster();

        final Map<String, String> driverProps = Maps.newHashMapWithExpectedSize(3);
        driverProps.put("phoenix.index.mutableBatchSizeThreshold", String.valueOf(2));
        driverProps.put("phoenix.schema.dropMetaData", String.valueOf(true));
        props = driverProps;

        // URLs can only be resolved once the clusters are up.
        url1 = getLocalClusterUrl(utility1);
        url2 = getLocalClusterUrl(utility2);
    }

    /**
     * Tears down both mini clusters and resets the server metadata cache.
     * Each step runs even if an earlier one throws, so a failure shutting down the first
     * cluster no longer leaves the second cluster running. Utilities that were never created
     * (setup failed early) are skipped instead of raising a NullPointerException.
     *
     * @throws Exception if shutting down either cluster fails
     */
    @AfterClass
    public static void afterClass() throws Exception {
        try {
            if (utility1 != null) {
                utility1.shutdownMiniCluster();
            }
        } finally {
            try {
                if (utility2 != null) {
                    utility2.shutdownMiniCluster();
                }
            } finally {
                ServerMetadataCacheTestImpl.resetCache();
            }
        }
    }

    /**
     * Configures and starts two mini clusters, then registers the second cluster as
     * replication peer {@code "1"} of the first.
     * <p>
     * The first cluster is configured with the {@link ReplicationSinkEndpoint} region server
     * coprocessor and with WAL metadata annotation enabled; the second cluster's configuration
     * starts as a copy of the first one's.
     *
     * @throws Exception if any cluster component fails to start or the peer cannot be added
     */
    private static void setupConfigsAndStartCluster() throws Exception {
        conf1.setInt("zookeeper.recovery.retry", 1);
        conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
        conf1.setBoolean("dfs.support.append", true);
        conf1.setLong("hbase.server.thread.wakefrequency", 100L);
        conf1.setInt("replication.stats.thread.period.seconds", 5);
        conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        conf1.setStrings("hbase.coprocessor.regionserver.classes", new String[]{ReplicationSinkEndpoint.class.getName()});
        conf1.setBoolean("phoenix.append.metadata.to.wal", true);
        conf1.set("zookeeper.znode.parent", "/1");
        setUpConfigForMiniCluster(conf1);
        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        conf1 = utility1.getConfiguration();
        zkw1 = new ZKWatcher(conf1, "cluster1", (Abortable) null, true);

        // The second cluster's configuration is derived from the first before either HBase cluster starts.
        conf2 = HBaseConfiguration.create(conf1);
        conf2.setInt("hbase.client.retries.number", 6);
        conf2.setBoolean("dfs.support.append", true);
        conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        utility2 = new HBaseTestingUtility(conf2);
        utility2.startMiniZKCluster();
        zkw2 = new ZKWatcher(conf2, "cluster2", (Abortable) null, true);

        utility1.startMiniCluster(2);
        utility2.startMiniCluster(2);

        // Open the admin connection only for the peer registration and close it afterwards.
        // Fully qualified because this file imports java.sql.Connection.
        try (org.apache.hadoop.hbase.client.Connection hbaseConnection = ConnectionFactory.createConnection(conf1);
                Admin admin = hbaseConnection.getAdmin()) {
            admin.addReplicationPeer("1", ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build());
        }
    }

    /**
     * Decides from the running HBase version whether the replication sink endpoint is usable.
     * The version string is read as {@code major.minor.patch[-suffix]}.
     *
     * @return {@code true} for any major above 2, for 2.x with minor above 5, for 2.4.16 and
     *         later 2.4 patches, and for 2.5.3 and later 2.5 patches; {@code false} otherwise
     */
    private boolean isReplicationSinkEndpointEnabled() {
        final String[] versionParts = VersionInfo.getVersion().split("\\.");
        final int major = Integer.parseInt(versionParts[0]);
        final int minor = Integer.parseInt(versionParts[1]);
        // Drop any qualifier such as "-SNAPSHOT" before parsing the patch level.
        final int patch = Integer.parseInt(versionParts[2].split("-")[0]);

        if (major != 2) {
            return major > 2;
        }
        switch (minor) {
            case 4:
                return patch >= 16;
            case 5:
                return patch >= 3;
            default:
                // Remaining 2.x lines: anything newer than 2.5 qualifies, anything older than 2.4 does not.
                return minor > 5;
        }
    }

    /**
     * Verifies replication of a non-tenant table when WAL metadata annotation is enabled.
     * <ol>
     *   <li>Creates the same data table and index on both clusters.</li>
     *   <li>Attaches {@link TestCoprocessorForWALAnnotationAtSink} to the data table on the
     *       second cluster so batches with unexpected attributes are rejected there.</li>
     *   <li>Upserts two rows on the first cluster and checks the index locally.</li>
     *   <li>Waits until the two rows are visible in the data table on the second cluster.</li>
     *   <li>Drops the table on both clusters.</li>
     * </ol>
     * Every JDBC connection is opened in try-with-resources so it is closed even when an
     * assertion fails.
     *
     * @throws Exception on any setup, SQL or replication failure
     */
    @Test
    public void testReplicationWithWALExtendedAttributes() throws Exception {
        Assume.assumeTrue("Replication sink endpoint on hbase versions 2.4.16+ or 2.5.3+", isReplicationSinkEndpointEnabled());

        final String dropTableDdl = String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME);
        final String createTableDdl = String.format("CREATE TABLE %s (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) REPLICATION_SCOPE = 1", DATA_TABLE_FULL_NAME);
        final String createIndexDdl = String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME);

        // Step 1: schema on the source cluster.
        driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection sourceConnection = getConnection(url1)) {
            sourceConnection.createStatement().execute(dropTableDdl);
            sourceConnection.createStatement().execute(createTableDdl);
            sourceConnection.createStatement().execute(createIndexDdl);
            Assert.assertFalse(sourceConnection.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next());
            Assert.assertFalse(sourceConnection.createStatement().executeQuery("SELECT * FROM " + INDEX_TABLE_FULL_NAME).next());
        }
        destroyDriver(driver);

        // Step 2: identical schema on the sink cluster, plus the attribute-checking coprocessor.
        driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection sinkConnection = getConnection(url2)) {
            sinkConnection.createStatement().execute(dropTableDdl);
            sinkConnection.createStatement().execute(createTableDdl);
            sinkConnection.createStatement().execute(createIndexDdl);
            Assert.assertFalse(sinkConnection.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next());
            Assert.assertFalse(sinkConnection.createStatement().executeQuery("SELECT * FROM " + INDEX_TABLE_FULL_NAME).next());
            TestUtil.addCoprocessor(sinkConnection, DATA_TABLE_FULL_NAME, TestCoprocessorForWALAnnotationAtSink.class);
        }
        destroyDriver(driver);

        // Step 3: write two rows on the source cluster and verify the local index.
        driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection writerConnection = getConnection(url1);
                PreparedStatement upsert = writerConnection.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)")) {
            upsert.setString(1, "a1");
            upsert.setString(2, "x1");
            upsert.setString(3, "11");
            upsert.execute();
            upsert.setString(1, "a2");
            upsert.setString(2, "x2");
            upsert.setString(3, "12");
            upsert.execute();
            writerConnection.commit();

            ResultSet indexRows = writerConnection.createStatement().executeQuery("SELECT * FROM " + INDEX_TABLE_FULL_NAME);
            Assert.assertTrue(indexRows.next());
            Assert.assertEquals("x1", indexRows.getString(1));
            Assert.assertTrue(indexRows.next());
            Assert.assertEquals("x2", indexRows.getString(1));
            Assert.assertFalse(indexRows.next());
        }

        // Step 4: wait for the rows to show up on the sink cluster.
        assertReplicatedData(DATA_TABLE_FULL_NAME);
        destroyDriver(driver);

        // Step 5: clean up on both clusters.
        driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection cleanupConnection = getConnection(url1)) {
            cleanupConnection.createStatement().execute(dropTableDdl);
        }
        destroyDriver(driver);
        driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection cleanupConnection = getConnection(url2)) {
            cleanupConnection.createStatement().execute(dropTableDdl);
        }
        destroyDriver(driver);
    }

    /**
     * Verifies replication of writes made through a tenant view when WAL metadata annotation
     * is enabled.
     * <ol>
     *   <li>Creates the same multi-tenant table, global index, tenant view and view index on
     *       both clusters (tenant {@code tenant01}).</li>
     *   <li>Attaches {@link TestTenantCoprocessorForWALAnnotationAtSink} to the data table on
     *       the second cluster so batches with unexpected attributes are rejected there.</li>
     *   <li>Upserts two rows through the tenant view on the first cluster and checks the view
     *       index locally.</li>
     *   <li>Waits until the two rows are visible in the data table on the second cluster.</li>
     *   <li>Drops the table on both clusters.</li>
     * </ol>
     * Every JDBC connection is opened in try-with-resources so it is closed even when an
     * assertion fails.
     *
     * @throws Exception on any setup, SQL or replication failure
     */
    @Test
    public void testReplicationWithWALExtendedAttributesWithTenants() throws Exception {
        Assume.assumeTrue("Replication sink endpoint on hbase versions 2.4.16+ or 2.5.3+", isReplicationSinkEndpointEnabled());

        final String dropTableDdl = String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME);
        final String createTableDdl = String.format("CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL, k VARCHAR NOT NULL, v1 VARCHAR, v2 VARCHAR CONSTRAINT pk PRIMARY KEY (TENANT_ID, k)) REPLICATION_SCOPE = 1, MULTI_TENANT = true", DATA_TABLE_FULL_NAME);
        final String createIndexDdl = String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME);
        final String createViewDdl = String.format("CREATE VIEW %s AS SELECT * FROM %s ", TENANT_VIEW_NAME, DATA_TABLE_FULL_NAME);
        final String createViewIndexDdl = String.format("CREATE INDEX %s ON %s (v1)", TENANT_VIEW_INDEX_NAME, TENANT_VIEW_NAME);

        // Step 1: schema on the source cluster.
        driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection sourceConnection = getConnection(url1)) {
            sourceConnection.createStatement().execute(dropTableDdl);
            sourceConnection.createStatement().execute(createTableDdl);
            sourceConnection.createStatement().execute(createIndexDdl);
            // The tenant connection is opened only after the base table exists, as before.
            try (Connection sourceTenantConnection = getTenantConnection(url1, "tenant01")) {
                sourceTenantConnection.createStatement().execute(createViewDdl);
                sourceTenantConnection.createStatement().execute(createViewIndexDdl);
                Assert.assertFalse(sourceConnection.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next());
                Assert.assertFalse(sourceConnection.createStatement().executeQuery("SELECT * FROM " + INDEX_TABLE_FULL_NAME).next());
                Assert.assertFalse(sourceTenantConnection.createStatement().executeQuery("SELECT * FROM " + TENANT_VIEW_NAME).next());
                Assert.assertFalse(sourceTenantConnection.createStatement().executeQuery("SELECT * FROM " + TENANT_VIEW_INDEX_NAME).next());
            }
        }
        destroyDriver(driver);

        // Step 2: identical schema on the sink cluster, plus the attribute-checking coprocessor.
        driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection sinkConnection = getConnection(url2)) {
            sinkConnection.createStatement().execute(dropTableDdl);
            sinkConnection.createStatement().execute(createTableDdl);
            sinkConnection.createStatement().execute(createIndexDdl);
            try (Connection sinkTenantConnection = getTenantConnection(url2, "tenant01")) {
                sinkTenantConnection.createStatement().execute(createViewDdl);
                sinkTenantConnection.createStatement().execute(createViewIndexDdl);
                Assert.assertFalse(sinkConnection.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next());
                Assert.assertFalse(sinkConnection.createStatement().executeQuery("SELECT * FROM " + INDEX_TABLE_FULL_NAME).next());
                Assert.assertFalse(sinkTenantConnection.createStatement().executeQuery("SELECT * FROM " + TENANT_VIEW_NAME).next());
                Assert.assertFalse(sinkTenantConnection.createStatement().executeQuery("SELECT * FROM " + TENANT_VIEW_INDEX_NAME).next());
                TestUtil.addCoprocessor(sinkConnection, DATA_TABLE_FULL_NAME, TestTenantCoprocessorForWALAnnotationAtSink.class);
            }
        }
        destroyDriver(driver);

        // Step 3: write two rows through the tenant view on the source cluster and verify the view index.
        driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection writerTenantConnection = getTenantConnection(url1, "tenant01");
                PreparedStatement upsert = writerTenantConnection.prepareStatement("UPSERT INTO " + TENANT_VIEW_NAME + " VALUES(?,?,?)")) {
            upsert.setString(1, "a1");
            upsert.setString(2, "x1");
            upsert.setString(3, "11");
            upsert.execute();
            upsert.setString(1, "a2");
            upsert.setString(2, "x2");
            upsert.setString(3, "12");
            upsert.execute();
            writerTenantConnection.commit();

            ResultSet viewIndexRows = writerTenantConnection.createStatement().executeQuery("SELECT * FROM " + TENANT_VIEW_INDEX_NAME);
            Assert.assertTrue(viewIndexRows.next());
            Assert.assertEquals("x1", viewIndexRows.getString(1));
            Assert.assertTrue(viewIndexRows.next());
            Assert.assertEquals("x2", viewIndexRows.getString(1));
            Assert.assertFalse(viewIndexRows.next());
        }

        // Step 4: wait for the rows to show up on the sink cluster.
        assertReplicatedData(DATA_TABLE_FULL_NAME);
        destroyDriver(driver);

        // Step 5: clean up on both clusters.
        driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection cleanupConnection = getConnection(url1)) {
            cleanupConnection.createStatement().execute(dropTableDdl);
        }
        destroyDriver(driver);
        driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        try (Connection cleanupConnection = getConnection(url2)) {
            cleanupConnection.createStatement().execute(dropTableDdl);
        }
        destroyDriver(driver);
    }

    /**
     * Polls the given table on the second cluster until it contains exactly two rows, failing
     * the test if that does not happen within {@link #REPLICATION_RETRIES} checks.
     * <p>
     * The HBase connection and table are closed on every exit path. The table is checked
     * after each wait, including the last one, so a final sleep is never wasted.
     *
     * @param str name of the table to inspect on the replication target
     * @throws IOException if the table cannot be opened, scanned or closed
     * @throws InterruptedException if the thread is interrupted while waiting between checks
     */
    private void assertReplicatedData(String str) throws IOException, InterruptedException {
        LOGGER.info("Looking up tables in replication target");
        // Fully qualified because this file imports java.sql.Connection.
        try (org.apache.hadoop.hbase.client.Connection sinkConnection = ConnectionFactory.createConnection(utility2.getConfiguration());
                Table table = sinkConnection.getTable(TableName.valueOf(str))) {
            for (int attempt = 0; attempt < REPLICATION_RETRIES; attempt++) {
                if (ensureRows(table, 2)) {
                    return;
                }
                // Only wait when another check will follow.
                if (attempt < REPLICATION_RETRIES - 1) {
                    LOGGER.info("Sleeping for {} for edits to get replicated", REPLICATION_WAIT_TIME_MILLIS);
                    Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
                }
            }
            Assert.fail("Waited too much time for put replication on table " + table.getDescriptor().getTableName());
        }
    }

    /**
     * Counts the rows returned by a raw scan of the table and compares the count with the
     * expected value. Each row is logged as it is read.
     *
     * @param table table to scan
     * @param i expected number of rows
     * @return {@code true} if the scan returned exactly {@code i} rows
     * @throws IOException if the scanner cannot be opened or reading fails
     */
    private boolean ensureRows(Table table, int i) throws IOException {
        final Scan scan = new Scan();
        scan.setRaw(true);
        int rowCount = 0;
        // try-with-resources closes the scanner even if iteration throws.
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                LOGGER.info("got row: {}", result);
                rowCount++;
            }
        }
        return rowCount == i;
    }

    /**
     * Opens a JDBC connection to the given URL using a private copy of the shared test properties.
     *
     * @param str JDBC URL of the cluster to connect to
     * @return a new connection; the caller is responsible for closing it
     * @throws Exception if the connection cannot be established
     */
    private static Connection getConnection(String str) throws Exception {
        final Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
        return DriverManager.getConnection(str, connectionProps);
    }

    /**
     * Opens a tenant-specific JDBC connection: a private copy of the shared test properties
     * with the {@code TenantId} property set to the given tenant.
     *
     * @param str JDBC URL of the cluster to connect to
     * @param str2 tenant identifier to set on the connection
     * @return a new connection; the caller is responsible for closing it
     * @throws Exception if the connection cannot be established
     */
    private static Connection getTenantConnection(String str, String str2) throws Exception {
        final Properties tenantProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
        tenantProps.setProperty("TenantId", str2);
        final Connection tenantConnection = DriverManager.getConnection(str, tenantProps);
        return tenantConnection;
    }
}
