Skip to content

Commit

Permalink
change code logic for BlockRescontructionWork
Browse files Browse the repository at this point in the history
  • Loading branch information
VicoWu committed Jul 4, 2024
1 parent c33d868 commit 02e0897
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.protocol.BlockType.CONTIGUOUS;
import static org.apache.hadoop.hdfs.protocol.BlockType.STRIPED;
import static org.apache.hadoop.hdfs.server.blockmanagement.LowRedundancyBlocks.LEVEL;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.now;

Expand Down Expand Up @@ -869,7 +870,7 @@ public void metaSave(PrintWriter out) {
synchronized (neededReconstruction) {
out.println("Metasave: Blocks waiting for reconstruction: "
+ neededReconstruction.getLowRedundancyBlockCount());
for (int i = 0; i < neededReconstruction.LEVEL; i++) {
for (int i = 0; i < LEVEL; i++) {
if (i != neededReconstruction.QUEUE_WITH_CORRUPT_BLOCKS) {
for (Iterator<BlockInfo> it = neededReconstruction.iterator(i);
it.hasNext();) {
Expand Down Expand Up @@ -969,7 +970,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
// source node returned is not used
chooseSourceDatanodes(blockInfo, containingNodes,
containingLiveReplicasNodes, numReplicas, new ArrayList<Byte>(),
new ArrayList<Byte>(), new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
new ArrayList<Byte>(), new ArrayList<Byte>(), LEVEL);

// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
Expand Down Expand Up @@ -2099,28 +2100,22 @@ int computeInvalidateWork(int nodesToProcess) {
* @return number of blocks scheduled for reconstruction during this
* iteration.
*/
int computeBlockReconstructionWork(int blocksToProcess) {
int scheduleBlockReconstructionWork(int blocksToProcess) {
List<List<BlockInfo>> blocksToReconstruct = null;
namesystem.writeLock();
try {
boolean reset = false;
if (replQueueResetToHeadThreshold > 0) {
if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
reset = true;
replQueueCallsSinceReset = 0;
} else {
replQueueCallsSinceReset++;
}
boolean reset = false;
if (replQueueResetToHeadThreshold > 0) {
if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
reset = true;
replQueueCallsSinceReset = 0;
} else {
replQueueCallsSinceReset++;
}
// Choose the blocks to be reconstructed
blocksToReconstruct = neededReconstruction
.chooseLowRedundancyBlocks(blocksToProcess, reset);
} finally {
namesystem.writeUnlock("computeBlockReconstructionWork");
}
return computeReconstructionWorkForBlocks(blocksToReconstruct);
return scheduleReconstructionWorkForBlocks(blocksToProcess, reset);
}


/**
* Reconstruct a set of blocks to full strength through replication or
* erasure coding
Expand All @@ -2129,29 +2124,39 @@ int computeBlockReconstructionWork(int blocksToProcess) {
* @return the number of blocks scheduled for replication
*/
@VisibleForTesting
int computeReconstructionWorkForBlocks(
List<List<BlockInfo>> blocksToReconstruct) {
int scheduleReconstructionWorkForBlocks(int blocksToProcess, boolean resetIterators) {
int scheduledWork = 0;
List<BlockReconstructionWork> reconWork = new ArrayList<>();

// Step 1: categorize at-risk blocks into replication and EC tasks
namesystem.writeLock();
int priority = 0;
// Step 1: categorize at-risk blocks into replication and EC tasks
try {
synchronized (neededReconstruction) {
for (int priority = 0; priority < blocksToReconstruct
.size(); priority++) {
for (BlockInfo block : blocksToReconstruct.get(priority)) {
BlockReconstructionWork rw = scheduleReconstruction(block,
priority);
for (; blocksToProcess > 0 && priority < LEVEL; priority++) {
List<BlockInfo> blocks = new ArrayList<>();
int processed = neededReconstruction.
chooseLowRedundancyBlocksForPriority(priority, blocksToProcess, blocks);
if(processed == 0)
break;
for (BlockInfo block : blocks) {
BlockReconstructionWork rw = generateReconstructionForBlock(block,
priority);
if (rw != null) {
reconWork.add(rw);
// if we constructed effective work, reduce the budget
blocksToProcess--;
}
}
}
}
} finally {
namesystem.writeUnlock("computeReconstructionWorkForBlocks");
namesystem.writeUnlock("generateReconstructionWorkForBlocks");
}
if (priority == LEVEL || resetIterators) {
// Reset all bookmarks because there were no recently added blocks.
neededReconstruction.resetIterators();
}


// Step 2: choose target nodes for each reconstruction task
for (BlockReconstructionWork rw : reconWork) {
Expand All @@ -2161,7 +2166,7 @@ int computeReconstructionWorkForBlocks(

// Exclude all nodes which already exists as targets for the block
List<DatanodeStorageInfo> targets =
pendingReconstruction.getTargets(rw.getBlock());
pendingReconstruction.getTargets(rw.getBlock());
if (targets != null) {
for (DatanodeStorageInfo dn : targets) {
excludedNodes.add(dn.getDatanodeDescriptor());
Expand All @@ -2170,7 +2175,7 @@ int computeReconstructionWorkForBlocks(

// choose replication targets: NOT HOLDING THE GLOBAL LOCK
final BlockPlacementPolicy placementPolicy =
placementPolicies.getPolicy(rw.getBlock().getBlockType());
placementPolicies.getPolicy(rw.getBlock().getBlockType());
rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
}

Expand All @@ -2191,7 +2196,7 @@ int computeReconstructionWorkForBlocks(
}
}
} finally {
namesystem.writeUnlock("computeReconstructionWorkForBlocks");
namesystem.writeUnlock("scheduleReconstructionWorkForBlocks");
}

if (blockLog.isDebugEnabled()) {
Expand All @@ -2204,16 +2209,17 @@ int computeReconstructionWorkForBlocks(
targetList.append(' ').append(target.getDatanodeDescriptor());
}
blockLog.debug("BLOCK* ask {} to replicate {} to {}",
rw.getSrcNodes(), rw.getBlock(), targetList);
rw.getSrcNodes(), rw.getBlock(), targetList);
}
}
blockLog.debug("BLOCK* neededReconstruction = {} pendingReconstruction = {}",
neededReconstruction.size(), pendingReconstruction.size());
neededReconstruction.size(), pendingReconstruction.size());
}

return scheduledWork;
}


// Check if the number of live + pending replicas satisfies
// the expected redundancy.
boolean hasEnoughEffectiveReplicas(BlockInfo block,
Expand All @@ -2225,7 +2231,7 @@ boolean hasEnoughEffectiveReplicas(BlockInfo block,
}

@VisibleForTesting
BlockReconstructionWork scheduleReconstruction(BlockInfo block,
BlockReconstructionWork generateReconstructionForBlock(BlockInfo block,
int priority) {
// skip abandoned block or block reopened for append
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
Expand Down Expand Up @@ -2615,7 +2621,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
}

// for EC here need to make sure the numReplicas replicates state correct
// because in the scheduleReconstruction it need the numReplicas to check
// because in the generateReconstructionForBlock it need the numReplicas to check
// whether need to reconstruct the ec internal block
byte blockIndex = -1;
if (isStriped) {
Expand Down Expand Up @@ -4954,7 +4960,7 @@ public void removeBlock(BlockInfo block) {
DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
.toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
}
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
neededReconstruction.remove(block, LEVEL);
postponedMisreplicatedBlocks.remove(block);
}

Expand Down Expand Up @@ -5405,7 +5411,7 @@ int computeDatanodeWork() {
final int nodesToProcess = (int) Math.ceil(numlive
* this.blocksInvalidateWorkPct);

int workFound = this.computeBlockReconstructionWork(blocksToProcess);
int workFound = this.scheduleBlockReconstructionWork(blocksToProcess);

// Update counters
namesystem.writeLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

/**
* This class is used internally by
* {@link BlockManager#computeReconstructionWorkForBlocks} to represent a
* {@link BlockManager#scheduleReconstructionWorkForBlocks} to represent a
* task to reconstruct a block through replication or erasure coding.
* Reconstruction is done by transferring data from srcNodes to targets
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,44 +520,57 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(

int count = 0;
int priority = 0;
HashSet<BlockInfo> toRemove = new HashSet<>();
for (; count < blocksToProcess && priority < LEVEL; priority++) {
// Go through all blocks that need reconstructions with current priority.
// Set the iterator to the first unprocessed block at this priority level
// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
// to look for deleted blocks if any.
List<BlockInfo> blocks = new ArrayList<>();
int processed = chooseLowRedundancyBlocksForPriority(priority, blocksToProcess, blocks);
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
final List<BlockInfo> blocks = new LinkedList<>();
if (!inCorruptLevel) {
blocksToReconstruct.add(blocks);
}
for(; count < blocksToProcess && i.hasNext(); count++) {
BlockInfo block = i.next();
if (block.isDeleted()) {
toRemove.add(block);
continue;
}
if (!inCorruptLevel) {
blocks.add(block);
}
}
for (BlockInfo bInfo : toRemove) {
remove(bInfo, priority);
}
toRemove.clear();
count += processed;
}

if (priority == LEVEL || resetIterators) {
// Reset all bookmarks because there were no recently added blocks.
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
q.resetBookmark();
}
resetIterators();
}

return blocksToReconstruct;
}

synchronized void resetIterators(){
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
q.resetBookmark();
}
}

synchronized int chooseLowRedundancyBlocksForPriority(
int priority, int blocksToProcess, List<BlockInfo> blocks) {
HashSet<BlockInfo> toRemove = new HashSet<>();
int count = 0;
// Go through all blocks that need reconstructions with current priority.
// Set the iterator to the first unprocessed block at this priority level
// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
// to look for deleted blocks if any.
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
for(; count < blocksToProcess && i.hasNext(); count++) {
BlockInfo block = i.next();
if (block.isDeleted()) {
toRemove.add(block);
continue;
}
if (!inCorruptLevel) {
blocks.add(block);
}
}
for (BlockInfo bInfo : toRemove) {
remove(bInfo, priority);
}
toRemove.clear();
return count;
}


/** Returns an iterator of all blocks in a given priority queue. */
synchronized Iterator<BlockInfo> iterator(int level) {
return priorityQueues.get(level).iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public static void checkRedundancy(final BlockManager blockManager) {
*/
public static int computeAllPendingWork(BlockManager bm) {
int work = computeInvalidationWork(bm);
work += bm.computeBlockReconstructionWork(Integer.MAX_VALUE);
work += bm.scheduleBlockReconstructionWork(Integer.MAX_VALUE);
return work;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,8 @@ private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) {
assertEquals("Block not initially pending reconstruction", 0,
bm.pendingReconstruction.getNumReplicas(block));
assertEquals(
"computeBlockReconstructionWork should indicate reconstruction is needed",
1, bm.computeReconstructionWorkForBlocks(list_all));
"scheduleBlockReconstructionWork should indicate reconstruction is needed",
1, bm.scheduleReconstructionWorkForBlocks(list_all));
assertTrue("reconstruction is pending after work is computed",
bm.pendingReconstruction.getNumReplicas(block) > 0);

Expand Down Expand Up @@ -897,7 +897,7 @@ public void testSkipReconstructionWithManyBusyNodes() {
aBlockInfoStriped.setBlockCollectionId(mockINodeId);

// reconstruction should be scheduled
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
BlockReconstructionWork work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
assertNotNull(work);

// simulate the 2 nodes reach maxReplicationStreams
Expand All @@ -907,7 +907,7 @@ public void testSkipReconstructionWithManyBusyNodes() {
}

// reconstruction should be skipped since the number of non-busy nodes are not enough
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
assertNull(work);
}

Expand Down Expand Up @@ -941,7 +941,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
aBlockInfoStriped.setBlockCollectionId(mockINodeId);

// reconstruction should be scheduled
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
BlockReconstructionWork work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
assertNotNull(work);

// simulate the 1 node reaches maxReplicationStreams
Expand All @@ -950,7 +950,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
}

// reconstruction should still be scheduled since there are 2 source nodes to create 2 blocks
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
assertNotNull(work);

// simulate the 1 more node reaches maxReplicationStreams
Expand All @@ -959,7 +959,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
}

// reconstruction should be skipped since the number of non-busy nodes are not enough
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
assertNull(work);
}

Expand Down Expand Up @@ -995,7 +995,7 @@ public void testSkipReconstructionWithManyBusyNodes3() {
aBlockInfoStriped.setBlockCollectionId(mockINodeId);

// Reconstruction should be scheduled.
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
BlockReconstructionWork work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
assertNotNull(work);

ExtendedBlock dummyBlock = new ExtendedBlock("bpid", 1, 1, 1);
Expand All @@ -1011,7 +1011,7 @@ public void testSkipReconstructionWithManyBusyNodes3() {
}

// Reconstruction should be skipped since the number of non-busy nodes are not enough.
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
assertNull(work);
}

Expand Down Expand Up @@ -2062,7 +2062,7 @@ public void testValidateReconstructionWorkAndRacksNotEnough() {
assertFalse(status.isPlacementPolicySatisfied());
DatanodeStorageInfo newNode = DFSTestUtil.createDatanodeStorageInfo(
"storage8", "8.8.8.8", "/rackA", "host8");
BlockReconstructionWork work = bm.scheduleReconstruction(blockInfo, 3);
BlockReconstructionWork work = bm.generateReconstructionForBlock(blockInfo, 3);
assertNotNull(work);
assertEquals(1, work.getAdditionalReplRequired());
// the new targets in rack A.
Expand Down
Loading

0 comments on commit 02e0897

Please sign in to comment.