package org.apache.phoenix.end2end;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ParallelStatsDisabledTest.class})
/* loaded from: input_file:org/apache/phoenix/end2end/MapReduceIT.class */
public class MapReduceIT extends ParallelStatsDisabledIT {
    /** Column holding the stock symbol; it is the leading primary-key column of both tables. */
    private static final String STOCK_NAME = "STOCK_NAME";
    /** Column holding the year of a row of recordings; second primary-key column of the stock table. */
    private static final String RECORDING_YEAR = "RECORDING_YEAR";
    /** Column holding the DOUBLE array of recordings that the mapper scans for a maximum. */
    private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
    /** DDL template for the input (stock) table; {@code %s} is the table name. The table is split on 'AA'. */
    private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s (  STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER  DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) SPLIT ON ('AA')";
    /**
     * DDL template for a view over the stock table restricted to RECORDING_YEAR = 2008; the first
     * {@code %s} is the view name, the second the underlying table name.
     */
    private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS %s (v1 VARCHAR) AS  SELECT * FROM %s WHERE RECORDING_YEAR = 2008";
    /** Column of the stats (output) table that receives the maximum recording per stock. */
    private static final String MAX_RECORDING = "MAX_RECORDING";
    /** DDL template for the output (stats) table; {@code %s} is the table name. */
    private static final String CREATE_STOCK_STATS_TABLE = "CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL ,  MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME ))";
    /** Three-parameter UPSERT template used to load the stock table; {@code %s} is the table name. */
    private static final String UPSERT = "UPSERT into %s values (?, ?, ?)";
    /** Tenant id passed by the tenant-specific tests. */
    private static final String TENANT_ID = "1234567890";

    /* loaded from: input_file:org/apache/phoenix/end2end/MapReduceIT$StockMapper.class */
    public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {
        private Text stock = new Text();
        private DoubleWritable price = new DoubleWritable();

        protected void map(NullWritable nullWritable, StockWritable stockWritable, Mapper<NullWritable, StockWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
            double[] recordings = stockWritable.getRecordings();
            String stockName = stockWritable.getStockName();
            double d = Double.MIN_VALUE;
            for (double d2 : recordings) {
                if (d < d2) {
                    d = d2;
                }
            }
            this.stock.set(stockName);
            this.price.set(d);
            context.write(this.stock, this.price);
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((NullWritable) obj, (StockWritable) obj2, (Mapper<NullWritable, StockWritable, Text, DoubleWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/phoenix/end2end/MapReduceIT$StockReducer.class */
    public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {
        protected void reduce(Text text, Iterable<DoubleWritable> iterable, Reducer<Text, DoubleWritable, NullWritable, StockWritable>.Context context) throws IOException, InterruptedException {
            double d = Double.MIN_VALUE;
            for (DoubleWritable doubleWritable : iterable) {
                if (d < doubleWritable.get()) {
                    d = doubleWritable.get();
                }
            }
            StockWritable stockWritable = new StockWritable();
            stockWritable.setStockName(text.toString());
            stockWritable.setMaxPrice(d);
            context.write(NullWritable.get(), stockWritable);
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<DoubleWritable>) iterable, (Reducer<Text, DoubleWritable, NullWritable, StockWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/phoenix/end2end/MapReduceIT$StockWritable.class */
    public static class StockWritable implements DBWritable {
        private String stockName;
        private double[] recordings;
        private double maxPrice;

        public void readFields(ResultSet resultSet) throws SQLException {
            this.stockName = resultSet.getString(MapReduceIT.STOCK_NAME);
            this.recordings = (double[]) resultSet.getArray(MapReduceIT.RECORDINGS_QUARTER).getArray();
        }

        public void write(PreparedStatement preparedStatement) throws SQLException {
            preparedStatement.setString(1, this.stockName);
            preparedStatement.setDouble(2, this.maxPrice);
        }

        public double[] getRecordings() {
            return this.recordings;
        }

        public String getStockName() {
            return this.stockName;
        }

        public void setStockName(String str) {
            this.stockName = str;
        }

        public void setMaxPrice(double d) {
            this.maxPrice = d;
        }
    }

    /**
     * Per-test setup hook. Intentionally empty: the tests in this class create their own uniquely
     * named tables inside the job helpers.
     */
    @Before
    public void setupTables() throws Exception {
    }

    /**
     * Per-test teardown: fails the test if a store reference count leaked, and resets the
     * scan-grouper call counter that the job helpers assert on.
     *
     * <p>The reset runs in a {@code finally} block so that the counter is cleared even when the
     * leak check itself throws; otherwise later tests would fail on their "counter is zero"
     * precondition for an unrelated reason.
     */
    @After
    public void clearCountersForScanGrouper() throws Exception {
        try {
            Assert.assertFalse("refCount leaked", isAnyStoreRefCountLeaked());
        } finally {
            TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
        }
    }

    /**
     * Runs the job over the whole stock table (no condition, no tenant) and expects the overall
     * maximum recording, 91.04.
     */
    @Test
    public void testNoConditionsOnSelect() throws Exception {
        // try-with-resources always closes the connection and keeps a failure of the test body
        // as the primary exception (a failing close() is attached as suppressed).
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createAndTestJob(connection, null, 91.04d, null);
        }
    }

    /**
     * Runs the job with the condition {@code RECORDING_YEAR < 2009} and expects 81.04, the
     * maximum among the 2007 and 2008 rows.
     */
    @Test
    public void testConditionsOnSelect() throws Exception {
        // try-with-resources always closes the connection and keeps a failure of the test body
        // as the primary exception (a failing close() is attached as suppressed).
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createAndTestJob(connection, "RECORDING_YEAR  < 2009", 81.04d, null);
        }
    }

    /**
     * Runs the paged job with the very small query timeout and no tenant; the helper expects the
     * job to fail with a query timeout, so the expected-maximum argument is not checked.
     */
    @Test
    public void testMapReduceWithVerySmallPhoenixQueryTimeout() throws Exception {
        // try-with-resources always closes the connection and keeps a failure of the test body
        // as the primary exception (a failing close() is attached as suppressed).
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, "RECORDING_YEAR % 2 = 0", 82.89d, null, true);
        }
    }

    /**
     * Tenant-specific variant of the very-small-timeout test; the helper expects the job to fail
     * with a query timeout, so the expected-maximum argument is not checked.
     */
    @Test
    public void testMapReduceWithVerySmallPhoenixQueryTimeoutWithTenantId() throws Exception {
        // try-with-resources always closes the connection and keeps a failure of the test body
        // as the primary exception (a failing close() is attached as suppressed).
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, "RECORDING_YEAR % 2 = 0", 82.89d, TENANT_ID, true);
        }
    }

