package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStreamTestBase;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.class */
public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream$FailingWALEntryFilter.class */
    public static class FailingWALEntryFilter implements WALEntryFilter {
        private int numFailures;
        private static int countFailures = 0;

        public FailingWALEntryFilter(int i) {
            this.numFailures = 0;
            this.numFailures = i;
        }

        public WAL.Entry filter(WAL.Entry entry) {
            if (countFailures == this.numFailures) {
                return entry;
            }
            countFailures++;
            throw new WALEntryFilterRetryableException("failing filter");
        }

        public static int numFailures() {
            return countFailures;
        }
    }

    @Before
    public void setUp() throws Exception {
        initWAL();
    }

    @Test
    public void testAppendsWithRolls() throws Exception {
        appendToLogAndSync();
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            Assert.assertTrue(wALEntryStream.hasNext());
            WAL.Entry peek = wALEntryStream.peek();
            Assert.assertSame(peek, wALEntryStream.next());
            Assert.assertNotNull(peek);
            Assert.assertFalse(wALEntryStream.hasNext());
            Assert.assertNull(wALEntryStream.peek());
            Assert.assertNull(wALEntryStream.next());
            long position = wALEntryStream.getPosition();
            wALEntryStream.close();
            appendToLogAndSync();
            WALEntryStreamTestBase.WALEntryStreamWithRetries wALEntryStreamWithRetries = new WALEntryStreamTestBase.WALEntryStreamWithRetries(this.logQueue, CONF, position, this.log, null, new MetricsSource("1"), "fake-wal-group-id");
            try {
                WAL.Entry next = wALEntryStreamWithRetries.next();
                Assert.assertNotEquals(position, wALEntryStreamWithRetries.getPosition());
                Assert.assertNotNull(next);
                long position2 = wALEntryStreamWithRetries.getPosition();
                wALEntryStreamWithRetries.close();
                appendToLogAndSync();
                this.log.rollWriter();
                appendToLogAndSync();
                wALEntryStreamWithRetries = new WALEntryStreamTestBase.WALEntryStreamWithRetries(this.logQueue, CONF, position2, this.log, null, new MetricsSource("1"), "fake-wal-group-id");
                try {
                    WAL.Entry next2 = wALEntryStreamWithRetries.next();
                    Assert.assertNotEquals(position2, wALEntryStreamWithRetries.getPosition());
                    Assert.assertNotNull(next2);
                    WAL.Entry next3 = wALEntryStreamWithRetries.next();
                    Assert.assertNotEquals(position2, wALEntryStreamWithRetries.getPosition());
                    Assert.assertNotNull(next3);
                    Assert.assertFalse(wALEntryStreamWithRetries.hasNext());
                    wALEntryStreamWithRetries.getPosition();
                    wALEntryStreamWithRetries.close();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testLogrollWhileStreaming() throws Exception {
        appendToLog("1");
        appendToLog("2");
        WALEntryStreamTestBase.WALEntryStreamWithRetries wALEntryStreamWithRetries = new WALEntryStreamTestBase.WALEntryStreamWithRetries(this.logQueue, CONF, 0L, this.log, null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            Assert.assertEquals("1", getRow(wALEntryStreamWithRetries.next()));
            appendToLog("3");
            this.log.rollWriter();
            appendToLog("4");
            Assert.assertEquals("2", getRow(wALEntryStreamWithRetries.next()));
            Assert.assertEquals(2L, getQueue().size());
            Assert.assertEquals("3", getRow(wALEntryStreamWithRetries.next()));
            Assert.assertEquals("4", getRow(wALEntryStreamWithRetries.next()));
            Assert.assertEquals(1L, getQueue().size());
            Assert.assertFalse(wALEntryStreamWithRetries.hasNext());
            wALEntryStreamWithRetries.close();
        } catch (Throwable th) {
            try {
                wALEntryStreamWithRetries.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testNewEntriesWhileStreaming() throws Exception {
        appendToLog("1");
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            wALEntryStream.next();
            appendToLog("2");
            appendToLog("3");
            Assert.assertFalse(wALEntryStream.hasNext());
            wALEntryStream.reset();
            Assert.assertEquals("2", getRow(wALEntryStream.next()));
            Assert.assertEquals("3", getRow(wALEntryStream.next()));
            Assert.assertFalse(wALEntryStream.hasNext());
            wALEntryStream.close();
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testResumeStreamingFromPosition() throws Exception {
        appendToLog("1");
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            wALEntryStream.next();
            appendToLog("2");
            appendToLog("3");
            long position = wALEntryStream.getPosition();
            wALEntryStream.close();
            wALEntryStream = new WALEntryStream(this.logQueue, CONF, position, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
            try {
                Assert.assertEquals("2", getRow(wALEntryStream.next()));
                Assert.assertEquals("3", getRow(wALEntryStream.next()));
                Assert.assertFalse(wALEntryStream.hasNext());
                Assert.assertEquals(1L, getQueue().size());
                wALEntryStream.close();
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testPosition() throws Exception {
        appendEntriesToLogAndSync(3);
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            wALEntryStream.next();
            long position = wALEntryStream.getPosition();
            wALEntryStream.close();
            wALEntryStream = new WALEntryStream(this.logQueue, CONF, position, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
            try {
                Assert.assertNotNull(wALEntryStream.next());
                Assert.assertNotNull(wALEntryStream.next());
                Assert.assertFalse(wALEntryStream.hasNext());
                wALEntryStream.close();
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testEmptyStream() throws Exception {
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            Assert.assertFalse(wALEntryStream.hasNext());
            wALEntryStream.close();
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testWALKeySerialization() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("foo", Bytes.toBytes("foo-value"));
        hashMap.put("bar", Bytes.toBytes("bar-value"));
        WALKeyImpl wALKeyImpl = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(), new ArrayList(), 0L, 0L, this.mvcc, scopes, hashMap);
        Assert.assertEquals(hashMap, wALKeyImpl.getExtendedAttributes());
        WALProtos.WALKey build = wALKeyImpl.getBuilder(WALCellCodec.getNoneCompressor()).build();
        WALKeyImpl wALKeyImpl2 = new WALKeyImpl();
        wALKeyImpl2.readFieldsFromPb(build, WALCellCodec.getNoneUncompressor());
        Assert.assertEquals(wALKeyImpl, wALKeyImpl2);
        Assert.assertEquals(wALKeyImpl.getExtendedAttributes().keySet(), wALKeyImpl2.getExtendedAttributes().keySet());
        for (Map.Entry entry : wALKeyImpl2.getExtendedAttributes().entrySet()) {
            Assert.assertArrayEquals(wALKeyImpl.getExtendedAttribute((String) entry.getKey()), (byte[]) entry.getValue());
        }
        Assert.assertEquals(wALKeyImpl.getReplicationScopes(), wALKeyImpl2.getReplicationScopes());
    }

    private ReplicationSource mockReplicationSource(boolean z, Configuration configuration) {
        ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        Mockito.when(Long.valueOf(replicationSourceManager.getTotalBufferLimit())).thenReturn(268435456L);
        Server server = (Server) Mockito.mock(Server.class);
        ReplicationSource replicationSource = (ReplicationSource) Mockito.mock(ReplicationSource.class);
        Mockito.when(replicationSource.getSourceManager()).thenReturn(replicationSourceManager);
        Mockito.when(replicationSource.getSourceMetrics()).thenReturn(new MetricsSource("1"));
        Mockito.when(replicationSource.getWALFileLengthProvider()).thenReturn(this.log);
        Mockito.when(replicationSource.getServer()).thenReturn(server);
        Mockito.when(Boolean.valueOf(replicationSource.isRecovered())).thenReturn(Boolean.valueOf(z));
        Mockito.when(replicationSourceManager.getGlobalMetrics()).thenReturn((MetricsReplicationGlobalSourceSource) Mockito.mock(MetricsReplicationGlobalSourceSource.class));
        return replicationSource;
    }

    private ReplicationSourceWALReader createReader(boolean z, Configuration configuration) {
        ReplicationSource mockReplicationSource = mockReplicationSource(z, configuration);
        Mockito.when(Boolean.valueOf(mockReplicationSource.isPeerEnabled())).thenReturn(true);
        ReplicationSourceWALReader replicationSourceWALReader = new ReplicationSourceWALReader(fs, configuration, this.logQueue, 0L, getDummyFilter(), mockReplicationSource, "fake-wal-group-id");
        replicationSourceWALReader.start();
        return replicationSourceWALReader;
    }

    private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int i, Configuration configuration) {
        ReplicationSource mockReplicationSource = mockReplicationSource(false, configuration);
        Mockito.when(Boolean.valueOf(mockReplicationSource.isPeerEnabled())).thenReturn(true);
        ReplicationSourceWALReader replicationSourceWALReader = new ReplicationSourceWALReader(fs, configuration, this.logQueue, 0L, getIntermittentFailingFilter(i), mockReplicationSource, "fake-wal-group-id");
        replicationSourceWALReader.start();
        return replicationSourceWALReader;
    }

    @Test
    public void testReplicationSourceWALReader() throws Exception {
        appendEntriesToLogAndSync(3);
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            wALEntryStream.next();
            wALEntryStream.next();
            wALEntryStream.next();
            long position = wALEntryStream.getPosition();
            wALEntryStream.close();
            Path peek = getQueue().peek();
            ReplicationSourceWALReader createReader = createReader(false, CONF);
            WALEntryBatch take = createReader.take();
            Assert.assertNotNull(take);
            Assert.assertEquals(3L, take.getWalEntries().size());
            Assert.assertEquals(position, take.getLastWalPosition());
            Assert.assertEquals(peek, take.getLastWalPath());
            Assert.assertEquals(3L, take.getNbRowKeys());
            appendToLog("foo");
            WALEntryBatch take2 = createReader.take();
            Assert.assertEquals(1L, take2.getNbEntries());
            Assert.assertEquals("foo", getRow((WAL.Entry) take2.getWalEntries().get(0)));
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
        appendEntriesToLogAndSync(3);
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            wALEntryStream.next();
            wALEntryStream.next();
            wALEntryStream.next();
            long position = wALEntryStream.getPosition();
            wALEntryStream.close();
            Path peek = getQueue().peek();
            WALEntryBatch take = createReaderWithBadReplicationFilter(5, CONF).take();
            Assert.assertEquals(5, FailingWALEntryFilter.numFailures());
            Assert.assertNotNull(take);
            Assert.assertEquals(3L, take.getWalEntries().size());
            Assert.assertEquals(position, take.getLastWalPosition());
            Assert.assertEquals(peek, take.getLastWalPath());
            Assert.assertEquals(3L, take.getNbRowKeys());
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testReplicationSourceWALReaderRecovered() throws Exception {
        appendEntriesToLogAndSync(10);
        Path peek = getQueue().peek();
        this.log.rollWriter();
        appendEntriesToLogAndSync(5);
        this.log.shutdown();
        Configuration configuration = new Configuration(CONF);
        configuration.setInt("replication.source.nb.capacity", 10);
        ReplicationSourceWALReader createReader = createReader(true, configuration);
        WALEntryBatch take = createReader.take();
        Assert.assertEquals(peek, take.getLastWalPath());
        Assert.assertEquals(10L, take.getNbEntries());
        Assert.assertFalse(take.isEndOfFile());
        WALEntryBatch take2 = createReader.take();
        Assert.assertEquals(peek, take2.getLastWalPath());
        Assert.assertEquals(0L, take2.getNbEntries());
        Assert.assertTrue(take2.isEndOfFile());
        Path peek2 = getQueue().peek();
        WALEntryBatch take3 = createReader.take();
        Assert.assertEquals(peek2, take3.getLastWalPath());
        Assert.assertEquals(5L, take3.getNbEntries());
        Assert.assertTrue(take3.isEndOfFile());
        Assert.assertSame(WALEntryBatch.NO_MORE_DATA, createReader.take());
    }

    @Test
    public void testReplicationSourceWALReaderWrongPosition() throws Exception {
        appendEntriesToLogAndSync(1);
        final Path peek = getQueue().peek();
        this.log.rollWriter();
        appendEntriesToLogAndSync(20);
        TEST_UTIL.waitFor(5000L, new Waiter.ExplainingPredicate<Exception>() { // from class: org.apache.hadoop.hbase.replication.regionserver.TestBasicWALEntryStream.1
            public boolean evaluate() throws Exception {
                return WALEntryStreamTestBase.fs.getFileStatus(peek).getLen() > 0 && TestBasicWALEntryStream.this.log.getInflightWALCloseCount() == 0;
            }

            public String explainFailure() throws Exception {
                return peek + " has not been closed yet";
            }
        });
        ReplicationSourceWALReader createReader = createReader(false, CONF);
        WALEntryBatch take = createReader.take();
        Assert.assertEquals(peek, take.getLastWalPath());
        long len = fs.getFileStatus(peek).getLen();
        Assert.assertTrue("Position " + take.getLastWalPosition() + " is out of range, file length is " + len, take.getLastWalPosition() <= len);
        Assert.assertEquals(1L, take.getNbEntries());
        Assert.assertTrue(take.isEndOfFile());
        Path peek2 = getQueue().peek();
        WALEntryBatch take2 = createReader.take();
        Assert.assertEquals(peek2, take2.getLastWalPath());
        Assert.assertEquals(20L, take2.getNbEntries());
        Assert.assertFalse(take2.isEndOfFile());
        this.log.rollWriter();
        appendEntriesToLogAndSync(10);
        WALEntryBatch take3 = createReader.take();
        Assert.assertEquals(peek2, take3.getLastWalPath());
        Assert.assertEquals(0L, take3.getNbEntries());
        Assert.assertTrue(take3.isEndOfFile());
        Path peek3 = getQueue().peek();
        WALEntryBatch take4 = createReader.take();
        Assert.assertEquals(peek3, take4.getLastWalPath());
        Assert.assertEquals(10L, take4.getNbEntries());
        Assert.assertFalse(take4.isEndOfFile());
    }

    @Test
    public void testReplicationSourceWALReaderDisabled() throws IOException, InterruptedException, ExecutionException {
        appendEntriesToLogAndSync(3);
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            wALEntryStream.next();
            wALEntryStream.next();
            wALEntryStream.next();
            long position = wALEntryStream.getPosition();
            wALEntryStream.close();
            Path peek = getQueue().peek();
            ReplicationSource mockReplicationSource = mockReplicationSource(false, CONF);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Mockito.when(Boolean.valueOf(mockReplicationSource.isPeerEnabled())).then(invocationOnMock -> {
                atomicInteger.incrementAndGet();
                return Boolean.valueOf(atomicBoolean.get());
            });
            ReplicationSourceWALReader replicationSourceWALReader = new ReplicationSourceWALReader(fs, CONF, this.logQueue, 0L, getDummyFilter(), mockReplicationSource, "fake-wal-group-id");
            replicationSourceWALReader.start();
            ForkJoinTask submit = ForkJoinPool.commonPool().submit(() -> {
                return replicationSourceWALReader.take();
            });
            TEST_UTIL.waitFor(30000L, () -> {
                return atomicInteger.get() >= 5;
            });
            Assert.assertFalse(submit.isDone());
            atomicBoolean.set(true);
            WALEntryBatch wALEntryBatch = (WALEntryBatch) submit.get();
            Assert.assertNotNull(wALEntryBatch);
            Assert.assertEquals(3L, wALEntryBatch.getWalEntries().size());
            Assert.assertEquals(position, wALEntryBatch.getLastWalPosition());
            Assert.assertEquals(peek, wALEntryBatch.getLastWalPath());
            Assert.assertEquals(3L, wALEntryBatch.getNbRowKeys());
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private String getRow(WAL.Entry entry) {
        Cell cell = (Cell) entry.getEdit().getCells().get(0);
        return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    }

    private void appendToLog(String str) throws IOException {
        this.log.sync(this.log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(), this.mvcc, scopes), getWALEdit(str)));
    }

    private void appendEntriesToLogAndSync(int i) throws IOException {
        long j = -1;
        for (int i2 = 0; i2 < i; i2++) {
            j = appendToLog(1);
        }
        this.log.sync(j);
    }

    private WALEdit getWALEdit(String str) {
        WALEdit wALEdit = new WALEdit();
        wALEdit.add(new KeyValue(Bytes.toBytes(str), family, qualifier, EnvironmentEdgeManager.currentTime(), qualifier));
        return wALEdit;
    }

    private WALEntryFilter getDummyFilter() {
        return new WALEntryFilter() { // from class: org.apache.hadoop.hbase.replication.regionserver.TestBasicWALEntryStream.2
            public WAL.Entry filter(WAL.Entry entry) {
                return entry;
            }
        };
    }

    private WALEntryFilter getIntermittentFailingFilter(int i) {
        return new FailingWALEntryFilter(i);
    }

    @Test
    public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
        appendToLog("1");
        appendToLog("2");
        long asLong = this.log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
        AtomicLong atomicLong = new AtomicLong(asLong - 1);
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, path -> {
            return OptionalLong.of(atomicLong.get());
        }, (ServerName) null, new MetricsSource("1"), "fake-wal-group-id");
        try {
            Assert.assertTrue(wALEntryStream.hasNext());
            Assert.assertNotNull(wALEntryStream.next());
            Assert.assertFalse(wALEntryStream.hasNext());
            Thread.sleep(1000L);
            wALEntryStream.reset();
            Assert.assertFalse(wALEntryStream.hasNext());
            atomicLong.set(asLong);
            wALEntryStream.reset();
            Assert.assertTrue(wALEntryStream.hasNext());
            Assert.assertNotNull(wALEntryStream.next());
            Assert.assertFalse(wALEntryStream.hasNext());
            wALEntryStream.close();
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testEOFExceptionForRecoveredQueue() throws Exception {
        Path path = new Path("emptyLog");
        fs.create(path).close();
        Assert.assertEquals(0L, fs.getFileStatus(path).getLen());
        Configuration configuration = new Configuration(CONF);
        configuration.setInt("replication.source.maxretriesmultiplier", 1);
        configuration.setBoolean("replication.source.eof.autorecovery", true);
        configuration.setInt("replication.source.nb.batches", 10);
        ReplicationSource mockReplicationSource = mockReplicationSource(true, configuration);
        Mockito.when(Boolean.valueOf(mockReplicationSource.isPeerEnabled())).thenReturn(true);
        MetricsSource metricsSource = (MetricsSource) Mockito.mock(MetricsSource.class);
        ((MetricsSource) Mockito.doNothing().when(metricsSource)).incrSizeOfLogQueue();
        ((MetricsSource) Mockito.doNothing().when(metricsSource)).decrSizeOfLogQueue();
        ReplicationSourceLogQueue replicationSourceLogQueue = new ReplicationSourceLogQueue(configuration, metricsSource, mockReplicationSource);
        replicationSourceLogQueue.enqueueLog(path, "fake-wal-group-id");
        new ReplicationSourceWALReader(fs, configuration, replicationSourceLogQueue, 0L, getDummyFilter(), mockReplicationSource, "fake-wal-group-id").run();
        Assert.assertEquals(0L, replicationSourceLogQueue.getQueueSize("fake-wal-group-id"));
    }

    @Test
    public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
        Configuration configuration = new Configuration(CONF);
        MetricsSource metricsSource = (MetricsSource) Mockito.mock(MetricsSource.class);
        ReplicationSourceInterface mockReplicationSource = mockReplicationSource(true, configuration);
        ReplicationSourceLogQueue replicationSourceLogQueue = new ReplicationSourceLogQueue(configuration, metricsSource, mockReplicationSource);
        Path path = new Path(fs.getHomeDirectory(), "log.2");
        fs.create(path).close();
        Assert.assertEquals(0L, fs.getFileStatus(path).getLen());
        replicationSourceLogQueue.enqueueLog(path, "fake-wal-group-id");
        Path path2 = new Path(fs.getHomeDirectory(), "log.1");
        appendEntries(WALFactory.createWALWriter(fs, path2, TEST_UTIL.getConfiguration()), 3);
        replicationSourceLogQueue.enqueueLog(path2, "fake-wal-group-id");
        ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(replicationSourceManager.getOldSources()).thenReturn(new ArrayList(Arrays.asList(mockReplicationSource)));
        Mockito.when(Boolean.valueOf(mockReplicationSource.isPeerEnabled())).thenReturn(true);
        Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        configuration.setInt("replication.source.maxretriesmultiplier", 1);
        configuration.setBoolean("replication.source.eof.autorecovery", true);
        configuration.setInt("replication.source.nb.batches", 10);
        ReplicationSourceWALReader replicationSourceWALReader = new ReplicationSourceWALReader(fs, configuration, replicationSourceLogQueue, 0L, getDummyFilter(), mockReplicationSource, "fake-wal-group-id");
        Assert.assertEquals("Initial log queue size is not correct", 2L, replicationSourceLogQueue.getQueueSize("fake-wal-group-id"));
        replicationSourceWALReader.run();
        Assert.assertEquals(0L, replicationSourceLogQueue.getQueueSize("fake-wal-group-id"));
        Assert.assertEquals("Log queue should be empty", 0L, replicationSourceLogQueue.getQueueSize("fake-wal-group-id"));
    }

    private PriorityBlockingQueue<Path> getQueue() {
        return this.logQueue.getQueue("fake-wal-group-id");
    }

    private void appendEntries(WALProvider.Writer writer, int i) throws IOException {
        for (int i2 = 0; i2 < i; i2++) {
            byte[] bytes = Bytes.toBytes(Integer.toString(i2));
            KeyValue keyValue = new KeyValue(bytes, bytes, bytes);
            WALEdit wALEdit = new WALEdit();
            wALEdit.add(keyValue);
            WALKeyImpl wALKeyImpl = new WALKeyImpl(bytes, TableName.valueOf(bytes), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
            new TreeMap(Bytes.BYTES_COMPARATOR).put(bytes, 1);
            writer.append(new WAL.Entry(wALKeyImpl, wALEdit));
            writer.sync(false);
        }
        writer.close();
    }

    @Test
    public void testSizeOfLogQueue() throws Exception {
        Assert.assertEquals(1L, this.logQueue.getMetrics().getSizeOfLogQueue());
        appendToLogAndSync();
        this.log.rollWriter();
        Assert.assertEquals(2L, this.logQueue.getMetrics().getSizeOfLogQueue());
        WALEntryStream wALEntryStream = new WALEntryStream(this.logQueue, CONF, 0L, this.log, (ServerName) null, this.logQueue.getMetrics(), "fake-wal-group-id");
        try {
            Assert.assertTrue(wALEntryStream.hasNext());
            Assert.assertNotNull(wALEntryStream.next());
            Assert.assertFalse(wALEntryStream.hasNext());
            wALEntryStream.close();
            Assert.assertEquals(1L, this.logQueue.getMetrics().getSizeOfLogQueue());
        } catch (Throwable th) {
            try {
                wALEntryStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCleanClosedWALs() throws Exception {
        WALEntryStreamTestBase.WALEntryStreamWithRetries wALEntryStreamWithRetries = new WALEntryStreamTestBase.WALEntryStreamWithRetries(this.logQueue, CONF, 0L, this.log, null, this.logQueue.getMetrics(), "fake-wal-group-id");
        try {
            Assert.assertEquals(0L, this.logQueue.getMetrics().getUncleanlyClosedWALs());
            appendToLogAndSync();
            Assert.assertNotNull(wALEntryStreamWithRetries.next());
            this.log.rollWriter();
            appendToLogAndSync();
            Assert.assertNotNull(wALEntryStreamWithRetries.next());
            Assert.assertEquals(0L, this.logQueue.getMetrics().getUncleanlyClosedWALs());
            wALEntryStreamWithRetries.close();
        } catch (Throwable th) {
            try {
                wALEntryStreamWithRetries.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testEOFExceptionInOldWALsDirectory() throws Exception {
        Assert.assertEquals(1L, this.logQueue.getQueueSize("fake-wal-group-id"));
        AbstractFSWAL abstractFSWAL = this.log;
        Path currentFileName = abstractFSWAL.getCurrentFileName();
        this.log.rollWriter(true);
        Waiter.waitFor(CONF, 5000L, () -> {
            return abstractFSWAL.getInflightWALCloseCount() == 0;
        });
        Assert.assertEquals(2L, this.logQueue.getQueueSize("fake-wal-group-id"));
        Path findArchivedLog = AbstractFSWALProvider.findArchivedLog(currentFileName, CONF);
        Assert.assertNotNull(findArchivedLog);
        Assert.assertTrue(fs.exists(findArchivedLog));
        fs.truncate(findArchivedLog, 0L);
        Assert.assertEquals(0L, fs.getFileStatus(findArchivedLog).getLen());
        ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(Boolean.valueOf(((ReplicationSource) Mockito.mock(ReplicationSource.class)).isPeerEnabled())).thenReturn(true);
        Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        Configuration configuration = new Configuration(CONF);
        configuration.setInt("replication.source.maxretriesmultiplier", 1);
        configuration.setBoolean("replication.source.eof.autorecovery", true);
        createReader(false, configuration);
        Waiter.waitFor(configuration, 10000L, () -> {
            return this.logQueue.getQueueSize("fake-wal-group-id") == 1;
        });
    }
}
