Skip to content

Commit

Permalink
p2p: Add additional peers to download from when close to the tip
Browse files Browse the repository at this point in the history
If we are 1024 or less blocks away from the tip and haven't requested or received
a block from any peer for 30 seconds, add another peer to download the critical
block from. Add up to two additional peers this way.
  • Loading branch information
mzumsande committed Oct 10, 2024
1 parent b78b78f commit 8c0c5ff
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
33 changes: 32 additions & 1 deletion src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16;
static constexpr auto BLOCK_STALLING_TIMEOUT_DEFAULT{2s};
/** Maximum timeout for stalling block download. */
static constexpr auto BLOCK_STALLING_TIMEOUT_MAX{64s};
/** Timeout for stalling when close to the tip, after which we may add additional peers to download from */
static constexpr auto BLOCK_NEARTIP_TIMEOUT_MAX{30s};
/** Maximum depth of blocks we're willing to serve as compact blocks to peers
* when requested. For older blocks, a regular BLOCK response will be sent. */
static const int MAX_CMPCTBLOCK_DEPTH = 5;
Expand Down Expand Up @@ -787,7 +789,10 @@ class PeerManagerImpl final : public PeerManager
std::atomic<int> m_best_height{-1};
/** The time of the best chain tip block */
std::atomic<std::chrono::seconds> m_best_block_time{0s};

/** The last time we requested a block from any peer */
std::atomic<std::chrono::seconds> m_last_block_requested{0s};
/** The last time we received a block from any peer */
std::atomic<std::chrono::seconds> m_last_block_received{0s};
/** Next time to check for stale tip */
std::chrono::seconds m_stale_tip_check_time GUARDED_BY(cs_main){0s};

Expand Down Expand Up @@ -1375,6 +1380,7 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st
if (pit) {
*pit = &itInFlight->second.second;
}
m_last_block_requested = GetTime<std::chrono::seconds>();
return true;
}

Expand Down Expand Up @@ -1623,6 +1629,30 @@ void PeerManagerImpl::FindNextBlocks(std::vector<const CBlockIndex*>& vBlocks, c
if (waitingfor == -1) {
// This is the first already-in-flight block.
waitingfor = mapBlocksInFlight.lower_bound(pindex->GetBlockHash())->second.first;

// Decide whether to request this block from additional peers in parallel.
// This is done if we are close (<=1024 blocks) from the tip, so that the usual
// stalling mechanism doesn't work. To reduce excessive waste of bandwith, do this only
// 30 seconds (BLOCK_NEARTIP_TIMEOUT_MAX) after a block was requested or received from any peer,
// and only with up to 3 peers in parallel.
bool already_requested_from_peer{false};
auto range{mapBlocksInFlight.equal_range(pindex->GetBlockHash())};
while (range.first != range.second) {
if (range.first->second.first == peer.m_id) {
already_requested_from_peer = true;
break;
}
range.first++;
}
if (nMaxHeight <= nWindowEnd && // we have 1024 or less blocks left to download
m_last_block_requested.load() > 0s &&
GetTime<std::chrono::microseconds>() > m_last_block_requested.load() + BLOCK_NEARTIP_TIMEOUT_MAX &&
GetTime<std::chrono::microseconds>() > m_last_block_received.load() + BLOCK_NEARTIP_TIMEOUT_MAX &&
!already_requested_from_peer &&
mapBlocksInFlight.count(pindex->GetBlockHash()) <= 2) {
LogDebug(BCLog::NET, "Possible stalling close to tip: Requesting block %s additionally from peer %d\n", pindex->GetBlockHash().ToString(), peer.m_id);
vBlocks.push_back(pindex);
}
}
continue;
}
Expand Down Expand Up @@ -3608,6 +3638,7 @@ void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptr<const CBlo
m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block);
if (new_block) {
node.m_last_block_time = GetTime<std::chrono::seconds>();
m_last_block_received = GetTime<std::chrono::seconds>();
// In case this block came from a different peer than we requested
// from, we can erase the block request now anyway (as we just stored
// this block to disk).
Expand Down
27 changes: 16 additions & 11 deletions test/functional/p2p_ibd_stalling.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,27 +160,32 @@ def near_tip_stalling(self):
# only send 1024 headers, so that the window can't overshoot and the ibd stalling mechanism isn't triggered
headers_message = msg_headers()
headers_message.headers = [CBlockHeader(b) for b in self.blocks[:self.NUM_BLOCKS-1]]
self.log.info("Add two stalling peers")
for id in range(2):
self.log.info("Add three stalling peers")
for id in range(3):
peers.append(node.add_outbound_p2p_connection(P2PStaller(stall_block), p2p_idx=id, connection_type="outbound-full-relay"))
peers[-1].block_store = self.block_dict
peers[-1].send_message(headers_message)

self.wait_until(lambda: sum(len(peer['inflight']) for peer in node.getpeerinfo()) == 1)
self.all_sync_send_with_ping(peers)
assert peers[0].stall_block_requested
assert not peers[1].stall_block_requested
assert_equal(sum(peer.stall_block_requested for peer in peers), 1)

self.log.info("Check that after 9 minutes, nothing is done against the stalling")
self.mocktime = int(time.time()) + 9 * 60
self.log.info("Check that after 30 seconds we request the block from a second peer")
self.mocktime = int(time.time()) + 31
node.setmocktime(self.mocktime)
self.all_sync_send_with_ping(peers)
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 2)

self.log.info("Check that after more than 10 minutes, the stalling peer is disconnected")
self.mocktime += 2 * 60
self.log.info("Check that after another 30 seconds we request the block from a third peer")
self.mocktime += 31
node.setmocktime(self.mocktime)
peers[0].wait_for_disconnect()
self.wait_until(lambda: peers[1].stall_block_requested)
self.wait_until(lambda: sum(peer.stall_block_requested for peer in peers) == 3)

self.log.info("Check that after another 20 minutes, all stalling peers are disconnected")
# 10 minutes BLOCK_DOWNLOAD_TIMEOUT_BASE + 2*5 minutes BLOCK_DOWNLOAD_TIMEOUT_PER_PEER
self.mocktime += 20 * 60
node.setmocktime(self.mocktime)
for peer in peers:
peer.wait_for_disconnect()

def total_bytes_recv_for_blocks(self, node):
total = 0
Expand Down

0 comments on commit 8c0c5ff

Please sign in to comment.