    /**
     * Runs the paged job with the normal query timeout and no tenant, selecting even years only;
     * expects 82.89, the maximum among the 2008 and 2010 rows.
     */
    @Test
    public void testMapReduceWithNormalPhoenixQueryTimeout() throws Exception {
        // try-with-resources always closes the connection and keeps a failure of the test body
        // as the primary exception (a failing close() is attached as suppressed).
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, "RECORDING_YEAR % 2 = 0", 82.89d, null, false);
        }
    }

    /**
     * Tenant-specific variant of the normal-timeout test. The tenant reads through a view limited
     * to RECORDING_YEAR = 2008, so the expected maximum is 81.04.
     */
    @Test
    public void testMapReduceWithNormalPhoenixQueryTimeoutWithTenantId() throws Exception {
        // try-with-resources always closes the connection and keeps a failure of the test body
        // as the primary exception (a failing close() is attached as suppressed).
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, "RECORDING_YEAR % 2 = 0", 81.04d, TENANT_ID, false);
        }
    }

    /**
     * Runs the job for a tenant with no extra condition. The tenant reads through a view limited
     * to RECORDING_YEAR = 2008, so the expected maximum is 81.04.
     */
    @Test
    public void testWithTenantId() throws Exception {
        // try-with-resources always closes the connection and keeps a failure of the test body
        // as the primary exception (a failing close() is attached as suppressed).
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createAndTestJob(connection, null, 81.04d, TENANT_ID);
        }
    }

    /**
     * Creates a fresh stock table and stats table, configures a job that reads the stock table
     * (directly, or through a tenant view when {@code tenantId} is given) and runs/verifies it via
     * {@link #testJob}.
     *
     * @param connection connection used for DDL and data loading
     * @param condition optional WHERE condition for the job input; may be {@code null}
     * @param expectedMax maximum recording expected in the stats table
     * @param tenantId tenant to run the job for, or {@code null} for a global job
     */
    private void createAndTestJob(Connection connection, String condition, double expectedMax, String tenantId)
            throws SQLException, IOException, InterruptedException, ClassNotFoundException {
        String stockTableName = generateUniqueName();
        String stockStatsTableName = generateUniqueName();
        // Close the DDL statement deterministically instead of leaking one per execute().
        try (Statement statement = connection.createStatement()) {
            statement.execute(String.format(CREATE_STOCK_TABLE, stockTableName));
            statement.execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
        }
        connection.commit();
        Job job = Job.getInstance(getUtility().getConfiguration());
        if (tenantId != null) {
            setInputForTenant(job, tenantId, stockTableName, condition);
        } else {
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
                    stockTableName, condition, new String[]{STOCK_NAME, RECORDING_YEAR, "0.RECORDINGS_QUARTER"});
        }
        testJob(connection, job, stockTableName, stockStatsTableName, expectedMax);
    }

    /**
     * Creates fresh stock and stats tables, then runs the job with a server page size of 0 ms and
     * either a 1 ms or a 600000 ms query timeout.
     *
     * <p>With the 1 ms timeout the job is expected to fail: either {@code waitForCompletion}
     * returns {@code false}, or it throws a {@link RuntimeException} caused by a
     * {@link SQLTimeoutException}. In that mode {@code expectedMax} is not checked. With the
     * 600000 ms timeout the job must succeed and the stats table is verified.
     *
     * @param connection connection used for DDL and data loading
     * @param condition optional WHERE condition for the job input; may be {@code null}
     * @param expectedMax maximum recording expected in the stats table (normal-timeout mode only)
     * @param tenantId tenant to run the job for, or {@code null} for a global job
     * @param testVerySmallTimeOut {@code true} to use the 1 ms timeout and expect failure
     */
    private void createPagedJobAndTestFailedJobDueToTimeOut(Connection connection, String condition,
            double expectedMax, String tenantId, boolean testVerySmallTimeOut)
            throws SQLException, IOException, InterruptedException, ClassNotFoundException {
        String stockTableName = generateUniqueName();
        String stockStatsTableName = generateUniqueName();
        // Close the DDL statement deterministically instead of leaking one per execute().
        try (Statement statement = connection.createStatement()) {
            statement.execute(String.format(CREATE_STOCK_TABLE, stockTableName));
            statement.execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
        }
        connection.commit();

        // Work on a copy so the timeout settings do not affect the shared test configuration.
        Configuration configuration = new Configuration(getUtility().getConfiguration());
        // The page size is the same in both modes; only the query timeout differs.
        configuration.set("phoenix.server.page.size.ms", Integer.toString(0));
        configuration.set("phoenix.query.timeoutMs", Integer.toString(testVerySmallTimeOut ? 1 : 600000));
        Job job = Job.getInstance(configuration);
        if (tenantId != null) {
            setInputForTenant(job, tenantId, stockTableName, condition);
        } else {
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
                    stockTableName, condition, new String[]{STOCK_NAME, RECORDING_YEAR, "0.RECORDINGS_QUARTER"});
        }

        Assert.assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0L,
                TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
        upsertData(connection, stockTableName);
        job.getConfiguration().set("mapreduce.framework.name", "local");
        setOutput(job, stockStatsTableName);
        job.setMapperClass(StockMapper.class);
        job.setReducerClass(StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);

        if (testVerySmallTimeOut) {
            try {
                Assert.assertFalse("Job should fail with QueryTimeout.", job.waitForCompletion(true));
            } catch (RuntimeException e) {
                Assert.assertTrue("Job execution failed with unexpected error.",
                        e.getCause() instanceof SQLTimeoutException);
            }
            return;
        }

        Assert.assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
        // Verify through a separate connection, and release it together with its statement and
        // result set even when an assertion fails.
        try (Connection verifyConnection = DriverManager.getConnection(getUrl());
                Statement statement = verifyConnection.createStatement();
                ResultSet stats = statement.executeQuery("SELECT * FROM " + stockStatsTableName)) {
            Assert.assertTrue("No data stored in stats table!", stats.next());
            String name = stats.getString(1);
            double max = stats.getDouble(2);
            Assert.assertEquals("Got the wrong stock name!", "AAPL", name);
            Assert.assertEquals("Max value didn't match the expected!", expectedMax, max, 0.0d);
            Assert.assertFalse("Should only have stored one row in stats table!", stats.next());
        }
        Assert.assertEquals("There should have been only be 1 call to getRegionBoundaries (corresponding to the driver code)", 1L,
                TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
    }

    /**
     * Configures {@code job} to run for a tenant: sets the tenant id on the job, creates a
     * uniquely named tenant view over {@code stockTableName} and uses that view as the job input.
     *
     * <p>The tenant connection is opened with the same {@code tenantId} that is set on the job, so
     * the view is created by the tenant the job later runs as.
     *
     * @param job the job to configure
     * @param tenantId tenant id; must not be {@code null}
     * @param stockTableName table the tenant view is defined over
     * @param condition optional WHERE condition for the job input; may be {@code null}
     */
    private void setInputForTenant(Job job, String tenantId, String stockTableName, String condition)
            throws SQLException {
        Properties tenantProperties = new Properties();
        tenantProperties.setProperty("TenantId", tenantId);
        try (Connection tenantConnection = DriverManager.getConnection(getUrl(), tenantProperties);
                Statement statement = tenantConnection.createStatement()) {
            PhoenixMapReduceUtil.setTenantId(job, tenantId);
            String stockViewName = generateUniqueName();
            statement.execute(String.format(CREATE_STOCK_VIEW, stockViewName, stockTableName));
            tenantConnection.commit();
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
                    stockViewName, condition, new String[]{STOCK_NAME, RECORDING_YEAR, "0.RECORDINGS_QUARTER"});
        }
    }

    /**
     * Loads the sample data, finishes configuring {@code job} (local framework, mapper, reducer,
     * Phoenix output), runs it and verifies that the stats table holds exactly one row:
     * {@code ("AAPL", expectedMax)}.
     *
     * <p>Also checks the scan-grouper counter: it must be 0 before the run and exactly 1 after.
     *
     * @param connection connection used to load the sample data
     * @param job job whose input has already been configured
     * @param stockTableName input table to load
     * @param stockStatsTableName output table to verify
     * @param expectedMax maximum recording expected in the stats table
     */
    private void testJob(Connection connection, Job job, String stockTableName, String stockStatsTableName,
            double expectedMax) throws SQLException, InterruptedException, IOException, ClassNotFoundException {
        Assert.assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0L,
                TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
        upsertData(connection, stockTableName);
        job.getConfiguration().set("mapreduce.framework.name", "local");
        setOutput(job, stockStatsTableName);
        job.setMapperClass(StockMapper.class);
        job.setReducerClass(StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);
        Assert.assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));

        // Verify through a separate connection, and release it together with its statement and
        // result set even when an assertion fails.
        try (Connection verifyConnection = DriverManager.getConnection(getUrl());
                Statement statement = verifyConnection.createStatement();
                ResultSet stats = statement.executeQuery("SELECT * FROM " + stockStatsTableName)) {
            Assert.assertTrue("No data stored in stats table!", stats.next());
            String name = stats.getString(1);
            double max = stats.getDouble(2);
            Assert.assertEquals("Got the wrong stock name!", "AAPL", name);
            Assert.assertEquals("Max value didn't match the expected!", expectedMax, max, 0.0d);
            Assert.assertFalse("Should only have stored one row in stats table!", stats.next());
        }
        Assert.assertEquals("There should have been only be 1 call to getRegionBoundaries (corresponding to the driver code)", 1L,
                TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
    }

    /**
     * Points the job's output at table {@code str}: records the output table name, installs the
     * two-column UPSERT statement (STOCK_NAME, MAX_RECORDING) and selects the Phoenix output format.
     */
    private void setOutput(Job job, String str) {
        StringBuilder upsertStatement = new StringBuilder();
        upsertStatement.append("UPSERT into ").append(str)
                .append(" (").append(STOCK_NAME)
                .append(", ").append(MAX_RECORDING)
                .append(") values (?,?)");

        Configuration jobConfiguration = job.getConfiguration();
        PhoenixConfigurationUtil.setOutputTableName(jobConfiguration, str);
        jobConfiguration.set("phoenix.upsert.stmt", upsertStatement.toString());
        job.setOutputFormatClass(PhoenixOutputFormat.class);
    }

    /**
     * Loads the four sample "AAPL" rows (years 2007 to 2010, four recordings each) into
     * {@code stockTableName} and commits them.
     *
     * @param connection connection to upsert and commit on
     * @param stockTableName table to load
     */
    private void upsertData(Connection connection, String stockTableName) throws SQLException {
        // The statement is closed once the rows are committed instead of being leaked.
        try (PreparedStatement statement = connection.prepareStatement(String.format(UPSERT, stockTableName))) {
            upsertData(statement, "AAPL", 2010, new Double[]{73.48d, 82.25d, 75.2d, 82.89d});
            upsertData(statement, "AAPL", 2009, new Double[]{85.88d, 91.04d, 88.5d, 90.3d});
            upsertData(statement, "AAPL", 2008, new Double[]{75.88d, 81.04d, 78.5d, 80.3d});
            upsertData(statement, "AAPL", 2007, new Double[]{73.88d, 80.24d, 78.9d, 66.3d});
            connection.commit();
        }
    }

    /**
     * Binds one row (stock name, year, recordings array) to the three statement parameters in
     * order and executes the statement.
     */
    private void upsertData(PreparedStatement preparedStatement, String str, int i, Double[] dArr) throws SQLException {
        int parameterIndex = 1;
        preparedStatement.setString(parameterIndex++, str);
        preparedStatement.setInt(parameterIndex++, i);
        preparedStatement.setArray(parameterIndex++, new PhoenixArray.PrimitiveDoublePhoenixArray(PDouble.INSTANCE, dArr));
        preparedStatement.execute();
    }
}
