package org.apache.hadoop.contrib.bkjournal;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-hdfs-bkjournal-2.10.1-ODI-javadoc.jar:test-classes/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.class
 */
/* loaded from: input_file:test-classes/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.class */
/**
 * Checks that a finalized edit log segment written through
 * {@link BookKeeperJournalManager} can still be read in full while every
 * bookie started by this test is suspended.
 *
 * <p>The test configures a short speculative read timeout and a long read
 * entry timeout, so a successful read within the JUnit timeout presumably
 * relies on speculative reads reaching the bookie(s) that were started by
 * {@link BKJMUtil} and are not suspended here.
 */
public class TestBookKeeperSpeculativeRead {
    private static final Log LOG = LogFactory.getLog(TestBookKeeperSpeculativeRead.class);

    /** Number of bookies requested from {@link BKJMUtil} in the class-level setup. */
    private static final int NUM_LOCAL_BOOKIES = 1;

    /** Number of additional bookies the test starts and later suspends. */
    private static final int NUM_EXTRA_BOOKIES = 9;

    /** Transaction id of the first edit written to the log segment. */
    private static final long FIRST_TXID = 1L;

    /** Transaction id of the last edit written; ids start at 1, so this is also the count. */
    private static final long NUM_TRANSACTIONS = 1000L;

    /** How long a suspended bookie waits for the release latch before resuming on its own. */
    private static final long MAX_SUSPEND_MINUTES = 2L;

    private static BKJMUtil bkutil;

    /** Extra bookies started by the test; shut down in {@link #teardownBookkeeper()}. */
    private static final List<BookieServer> bks = new ArrayList<BookieServer>();

    private ZooKeeper zkc;

    /**
     * Starts the shared BookKeeper test utility once for the whole class.
     *
     * @throws Exception if the utility cannot be created or started
     */
    @BeforeClass
    public static void setupBookkeeper() throws Exception {
        bkutil = new BKJMUtil(NUM_LOCAL_BOOKIES);
        bkutil.start();
    }

    /**
     * Tears down the shared utility and shuts down every extra bookie.
     *
     * @throws Exception if tearing down the utility or a bookie fails
     */
    @AfterClass
    public static void teardownBookkeeper() throws Exception {
        try {
            // bkutil is null when setupBookkeeper() failed before assigning it;
            // JUnit still runs @AfterClass in that case.
            if (bkutil != null) {
                bkutil.teardown();
            }
        } finally {
            // Shut the extra bookies down even if the utility teardown failed.
            for (BookieServer bookie : bks) {
                bookie.shutdown();
            }
            bks.clear();
        }
    }

    /**
     * Opens a fresh ZooKeeper connection before each test.
     *
     * @throws Exception if the connection cannot be established
     */
    @Before
    public void setup() throws Exception {
        this.zkc = BKJMUtil.connectZooKeeper();
    }

    /**
     * Closes the per-test ZooKeeper connection.
     *
     * @throws Exception if closing the connection fails
     */
    @After
    public void teardown() throws Exception {
        // zkc is null when setup() failed; avoid masking that failure with an NPE.
        if (this.zkc != null) {
            this.zkc.close();
        }
    }

    /**
     * Builds a {@link NamespaceInfo} for the test cluster, passing a random int
     * as the first constructor argument (presumably the namespace id).
     */
    private NamespaceInfo newNSInfo() {
        return new NamespaceInfo(new Random().nextInt(), "testCluster", "TestBPID", -1L);
    }

    /**
     * Writes {@link #NUM_TRANSACTIONS} no-op edits into one finalized segment,
     * suspends all extra bookies, and asserts that the whole segment can still
     * be read back.
     *
     * @throws Exception on any BookKeeper, journal or I/O failure
     */
    @Test(timeout = 120000)
    public void testSpeculativeRead() throws Exception {
        for (int i = 0; i < NUM_EXTRA_BOOKIES; i++) {
            bks.add(bkutil.newBookie());
        }

        NamespaceInfo nsInfo = newNSInfo();
        Configuration conf = new Configuration();
        // Ensemble and quorum both span every bookie: the local one(s) plus the extras.
        int ensembleSize = NUM_LOCAL_BOOKIES + NUM_EXTRA_BOOKIES;
        conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, ensembleSize);
        conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, ensembleSize);
        conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS, 100);
        // 3600 s is far longer than the test timeout, so the read cannot succeed
        // merely by waiting for the read entry timeout to expire.
        conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);

        BookKeeperJournalManager journalManager = new BookKeeperJournalManager(
                conf, BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsInfo);
        CountDownLatch sleepLatch = new CountDownLatch(1);
        List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
        try {
            journalManager.format(nsInfo);
            writeFinalizedSegment(journalManager);
            journalManager.selectInputStreams(streams, FIRST_TXID, true);

            // Suspend every extra bookie; they stay suspended until the latch is released.
            for (BookieServer bookie : bks) {
                sleepBookie(sleepLatch, bookie);
            }

            Assert.assertEquals(NUM_TRANSACTIONS,
                    FSEditLogTestUtil.countTransactionsInStream(streams.get(0)));
        } finally {
            try {
                for (EditLogInputStream stream : streams) {
                    stream.close();
                }
            } finally {
                // Always wake the suspended bookies and close the journal manager,
                // even when the read or the stream close failed.
                sleepLatch.countDown();
                journalManager.close();
            }
        }
    }

    /**
     * Writes no-op edits with transaction ids {@link #FIRST_TXID} through
     * {@link #NUM_TRANSACTIONS} into a new log segment and finalizes it.
     */
    private void writeFinalizedSegment(BookKeeperJournalManager journalManager) throws Exception {
        EditLogOutputStream out = journalManager.startLogSegment(
                FIRST_TXID, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
        for (long txId = FIRST_TXID; txId <= NUM_TRANSACTIONS; txId++) {
            FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
            op.setTransactionId(txId);
            out.write(op);
        }
        out.close();
        journalManager.finalizeLogSegment(FIRST_TXID, NUM_TRANSACTIONS);
    }

    /**
     * Starts a background thread that suspends the given bookie, waits until
     * the latch is released (or {@link #MAX_SUSPEND_MINUTES} elapse), and then
     * resumes it.
     *
     * @param countDownLatch latch whose release ends the suspension
     * @param bookieServer bookie to suspend
     * @throws Exception if the bookie id used for the thread name cannot be obtained
     */
    private void sleepBookie(final CountDownLatch countDownLatch, final BookieServer bookieServer) throws Exception {
        Thread sleeper = new Thread() {
            @Override
            public void run() {
                try {
                    bookieServer.suspendProcessing();
                    try {
                        countDownLatch.await(MAX_SUSPEND_MINUTES, TimeUnit.MINUTES);
                    } finally {
                        // Resume even if the wait was interrupted, so the bookie
                        // is never left suspended.
                        bookieServer.resumeProcessing();
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LOG.error("Error suspending bookie", e);
                } catch (Exception e) {
                    LOG.error("Error suspending bookie", e);
                }
            }
        };
        sleeper.setName("BookieServerSleeper-" + bookieServer.getBookie().getId());
        sleeper.start();
    }
}
