package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

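/**
 * Tests reconstruction of erasure-coded (striped) block groups on a {@link MiniDFSCluster}:
 * internal blocks are taken offline or corrupted and the tests check that they are rebuilt
 * on other datanodes.
 */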
public class TestReconstructStripedFile {
    public static final Logger LOG = LoggerFactory.getLogger(TestReconstructStripedFile.class);

    // Erasure coding policy under test and the sizes derived from it; all assigned in setup().
    private ErasureCodingPolicy ecPolicy;
    private int dataBlkNum;
    private int parityBlkNum;
    private int cellSize;
    private int blockSize;
    private int groupSize;
    private int dnNum;

    // Cluster state; recreated by setup() and by the tests that restart the cluster.
    private Configuration conf;
    private MiniDFSCluster cluster;
    private DistributedFileSystem fs;

    /** Per-test temporary directory used as the base directory of the cluster built in setup(). */
    @Rule
    public TemporaryFolder baseDir = new TemporaryFolder();

    /** Maps a datanode's ID to its position in the cluster's datanode list; filled in setup(). */
    private final Map<DatanodeID, Integer> dnMap = new HashMap<>();
    private final Random random = new Random();

    /**
     * Selects which kind of internal block a test takes offline.
     *
     * <p>As used by {@code generateDeadDnIndices}: {@code DataOnly} restricts the choice to block
     * indices below {@code dataBlkNum}, {@code ParityOnly} to indices at or above it, and
     * {@code Any} applies no restriction.
     *
     * <p>NOTE(review): the decompiler reported that this type was originally package-private.
     */
    public enum ReconstructionType {
        DataOnly,
        ParityOnly,
        Any
    }

    /**
     * Returns the erasure coding policy that setup() uses for the whole test; this implementation
     * returns the default policy from {@code StripedFileTestUtil}.
     * Presumably a hook for subclasses that test other policies; verify against the subclasses.
     */
    public ErasureCodingPolicy getEcPolicy() {
        return StripedFileTestUtil.getDefaultECPolicy();
    }

    /**
     * Value that setup() writes to {@code dfs.datanode.ec.reconstruction.validation};
     * this implementation returns {@code false}.
     */
    public boolean isValidationEnabled() {
        return false;
    }

    /**
     * Value that setup() writes to {@code dfs.namenode.reconstruction.pending.timeout-sec};
     * this implementation returns 300.
     */
    public int getPendingTimeout() {
        return 300;
    }

    /** Returns the block size computed in setup() (three cells of the policy under test). */
    public int getBlockSize() {
        return blockSize;
    }

    /** Returns the cluster currently in use, or {@code null} once tearDown() has run. */
    public MiniDFSCluster getCluster() {
        return cluster;
    }

    /**
     * Derives the block geometry from the policy under test, starts a cluster with
     * {@code groupSize + parityBlkNum} datanodes, applies the policy to the root directory and
     * records every datanode's position in {@code dnMap}.
     */
    @Before
    public void setup() throws IOException {
        // Geometry derived from the erasure coding policy.
        ecPolicy = getEcPolicy();
        dataBlkNum = ecPolicy.getNumDataUnits();
        parityBlkNum = ecPolicy.getNumParityUnits();
        cellSize = ecPolicy.getCellSize();
        blockSize = cellSize * 3;
        groupSize = dataBlkNum + parityBlkNum;
        dnNum = groupSize + parityBlkNum;

        // Cluster configuration.
        conf = new Configuration();
        conf.setLong("dfs.blocksize", blockSize);
        conf.setInt("dfs.datanode.ec.reconstruction.stripedread.buffer.size", cellSize - 1);
        conf.setInt("dfs.namenode.redundancy.interval.seconds", 1);
        if (ErasureCodeNative.isNativeCodeLoaded()) {
            conf.set("io.erasurecode.codec.rs.rawcoders", "rs_native");
        }
        conf.setInt("dfs.namenode.reconstruction.pending.timeout-sec", getPendingTimeout());
        conf.setBoolean("dfs.datanode.ec.reconstruction.validation", isValidationEnabled());

        cluster = new MiniDFSCluster.Builder(conf, baseDir.getRoot())
                .numDataNodes(dnNum)
                .build();
        cluster.waitActive();

        // Enable the policy and apply it to the whole namespace.
        fs = cluster.getFileSystem();
        final String policyName = ecPolicy.getName();
        fs.enableErasureCodingPolicy(policyName);
        fs.getClient().setErasureCodingPolicy("/", policyName);

        // Remember where each datanode sits in the cluster's datanode list.
        final ArrayList<DataNode> started = cluster.getDataNodes();
        for (int position = 0; position < dnNum; position++) {
            dnMap.put(started.get(position).getDatanodeId(), position);
        }
    }

    /** Shuts the cluster down, if one is running, and clears the reference. */
    @After
    public void tearDown() {
        if (cluster == null) {
            return;
        }
        cluster.shutdown();
        cluster = null;
    }

    /** One parity block lost; file length is {@code (dataBlkNum + 1)} blocks plus a tenth of a block. */
    @Test(timeout = 120000)
    public void testRecoverOneParityBlock() throws Exception {
        final int fileLen = (dataBlkNum + 1) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverOneParityBlock", fileLen, ReconstructionType.ParityOnly, 1);
    }

    /** One parity block lost; file length is one cell plus a tenth of a cell. */
    @Test(timeout = 120000)
    public void testRecoverOneParityBlock1() throws Exception {
        final int fileLen = cellSize + cellSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverOneParityBlock1", fileLen, ReconstructionType.ParityOnly, 1);
    }

    /** One parity block lost; the file is a single byte long. */
    @Test(timeout = 120000)
    public void testRecoverOneParityBlock2() throws Exception {
        final int fileLen = 1;
        assertFileBlocksReconstruction(
                "/testRecoverOneParityBlock2", fileLen, ReconstructionType.ParityOnly, 1);
    }

    /** One parity block lost; file length is {@code (dataBlkNum - 1)} blocks plus a tenth of a block. */
    @Test(timeout = 120000)
    public void testRecoverOneParityBlock3() throws Exception {
        final int fileLen = (dataBlkNum - 1) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverOneParityBlock3", fileLen, ReconstructionType.ParityOnly, 1);
    }

    /** {@code parityBlkNum} parity blocks lost; file length is {@code dataBlkNum} blocks plus a tenth of a block. */
    @Test(timeout = 120000)
    public void testRecoverAllParityBlocks() throws Exception {
        final int fileLen = dataBlkNum * blockSize + blockSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverAllParityBlocks", fileLen, ReconstructionType.ParityOnly, parityBlkNum);
    }

    /** {@code parityBlkNum} data blocks lost; file length is {@code (dataBlkNum + parityBlkNum)} blocks plus a tenth of a block. */
    @Test(timeout = 120000)
    public void testRecoverAllDataBlocks() throws Exception {
        final int fileLen = (dataBlkNum + parityBlkNum) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverAllDataBlocks", fileLen, ReconstructionType.DataOnly, parityBlkNum);
    }

    /** {@code parityBlkNum} data blocks lost; file length is {@code parityBlkNum} blocks plus a tenth of a block. */
    @Test(timeout = 120000)
    public void testRecoverAllDataBlocks1() throws Exception {
        final int fileLen = parityBlkNum * blockSize + blockSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverAllDataBlocks1", fileLen, ReconstructionType.DataOnly, parityBlkNum);
    }

    /** One data block lost; file length is {@code (dataBlkNum + 1)} blocks plus a tenth of a block. */
    @Test(timeout = 120000)
    public void testRecoverOneDataBlock() throws Exception {
        final int fileLen = (dataBlkNum + 1) * blockSize + blockSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverOneDataBlock", fileLen, ReconstructionType.DataOnly, 1);
    }

    /** One data block lost; file length is one cell plus a tenth of a cell. */
    @Test(timeout = 120000)
    public void testRecoverOneDataBlock1() throws Exception {
        final int fileLen = cellSize + cellSize / 10;
        assertFileBlocksReconstruction(
                "/testRecoverOneDataBlock1", fileLen, ReconstructionType.DataOnly, 1);
    }

    /** One data block lost; the file is a single byte long. */
    @Test(timeout = 120000)
    public void testRecoverOneDataBlock2() throws Exception {
        final int fileLen = 1;
        assertFileBlocksReconstruction(
                "/testRecoverOneDataBlock2", fileLen, ReconstructionType.DataOnly, 1);
    }

    /**
     * A random number (1 to {@code parityBlkNum}) of blocks of any kind lost; file length is
     * {@code parityBlkNum} blocks plus a tenth of a block.
     */
    @Test(timeout = 120000)
    public void testRecoverAnyBlocks() throws Exception {
        final int fileLen = parityBlkNum * blockSize + blockSize / 10;
        final int toRecover = random.nextInt(parityBlkNum) + 1;
        assertFileBlocksReconstruction(
                "/testRecoverAnyBlocks", fileLen, ReconstructionType.Any, toRecover);
    }

    /**
     * A random number (1 to {@code parityBlkNum}) of blocks of any kind lost; file length is
     * {@code (dataBlkNum + parityBlkNum)} blocks plus a tenth of a block.
     */
    @Test(timeout = 120000)
    public void testRecoverAnyBlocks1() throws Exception {
        final int fileLen = (dataBlkNum + parityBlkNum) * blockSize + blockSize / 10;
        final int toRecover = random.nextInt(parityBlkNum) + 1;
        assertFileBlocksReconstruction(
                "/testRecoverAnyBlocks1", fileLen, ReconstructionType.Any, toRecover);
    }

    /**
     * Randomly picks {@code i} distinct positions of {@code bArr} whose block index matches the
     * requested reconstruction type.
     *
     * <p>A position qualifies for {@code DataOnly} when its block index is below
     * {@code dataBlkNum}, for {@code ParityOnly} when it is at or above {@code dataBlkNum}, and
     * always for {@code Any}.
     *
     * @param reconstructionType which kind of internal block may be chosen
     * @param i number of positions to pick
     * @param bArr block indices of the block group, one per location
     * @return {@code i} distinct positions into {@code bArr}, in random order
     * @throws IllegalArgumentException if {@code i} is negative or fewer than {@code i} positions
     *         qualify (the previous retry loop would have spun forever in the latter case)
     */
    private int[] generateDeadDnIndices(ReconstructionType reconstructionType, int i, byte[] bArr) {
        // Collect every position that is allowed for this reconstruction type.
        ArrayList<Integer> eligible = new ArrayList<>(bArr.length);
        for (int position = 0; position < bArr.length; position++) {
            boolean isDataBlock = bArr[position] < dataBlkNum;
            boolean matches;
            if (reconstructionType == ReconstructionType.DataOnly) {
                matches = isDataBlock;
            } else if (reconstructionType == ReconstructionType.ParityOnly) {
                matches = !isDataBlock;
            } else {
                matches = true;
            }
            if (matches) {
                eligible.add(position);
            }
        }
        if (i < 0 || i > eligible.size()) {
            throw new IllegalArgumentException("Cannot pick " + i + " " + reconstructionType
                    + " positions: only " + eligible.size() + " qualify");
        }

        // A shuffle followed by taking a prefix gives a uniformly random distinct selection.
        Collections.shuffle(eligible, random);
        int[] chosen = new int[i];
        for (int k = 0; k < i; k++) {
            chosen[k] = eligible.get(k);
        }
        return chosen;
    }

    /** Stops the given datanode, then asks the cluster to mark it dead. */
    private void shutdownDataNode(DataNode dataNode) throws IOException {
        dataNode.shutdown();
        final DatanodeID stoppedId = dataNode.getDatanodeId();
        cluster.setDataNodeDead(stoppedId);
    }

    /**
     * Makes every internal block in {@code map} unavailable, either by stopping the datanode that
     * holds it or by corrupting its replica.
     *
     * <p>A replica is only corrupted (instead of its datanode being stopped) when at least one
     * datanode has already been stopped, the type is {@code DataOnly}, and a coin flip says so.
     *
     * @return the number of datanodes that were stopped
     */
    private int generateErrors(Map<ExtendedBlock, DataNode> map, ReconstructionType reconstructionType) throws IOException {
        int stopped = 0;
        for (Map.Entry<ExtendedBlock, DataNode> target : map.entrySet()) {
            final ExtendedBlock block = target.getKey();
            final DataNode holder = target.getValue();
            // The coin is only flipped once a datanode has been stopped and only for DataOnly.
            final boolean corruptInstead = stopped != 0
                    && reconstructionType == ReconstructionType.DataOnly
                    && !random.nextBoolean();
            if (corruptInstead) {
                LOG.info("Note: corrupt data on " + holder.getDisplayName() + " with internal block " + block);
                cluster.corruptReplica(holder, block);
                continue;
            }
            LOG.info("Note: stop DataNode " + holder.getDisplayName() + " with internal block " + block);
            shutdownDataNode(holder);
            stopped++;
        }
        return stopped;
    }

    /**
     * Writes {@code i} bytes, each with value 1, to the path {@code str} and waits until the
     * file's block groups have been reported.
     */
    private static void writeFile(DistributedFileSystem distributedFileSystem, String str, int i) throws Exception {
        final byte[] payload = new byte[i];
        Arrays.fill(payload, (byte) 1);
        final Path target = new Path(str);
        DFSTestUtil.writeFile((FileSystem) distributedFileSystem, target, payload);
        StripedFileTestUtil.waitBlockGroupsReported(distributedFileSystem, str);
    }

    /**
     * Writes a file, makes some internal blocks of its last block group unavailable and verifies
     * that each of them is rebuilt, byte for byte, on a datanode that did not hold the group.
     *
     * <p>Steps: write the file; record length and content of the replicas that are about to be
     * lost; stop or corrupt the chosen datanodes; check that the NameNode reports fewer locations;
     * wait for reconstruction; compare the rebuilt replicas with the recorded ones.
     *
     * @param str path of the file to create
     * @param i file length in bytes; must be positive
     * @param reconstructionType which kind of internal block to lose
     * @param i2 number of internal blocks to lose; must be in {@code [1, parityBlkNum]}
     */
    public void assertFileBlocksReconstruction(String str, int i, ReconstructionType reconstructionType, int i2) throws Exception {
        final String fileName = str;
        final int fileLen = i;
        final int toRecoverBlockNum = i2;

        if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
            Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
        }
        Assert.assertTrue("File length must be positive.", fileLen > 0);

        final Path file = new Path(fileName);
        writeFile(fs, fileName, fileLen);

        LocatedBlocks locatedBlocks = StripedFileTestUtil.getLocatedBlocks(file, fs);
        Assert.assertEquals(fileLen, locatedBlocks.getFileLength());

        // The last block of a striped file is a block group; the cast gives access to its indices.
        final LocatedStripedBlock lastBlock = (LocatedStripedBlock) locatedBlocks.getLastLocatedBlock();
        final DatanodeInfo[] storageInfos = lastBlock.getLocations();
        final byte[] indices = lastBlock.getBlockIndices();

        // Remember which datanodes already hold a block of the group.
        final BitSet holders = new BitSet(dnNum);
        for (DatanodeInfo info : storageInfos) {
            holders.set(dnMap.get(info));
        }

        final int[] dead = generateDeadDnIndices(reconstructionType, toRecoverBlockNum, indices);
        LOG.info("Note: indices == " + Arrays.toString(indices) + ". Generate errors on datanodes: " + Arrays.toString(dead));

        final ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum];
        final File[] replicas = new File[toRecoverBlockNum];
        final long[] replicaLengths = new long[toRecoverBlockNum];
        // One byte[] per lost replica (the decompiled source wrongly declared this as byte[]).
        final byte[][] replicaContents = new byte[toRecoverBlockNum][];
        final Map<ExtendedBlock, DataNode> errorMap = new HashMap<>(dead.length);
        for (int k = 0; k < toRecoverBlockNum; k++) {
            final int position = dead[k];
            final int dnIndex = dnMap.get(storageInfos[position]);

            // Inspect the replica while its datanode is still alive.
            blocks[k] = StripedBlockUtil.constructInternalBlock(lastBlock.getBlock(), cellSize, dataBlkNum, indices[position]);
            errorMap.put(blocks[k], cluster.getDataNodes().get(dnIndex));
            replicas[k] = cluster.getBlockFile(dnIndex, blocks[k]);
            replicaLengths[k] = replicas[k].length();
            final File metadata = cluster.getBlockMetadataFile(dnIndex, blocks[k]);
            Assert.assertEquals(
                    StripedBlockUtil.getInternalBlockLength(lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[position]),
                    replicaLengths[k]);
            Assert.assertTrue(metadata.getName().endsWith(blocks[k].getGenerationStamp() + ".meta"));
            LOG.info("replica " + k + " locates in file: " + replicas[k]);
            replicaContents[k] = DFSTestUtil.readFileAsBytes(replicas[k]);
        }

        // Number of internal blocks in the last block group.
        final int lastGroupDataLen = fileLen % (dataBlkNum * blockSize);
        final int lastGroupNumBlk = lastGroupDataLen == 0
                ? dataBlkNum
                : Math.min(dataBlkNum, (lastGroupDataLen - 1) / cellSize + 1);
        final int lastGroupSize = lastGroupNumBlk + parityBlkNum;

        // Inject the failures BEFORE asking the NameNode for locations again. Java evaluates
        // arguments left to right, so nesting both calls in one assertEquals read the locations
        // before any datanode had been stopped.
        final int stoppedDNs = generateErrors(errorMap, reconstructionType);
        final DatanodeInfo[] remaining =
                StripedFileTestUtil.getLocatedBlocks(file, fs).getLastLocatedBlock().getLocations();
        Assert.assertEquals(lastGroupSize - stoppedDNs, remaining.length);

        // Datanodes that held no block of the group are the reconstruction candidates.
        int[] targetDNs = new int[dnNum - lastGroupSize];
        int filled = 0;
        for (int dn = 0; dn < dnNum; dn++) {
            if (!holders.get(dn)) {
                targetDNs[filled++] = dn;
            }
        }

        StripedFileTestUtil.waitForReconstructionFinished(file, fs, lastGroupSize);

        targetDNs = sortTargetsByReplicas(blocks, targetDNs);

        // Each rebuilt replica must match the one that was lost.
        for (int k = 0; k < toRecoverBlockNum; k++) {
            final File rebuilt = cluster.getBlockFile(targetDNs[k], blocks[k]);
            LOG.info("replica after reconstruction " + rebuilt);
            final File rebuiltMetadata = cluster.getBlockMetadataFile(targetDNs[k], blocks[k]);
            Assert.assertEquals(replicaLengths[k], rebuilt.length());
            LOG.info("replica before " + replicas[k]);
            Assert.assertTrue(rebuiltMetadata.getName().endsWith(blocks[k].getGenerationStamp() + ".meta"));
            Assert.assertArrayEquals(replicaContents[k], DFSTestUtil.readFileAsBytes(rebuilt));
        }
    }

    /**
     * For each block, finds the first not-yet-used candidate datanode on which the cluster
     * reports a block file, and returns those datanode indices in block order.
     *
     * <p>Side effect: every candidate that gets assigned is overwritten with -1 in {@code iArr}.
     * Fails the test when a block has no matching candidate.
     *
     * @param extendedBlockArr the internal blocks that should have been reconstructed
     * @param iArr candidate datanode indices; modified in place
     * @return one datanode index per block
     */
    private int[] sortTargetsByReplicas(ExtendedBlock[] extendedBlockArr, int[] iArr) {
        final int[] assigned = new int[extendedBlockArr.length];
        Arrays.fill(assigned, -1);
        for (int b = 0; b < extendedBlockArr.length; b++) {
            for (int t = 0; t < iArr.length; t++) {
                if (iArr[t] == -1) {
                    continue; // already assigned to an earlier block
                }
                if (cluster.getBlockFile(iArr[t], extendedBlockArr[b]) == null) {
                    continue;
                }
                assigned[b] = iArr[t];
                iArr[t] = -1;
                break;
            }
            if (assigned[b] == -1) {
                Assert.fail("Failed to reconstruct striped block: " + extendedBlockArr[b].getBlockId());
            }
        }
        return assigned;
    }

    /**
     * Submits a reconstruction task built from dummy arguments to the first datanode's erasure
     * coding worker; the test passes when {@code processErasureCodingTasks} returns without
     * throwing.
     */
    @Test
    public void testProcessErasureCodingTasksSubmitionShouldSucceed() throws Exception {
        DataNode dataNode = cluster.dataNodes.get(0).datanode;

        // Dummy arguments: the source array is deliberately one longer than the live indices.
        int size = cluster.dataNodes.size();
        byte[] liveIndices = new byte[size];
        DatanodeInfo[] sources = new DatanodeInfo[size + 1];
        DatanodeStorageInfo target = BlockManagerTestUtil.newDatanodeStorageInfo(
                DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("s01"));
        DatanodeStorageInfo[] targets = new DatanodeStorageInfo[] {target};

        BlockECReconstructionCommand.BlockECReconstructionInfo invalidInfo =
                new BlockECReconstructionCommand.BlockECReconstructionInfo(
                        new ExtendedBlock("bp-id", 123456L), sources, targets, liveIndices,
                        new byte[0], ecPolicy);

        ArrayList<BlockECReconstructionCommand.BlockECReconstructionInfo> ecTasks = new ArrayList<>();
        ecTasks.add(invalidInfo);
        dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
    }

    /** Runs the NameNode task-scheduling scenario with one and then with two dead datanodes. */
    @Test(timeout = 120000)
    public void testNNSendsErasureCodingTasks() throws Exception {
        for (int deadDataNodes = 1; deadDataNodes <= 2; deadDataNodes++) {
            testNNSendsErasureCodingTasks(deadDataNodes);
        }
    }

    /**
     * Restarts the cluster with one extra datanode, writes 50 single-stripe files, stops
     * {@code i} randomly chosen datanodes and checks that every pending reconstruction completes
     * without timing out.
     *
     * @param i number of datanodes to stop; the scenario is skipped (JUnit assumption) when it
     *          exceeds the policy's number of parity units
     */
    private void testNNSendsErasureCodingTasks(int i) throws Exception {
        cluster.shutdown();

        final int numDataNodes = dnNum + 1;
        // The loop below asserts that no pending reconstruction times out. A 1 s timeout leaves
        // scheduled tasks almost no time to be delivered and executed, so that assertion would
        // fail spuriously; 10 s gives them room.
        conf.setInt("dfs.namenode.reconstruction.pending.timeout-sec", 10);
        conf.setInt("dfs.namenode.replication.max-streams", 20);
        conf.setInt("dfs.datanode.ec.reconstruction.threads", 2);
        conf.setInt("dfs.client.socket-timeout", 5000);
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        final ErasureCodingPolicy policy = ecPolicy;
        fs.enableErasureCodingPolicy(policy.getName());
        fs.getClient().setErasureCodingPolicy("/", policy.getName());

        final int fileLen = cellSize * ecPolicy.getNumDataUnits();
        for (int n = 0; n < 50; n++) {
            writeFile(fs, "/ec-file-" + n, fileLen);
        }

        // Cause data loss by stopping the requested number of randomly chosen datanodes.
        Assume.assumeTrue("Ignore case where num dead DNs > num parity units", policy.getNumParityUnits() >= i);
        final ArrayList<DataNode> shuffled = new ArrayList<>(cluster.getDataNodes());
        Collections.shuffle(shuffled);
        for (DataNode victim : shuffled.subList(0, i)) {
            shutdownDataNode(victim);
        }

        final FSNamesystem namesystem = cluster.getNamesystem();
        GenericTestUtils.waitFor(() -> namesystem.getPendingDeletionBlocks() == 0, 500L, 30000L);

        // Every pending reconstruction must be processed without a single timeout.
        while (namesystem.getPendingReconstructionBlocks() > 0) {
            final long timedOut = namesystem.getNumTimedOutPendingReconstructions();
            Assert.assertEquals(String.format("Found %d timeout pending reconstruction tasks", timedOut), 0L, timedOut);
            Thread.sleep(1000L);
        }

        // Finally, no datanode may still report transfers in progress.
        GenericTestUtils.waitFor(
                () -> cluster.getDataNodes().stream().mapToInt(DataNode::getXmitsInProgress).sum() == 0,
                500L, 30000L);
    }

    /**
     * Checks the per-task xmits value for weights 0.5, 1, 0 and 10; the expected values are
     * half the data units, the data units, 1, and ten times the data units respectively.
     */
    @Test(timeout = 180000)
    public void testErasureCodingWorkerXmitsWeight() throws Exception {
        final int dataUnits = ecPolicy.getNumDataUnits();
        testErasureCodingWorkerXmitsWeight(0.5f, (int) (dataUnits * 0.5f));
        testErasureCodingWorkerXmitsWeight(1.0f, dataUnits);
        testErasureCodingWorkerXmitsWeight(0.0f, 1);
        testErasureCodingWorkerXmitsWeight(10.0f, 10 * dataUnits);
    }

    /**
     * Restarts the cluster with the given xmits weight, stops the first datanode and checks that
     * the xmits in progress, summed over all datanodes, reach
     * {@code finalizedBlocksOnStoppedDn * i} while the reconstruction tasks are held at a barrier.
     *
     * @param f value for {@code dfs.datanode.ec.reconstruction.xmits.weight}
     * @param i expected xmits contributed by each reconstruction task
     */
    private void testErasureCodingWorkerXmitsWeight(float f, int i) throws Exception {
        conf.setFloat("dfs.datanode.ec.reconstruction.xmits.weight", f);
        cluster.shutdown();
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        fs.enableErasureCodingPolicy(ecPolicy.getName());
        fs.getClient().setErasureCodingPolicy("/", ecPolicy.getName());

        writeFile(fs, "/ec-xmits-weight", cellSize * ecPolicy.getNumDataUnits() * 2);

        final DataNode dn = cluster.getDataNodes().get(0);
        final int blocksOnDn = dn.getFSDataset()
                .getFinalizedBlocks(cluster.getNameNode().getNamesystem().getBlockPoolId())
                .size();
        final int expectedTotalXmits = blocksOnDn * i;

        // One party per block on the stopped datanode plus the test thread: tasks block in the
        // injector until the test thread has observed the xmits count.
        final CyclicBarrier barrier = new CyclicBarrier(blocksOnDn + 1);
        final DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set(new DataNodeFaultInjector() {
            @Override
            public void stripedBlockReconstruction() throws IOException {
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IOException(e);
                }
            }
        });

        try {
            shutdownDataNode(dn);
            LambdaTestUtils.await(30000, 500,
                    () -> cluster.getDataNodes().stream().mapToInt(DataNode::getXmitsInProgress).sum()
                            == expectedTotalXmits);
        } finally {
            // Cleanup runs exactly once. The previous try/catch(Throwable) form re-ran it when
            // the cleanup itself failed, including a second barrier.await().
            barrier.await();
            DataNodeFaultInjector.set(oldInjector);
            for (final DataNode curDn : cluster.getDataNodes()) {
                GenericTestUtils.waitFor(() -> curDn.getXceiverCount() <= 1, 10L, 60000L);
                GenericTestUtils.waitFor(() -> curDn.getXmitsInProgress() == 0, 10L, 2500L);
            }
        }
    }

    /**
     * Delays block reads during reconstruction through a fault injector and checks that the
     * block group is still fully reconstructed after one of its datanodes is stopped.
     *
     * <p>Skipped (JUnit assumption) for policies with a single parity unit.
     */
    @Test(timeout = 120000)
    public void testTimeoutReadBlockInReconstruction() throws Exception {
        Assume.assumeTrue("Ignore case where num parity units <= 1", ecPolicy.getNumParityUnits() > 1);
        final int stripedBufferSize =
                conf.getInt("dfs.datanode.ec.reconstruction.stripedread.buffer.size", cellSize);
        final ErasureCodingPolicy policy = ecPolicy;
        fs.enableErasureCodingPolicy(policy.getName());
        fs.getClient().setErasureCodingPolicy("/", policy.getName());

        // Two read buffers' worth of data per data unit.
        final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
        final Path file = new Path("/timeout-read-block");
        writeFile(fs, "/timeout-read-block", fileLen);
        fs.getFileBlockLocations(file, 0L, fileLen);

        final LocatedBlocks locatedBlocks = StripedFileTestUtil.getLocatedBlocks(file, fs);
        Assert.assertEquals(1L, locatedBlocks.getLocatedBlocks().size());
        // The datanode holding the first location of the only block group is the one to stop.
        final DataNode dataNode =
                cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());

        final int stripedReadTimeoutInMills =
                conf.getInt("dfs.datanode.ec.reconstruction.stripedread.timeout.millis", 5000);
        Assert.assertTrue("dfs.datanode.ec.reconstruction.stripedread.timeout.millis must be greater than 2000",
                stripedReadTimeoutInMills > 2000);

        final DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set(new DataNodeFaultInjector() {
            private final AtomicInteger numDelayReader = new AtomicInteger(0);

            @Override
            public void delayBlockReader() {
                final int index = numDelayReader.incrementAndGet();
                TestReconstructStripedFile.LOG.info("Delay the {}th read block", index);

                // The first reader is held until numDataUnits further readers have arrived.
                if (index == 1) {
                    try {
                        GenericTestUtils.waitFor(
                                () -> numDelayReader.get()
                                        >= TestReconstructStripedFile.this.ecPolicy.getNumDataUnits() + 1,
                                50L, stripedReadTimeoutInMills * 3);
                    } catch (InterruptedException ignored) {
                        // Interruption is deliberately ignored here, as in the original test.
                    } catch (TimeoutException e) {
                        Assert.fail("Can't reconstruct the file's first part.");
                    }
                }

                // Readers beyond 3 * numDataUnits + 1 are parked forever, interrupts included.
                if (index > (3 * TestReconstructStripedFile.this.ecPolicy.getNumDataUnits()) + 1) {
                    while (true) {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException ignored) {
                            // Keep sleeping: this reader must never proceed.
                        }
                    }
                }
            }
        });

        try {
            shutdownDataNode(dataNode);
            StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
        } finally {
            // Always put the previous injector back, on success and on failure.
            DataNodeFaultInjector.set(oldInjector);
        }
    }

    /**
     * Uses a fault injector to delay block reads and to intercept buffer release during
     * reconstruction, then checks that every buffer found in the erasure coding buffer pool has
     * position 0 and that reconstruction still completes.
     *
     * <p>Skipped (JUnit assumption) for policies with a single parity unit.
     */
    @Test(timeout = 120000)
    public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception {
        Assume.assumeTrue("Ignore case where num parity units <= 1", ecPolicy.getNumParityUnits() > 1);
        final int fileLen = conf.getInt("dfs.datanode.ec.reconstruction.stripedread.buffer.size", cellSize)
                * 2 * ecPolicy.getNumDataUnits();
        final Path file = new Path("/no-dirty-buffer");
        writeFile(fs, "/no-dirty-buffer", fileLen);
        fs.getFileBlockLocations(file, 0L, fileLen);

        final LocatedBlocks locatedBlocks = StripedFileTestUtil.getLocatedBlocks(file, fs);
        Assert.assertEquals(1L, locatedBlocks.getLocatedBlocks().size());
        // The datanode holding the first location of the only block group is the one to stop.
        final DataNode dataNode =
                cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());

        final int stripedReadTimeoutInMills =
                conf.getInt("dfs.datanode.ec.reconstruction.stripedread.timeout.millis", 5000);
        Assert.assertTrue("dfs.datanode.ec.reconstruction.stripedread.timeout.millis must be greater than 2000",
                stripedReadTimeoutInMills > 2000);

        // Start from an empty pool so that only buffers returned during this test are checked.
        final ElasticByteBufferPool bufferPool = (ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool();
        emptyBufferPool(bufferPool, true);
        emptyBufferPool(bufferPool, false);

        final AtomicInteger finishedReadBlock = new AtomicInteger(0);
        final DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set(new DataNodeFaultInjector() {
            private final AtomicInteger numDelayReader = new AtomicInteger(0);
            private final AtomicBoolean continueRead = new AtomicBoolean(false);
            private final AtomicBoolean closeByNPE = new AtomicBoolean(false);
            private final AtomicInteger numFreeBuffer = new AtomicInteger(0);

            @Override
            public void delayBlockReader() {
                final int index = numDelayReader.incrementAndGet();
                TestReconstructStripedFile.LOG.info("Delay the {}th read block", index);

                // The first reader is held until numDataUnits further readers have arrived.
                if (index == 1) {
                    try {
                        GenericTestUtils.waitFor(
                                () -> numDelayReader.get()
                                        >= TestReconstructStripedFile.this.ecPolicy.getNumDataUnits() + 1,
                                50L, stripedReadTimeoutInMills * 3);
                    } catch (InterruptedException ignored) {
                        // Interruption is deliberately ignored here, as in the original test.
                    } catch (TimeoutException e) {
                        Assert.fail("Can't reconstruct the file's first part.");
                    }
                }

                // Later readers proceed one at a time, in order of arrival, until a buffer
                // release has been seen; after that they wait for continueRead.
                if (index > TestReconstructStripedFile.this.ecPolicy.getNumDataUnits() + 1) {
                    try {
                        GenericTestUtils.waitFor(() -> {
                            TestReconstructStripedFile.LOG.info(
                                    "Close by NPE: {}, continue read: {}", closeByNPE, continueRead);
                            return closeByNPE.get()
                                    ? continueRead.get()
                                    : index == finishedReadBlock.get() + 1;
                        }, 5L, stripedReadTimeoutInMills * 3);
                    } catch (InterruptedException ignored) {
                        // Interruption is deliberately ignored here, as in the original test.
                    } catch (TimeoutException e) {
                        Assert.fail("Can't reconstruct the file's remaining part.");
                    }
                }
            }

            @Override
            public void interceptBlockReader() {
                TestReconstructStripedFile.LOG.info(
                        "Intercept the end of {}th read block.", finishedReadBlock.incrementAndGet());
            }

            @Override
            public void interceptFreeBlockReaderBuffer() {
                closeByNPE.compareAndSet(false, true);
                final int freed = numFreeBuffer.incrementAndGet();
                TestReconstructStripedFile.LOG.info("Intercept the {} free block buffer.", freed);
                if (freed >= TestReconstructStripedFile.this.ecPolicy.getNumDataUnits() + 1) {
                    // Let the parked readers go on, then wait for them to finish.
                    continueRead.compareAndSet(false, true);
                    try {
                        GenericTestUtils.waitFor(
                                () -> finishedReadBlock.get()
                                        >= (2 * TestReconstructStripedFile.this.ecPolicy.getNumDataUnits()) + 1,
                                50L, stripedReadTimeoutInMills * 3);
                    } catch (InterruptedException ignored) {
                        // Interruption is deliberately ignored here, as in the original test.
                    } catch (TimeoutException e) {
                        Assert.fail("Can't finish the file's reconstruction.");
                    }
                }
            }
        });

        try {
            shutdownDataNode(dataNode);
            GenericTestUtils.waitFor(
                    () -> finishedReadBlock.get() >= (2 * ecPolicy.getNumDataUnits()) + 1,
                    50L, stripedReadTimeoutInMills * 3);
            assertBufferPoolIsEmpty(bufferPool, false);
            assertBufferPoolIsEmpty(bufferPool, true);
            StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
        } finally {
            // Always put the previous injector back, on success and on failure.
            DataNodeFaultInjector.set(oldInjector);
        }
    }

    /**
     * Takes buffers out of the pool until it reports size 0 for the given kind, asserting that
     * each buffer taken has position 0.
     *
     * @param z passed to the pool's {@code size} and {@code getBuffer} calls to select which
     *          buffers are checked
     */
    private void assertBufferPoolIsEmpty(ElasticByteBufferPool elasticByteBufferPool, boolean z) {
        while (elasticByteBufferPool.size(z) != 0) {
            final java.nio.ByteBuffer taken = elasticByteBufferPool.getBuffer(z, 0);
            Assert.assertEquals(0L, taken.position());
        }
    }

    /**
     * Takes buffers out of the pool, discarding them, until it reports size 0 for the given kind.
     *
     * @param z passed to the pool's {@code size} and {@code getBuffer} calls to select which
     *          buffers are removed
     */
    private void emptyBufferPool(ElasticByteBufferPool elasticByteBufferPool, boolean z) {
        for (int left = elasticByteBufferPool.size(z); left != 0; left = elasticByteBufferPool.size(z)) {
            elasticByteBufferPool.getBuffer(z, 0);
        }
    }

    static {
        GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
        GenericTestUtils.setLogLevel(BlockManager.LOG, Level.TRACE);
        GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.TRACE);
    }
}
