diff --git a/crates/networking/p2p/kademlia.rs b/crates/networking/p2p/kademlia.rs
index a9a8e4638e..ce78c3fa81 100644
--- a/crates/networking/p2p/kademlia.rs
+++ b/crates/networking/p2p/kademlia.rs
@@ -1,12 +1,15 @@
+use std::sync::Arc;
+
 use crate::{
     discv4::messages::FindNodeRequest,
-    peer_channels::PeerChannels,
     rlpx::p2p::Capability,
     types::{Node, NodeRecord},
+    RLPxMessage,
 };
 use ethrex_core::{H256, H512, U256};
 use sha3::{Digest, Keccak256};
 use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::{mpsc, Mutex};
 use tracing::{debug, info};

 pub const MAX_NODES_PER_BUCKET: usize = 16;
@@ -395,6 +398,34 @@ impl PeerData {
     }
 }

+pub const MAX_MESSAGES_IN_PEER_CHANNEL: usize = 25;
+
+#[derive(Debug, Clone)]
+/// Holds the respective sender and receiver ends of the communication channels between the peer data and its active connection
+pub struct PeerChannels {
+    pub(crate) sender: mpsc::Sender<RLPxMessage>,
+    pub(crate) receiver: Arc<Mutex<mpsc::Receiver<RLPxMessage>>>,
+}
+
+impl PeerChannels {
+    /// Sets up the communication channels for the peer
+    /// Returns the channel endpoints to send to the active connection's listen loop
+    pub(crate) fn create() -> (Self, mpsc::Sender<RLPxMessage>, mpsc::Receiver<RLPxMessage>) {
+        let (sender, connection_receiver) =
+            mpsc::channel::<RLPxMessage>(MAX_MESSAGES_IN_PEER_CHANNEL);
+        let (connection_sender, receiver) =
+            mpsc::channel::<RLPxMessage>(MAX_MESSAGES_IN_PEER_CHANNEL);
+        (
+            Self {
+                sender,
+                receiver: Arc::new(Mutex::new(receiver)),
+            },
+            connection_sender,
+            connection_receiver,
+        )
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/networking/p2p/net.rs b/crates/networking/p2p/net.rs
index 8bf426c811..14bd944188 100644
--- a/crates/networking/p2p/net.rs
+++ b/crates/networking/p2p/net.rs
@@ -23,7 +23,7 @@ use types::Node;
 pub mod bootnode;
 pub(crate) mod discv4;
 pub(crate) mod kademlia;
-pub mod peer_channels;
+pub mod peer_handler;
 pub mod rlpx;
 pub(crate) mod snap;
 pub mod sync;
diff --git a/crates/networking/p2p/peer_channels.rs b/crates/networking/p2p/peer_channels.rs
deleted file mode 100644
index aca46ef7c6..0000000000
--- a/crates/networking/p2p/peer_channels.rs
+++ /dev/null
@@ -1,514 +0,0 @@
-use std::{collections::BTreeMap, sync::Arc, time::Duration};
-
-use bytes::Bytes;
-use ethrex_core::{
-    types::{AccountState, BlockBody, BlockHeader, Receipt},
-    H256, U256,
-};
-use ethrex_rlp::encode::RLPEncode;
-use ethrex_trie::Nibbles;
-use ethrex_trie::{verify_range, Node};
-use tokio::sync::{mpsc, Mutex};
-
-use crate::{
-    rlpx::{
-        eth::{
-            blocks::{
-                BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
-            },
-            receipts::{GetReceipts, Receipts},
-        },
-        snap::{
-            AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes,
-            StorageRanges, TrieNodes,
-        },
-    },
-    snap::encodable_to_proof,
-    RLPxMessage,
-};
-
-pub const PEER_REPLY_TIMOUT: Duration = Duration::from_secs(45);
-pub const MAX_MESSAGES_IN_PEER_CHANNEL: usize = 25;
-pub const MAX_RESPONSE_BYTES: u64 = 512 * 1024;
-pub const HASH_MAX: H256 = H256([0xFF; 32]);
-
-#[derive(Debug, Clone)]
-/// Holds the respective sender and receiver ends of the communication channels bewteen the peer data and its active connection
-pub struct PeerChannels {
-    sender: mpsc::Sender<RLPxMessage>,
-    receiver: Arc<Mutex<mpsc::Receiver<RLPxMessage>>>,
-}
-
-pub enum BlockRequestOrder {
-    OldToNew,
-    NewToOld,
-}
-
-impl PeerChannels {
-    /// Sets up the communication channels for the peer
-    /// Returns the channel endpoints to send to the active connection's listen loop
-    pub(crate) fn create() -> (Self, mpsc::Sender<RLPxMessage>, mpsc::Receiver<RLPxMessage>) {
-        let (sender,
connection_receiver) = - mpsc::channel::(MAX_MESSAGES_IN_PEER_CHANNEL); - let (connection_sender, receiver) = - mpsc::channel::(MAX_MESSAGES_IN_PEER_CHANNEL); - ( - Self { - sender, - receiver: Arc::new(Mutex::new(receiver)), - }, - connection_sender, - connection_receiver, - ) - } - - /// Requests block headers from the peer, starting from the `start` block hash towards either older or newer blocks depending on the order - /// Returns the block headers or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_block_headers( - &self, - start: H256, - order: BlockRequestOrder, - ) -> Option> { - let request_id = rand::random(); - let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders { - id: request_id, - startblock: start.into(), - limit: BLOCK_HEADER_LIMIT, - skip: 0, - reverse: matches!(order, BlockRequestOrder::NewToOld), - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let block_headers = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers })) - if id == request_id => - { - return Some(block_headers) - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - (!block_headers.is_empty()).then_some(block_headers) - } - - /// Requests block bodies from the peer given their block hashes - /// Returns the block bodies or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_block_bodies(&self, block_hashes: Vec) -> Option> { - let block_hashes_len = block_hashes.len(); - let request_id = rand::random(); - let request = RLPxMessage::GetBlockBodies(GetBlockBodies { - id: request_id, - block_hashes, - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let block_bodies = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::BlockBodies(BlockBodies { id, block_bodies })) - if id == request_id => - { - return Some(block_bodies) - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - // Check that the response is not empty and does not contain more bodies than the ones requested - (!block_bodies.is_empty() && block_bodies.len() <= block_hashes_len).then_some(block_bodies) - } - - /// Requests all receipts in a set of blocks from the peer given their block hashes - /// Returns the lists of receipts or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_receipts(&self, block_hashes: Vec) -> Option>> { - let block_hashes_len = block_hashes.len(); - let request_id = rand::random(); - let request = RLPxMessage::GetReceipts(GetReceipts { - id: request_id, - block_hashes, - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let receipts = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - 
Some(RLPxMessage::Receipts(Receipts { id, receipts })) if id == request_id => { - return Some(receipts) - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - // Check that the response is not empty and does not contain more bodies than the ones requested - (!receipts.is_empty() && receipts.len() <= block_hashes_len).then_some(receipts) - } - - /// Requests an account range from the peer given the state trie's root and the starting hash (the limit hash will be the maximum value of H256) - /// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie - /// Returns the response message or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was not valid - pub async fn request_account_range( - &self, - state_root: H256, - start: H256, - ) -> Option<(Vec, Vec, bool)> { - let request_id = rand::random(); - let request = RLPxMessage::GetAccountRange(GetAccountRange { - id: request_id, - root_hash: state_root, - starting_hash: start, - limit_hash: HASH_MAX, - response_bytes: MAX_RESPONSE_BYTES, - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let (accounts, proof) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::AccountRange(AccountRange { - id, - accounts, - proof, - })) if id == request_id => return Some((accounts, proof)), - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - // Unzip & validate response - let proof = encodable_to_proof(&proof); - let (account_hashes, accounts): (Vec<_>, Vec<_>) = accounts - .into_iter() - .map(|unit| (unit.hash, AccountState::from(unit.account))) - .unzip(); - let encoded_accounts = accounts - .iter() - .map(|acc| acc.encode_to_vec()) - .collect::>(); - let should_continue = verify_range( - state_root, - &start, - &account_hashes, - &encoded_accounts, - &proof, - ) - .ok()?; - Some((account_hashes, accounts, should_continue)) - } - - /// Requests bytecodes for the given code hashes - /// Returns the bytecodes or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_bytecodes(&self, hashes: Vec) -> Option> { - let request_id = rand::random(); - let hashes_len = hashes.len(); - let request = RLPxMessage::GetByteCodes(GetByteCodes { - id: request_id, - hashes, - bytes: MAX_RESPONSE_BYTES, - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let codes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::ByteCodes(ByteCodes { id, codes })) if id == request_id => { - return Some(codes) - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - (!codes.is_empty() && codes.len() <= hashes_len).then_some(codes) - } - - /// Requests storage ranges for accounts given their hashed address and storage roots, and the root of their state trie - /// account_hashes & storage_roots must have the same length - /// storage_roots must not contain 
empty trie hashes, we will treat empty ranges as invalid responses - /// Returns true if the last account's storage was not completely fetched by the request - /// Returns the list of hashed storage keys and values for each account's storage or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_storage_ranges( - &self, - state_root: H256, - mut storage_roots: Vec, - account_hashes: Vec, - start: H256, - ) -> Option<(Vec>, Vec>, bool)> { - let request_id = rand::random(); - let request = RLPxMessage::GetStorageRanges(GetStorageRanges { - id: request_id, - root_hash: state_root, - account_hashes, - starting_hash: start, - limit_hash: HASH_MAX, - response_bytes: MAX_RESPONSE_BYTES, - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let (mut slots, proof) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof })) - if id == request_id => - { - return Some((slots, proof)) - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - // Check we got a reasonable amount of storage ranges - if slots.len() > storage_roots.len() || slots.is_empty() { - return None; - } - // Unzip & validate response - let proof = encodable_to_proof(&proof); - let mut storage_keys = vec![]; - let mut storage_values = vec![]; - let mut should_continue = false; - // Validate each storage range - while !slots.is_empty() { - let (hahsed_keys, values): (Vec<_>, Vec<_>) = slots - .remove(0) - .into_iter() - .map(|slot| (slot.hash, slot.data)) - .unzip(); - // We won't accept empty storage ranges - if hahsed_keys.is_empty() { - return None; - } - let encoded_values = values - .iter() - .map(|val| val.encode_to_vec()) - .collect::>(); - let storage_root = storage_roots.remove(0); - - // The proof corresponds to the last slot, for the previous ones the slot must be the full range without edge proofs - if slots.is_empty() && !proof.is_empty() { - should_continue = - verify_range(storage_root, &start, &hahsed_keys, &encoded_values, &proof) - .ok()?; - } else { - verify_range(storage_root, &start, &hahsed_keys, &encoded_values, &[]).ok()?; - } - - storage_keys.push(hahsed_keys); - storage_values.push(values); - } - Some((storage_keys, storage_values, should_continue)) - } - - /// Requests state trie nodes given the root of the trie where they are contained and their path (be them full or partial) - /// Returns the nodes or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_state_trienodes( - &self, - state_root: H256, - paths: Vec, - ) -> Option> { - let request_id = rand::random(); - let expected_nodes = paths.len(); - let request = RLPxMessage::GetTrieNodes(GetTrieNodes { - id: request_id, - root_hash: state_root, - // [acc_path, acc_path,...] 
-> [[acc_path], [acc_path]] - paths: paths - .into_iter() - .map(|vec| vec![Bytes::from(vec.encode_compact())]) - .collect(), - bytes: MAX_RESPONSE_BYTES, - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => { - return Some(nodes) - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - (!nodes.is_empty() && nodes.len() <= expected_nodes) - .then(|| { - nodes - .iter() - .map(|node| Node::decode_raw(node)) - .collect::, _>>() - .ok() - }) - .flatten() - } - - /// Requests storage trie nodes given the root of the state trie where they are contained and - /// a hashmap mapping the path to the account in the state trie (aka hashed address) to the paths to the nodes in its storage trie (can be full or partial) - /// Returns the nodes or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_storage_trienodes( - &self, - state_root: H256, - paths: BTreeMap>, - ) -> Option> { - let request_id = rand::random(); - let expected_nodes = paths.iter().fold(0, |acc, item| acc + item.1.len()); - let request = RLPxMessage::GetTrieNodes(GetTrieNodes { - id: request_id, - root_hash: state_root, - // {acc_path: [path, path, ...]} -> [[acc_path, path, path, ...]] - paths: paths - .into_iter() - .map(|(acc_path, paths)| { - [ - vec![Bytes::from(acc_path.0.to_vec())], - paths - .into_iter() - .map(|path| Bytes::from(path.encode_compact())) - .collect(), - ] - .concat() - }) - .collect(), - bytes: MAX_RESPONSE_BYTES, - }); - let mut receiver = self.receiver.lock().await; - self.sender.send(request).await.ok()?; - let nodes = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) if id == request_id => { - return Some(nodes) - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, - } - } - }) - .await - .ok()??; - (!nodes.is_empty() && nodes.len() <= expected_nodes) - .then(|| { - nodes - .iter() - .map(|node| Node::decode_raw(node)) - .collect::, _>>() - .ok() - }) - .flatten() - } - - /// Requests a single storage range for an accouns given its hashed address and storage root, and the root of its state trie - /// This is a simplified version of `request_storage_range` meant to be used for large tries that require their own single requests - /// account_hashes & storage_roots must have the same length - /// storage_root must not be an empty trie hash, we will treat empty ranges as invalid responses - /// Returns true if the account's storage was not completely fetched by the request - /// Returns the list of hashed storage keys and values for the account's storage or None if: - /// - There are no available peers (the node just started up or was rejected by all other nodes) - /// - The response timed out - /// - The response was empty or not valid - pub async fn request_storage_range( - &self, - state_root: H256, - storage_root: H256, - account_hash: H256, - start: H256, - ) -> Option<(Vec, Vec, bool)> { - let request_id = rand::random(); - let request = 
RLPxMessage::GetStorageRanges(GetStorageRanges {
-            id: request_id,
-            root_hash: state_root,
-            account_hashes: vec![account_hash],
-            starting_hash: start,
-            limit_hash: HASH_MAX,
-            response_bytes: MAX_RESPONSE_BYTES,
-        });
-        let mut receiver = self.receiver.lock().await;
-        self.sender.send(request).await.ok()?;
-        let (mut slots, proof) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
-            loop {
-                match receiver.recv().await {
-                    Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof }))
-                        if id == request_id =>
-                    {
-                        return Some((slots, proof))
-                    }
-                    // Ignore replies that don't match the expected id (such as late responses)
-                    Some(_) => continue,
-                    None => return None,
-                }
-            }
-        })
-        .await
-        .ok()??;
-        // Check we got a reasonable amount of storage ranges
-        if slots.len() != 1 {
-            return None;
-        }
-        // Unzip & validate response
-        let proof = encodable_to_proof(&proof);
-        let (storage_keys, storage_values): (Vec<H256>, Vec<U256>) = slots
-            .remove(0)
-            .into_iter()
-            .map(|slot| (slot.hash, slot.data))
-            .unzip();
-        let encoded_values = storage_values
-            .iter()
-            .map(|val| val.encode_to_vec())
-            .collect::<Vec<_>>();
-        // Verify storage range
-        let should_continue =
-            verify_range(storage_root, &start, &storage_keys, &encoded_values, &proof).ok()?;
-        Some((storage_keys, storage_values, should_continue))
-    }
-}
diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs
new file mode 100644
index 0000000000..dcd055b3b7
--- /dev/null
+++ b/crates/networking/p2p/peer_handler.rs
@@ -0,0 +1,604 @@
+use std::{collections::BTreeMap, sync::Arc, time::Duration};
+
+use bytes::Bytes;
+use ethrex_core::{
+    types::{AccountState, BlockBody, BlockHeader, Receipt},
+    H256, U256,
+};
+use ethrex_rlp::encode::RLPEncode;
+use ethrex_trie::Nibbles;
+use ethrex_trie::{verify_range, Node};
+use tokio::sync::Mutex;
+
+use crate::{
+    kademlia::PeerChannels,
+    rlpx::{
+        eth::{
+            blocks::{
+                BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
+            },
+            receipts::{GetReceipts, Receipts},
+        },
+        p2p::Capability,
+        snap::{
+            AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes,
+            StorageRanges, TrieNodes,
+        },
+    },
+    snap::encodable_to_proof,
+    KademliaTable, RLPxMessage,
+};
+use tracing::info;
+pub const PEER_REPLY_TIMOUT: Duration = Duration::from_secs(45);
+pub const PEER_SELECT_RETRY_ATTEMPTS: usize = 3;
+pub const REQUEST_RETRY_ATTEMPTS: usize = 5;
+pub const MAX_RESPONSE_BYTES: u64 = 512 * 1024;
+pub const HASH_MAX: H256 = H256([0xFF; 32]);
+
+/// An abstraction over the [KademliaTable] containing logic to make requests to peers
+#[derive(Debug, Clone)]
+pub struct PeerHandler {
+    peer_table: Arc<Mutex<KademliaTable>>,
+}
+
+pub enum BlockRequestOrder {
+    OldToNew,
+    NewToOld,
+}
+
+impl PeerHandler {
+    pub fn new(peer_table: Arc<Mutex<KademliaTable>>) -> PeerHandler {
+        Self { peer_table }
+    }
+    /// Returns the channel ends to an active peer connection that supports the given capability
+    /// The peer is selected randomly, and doesn't guarantee that the selected peer is not currently busy
+    /// If no peer is found, this method will try again after 10 seconds
+    async fn get_peer_channel_with_retry(&self, capability: Capability) -> Option<PeerChannels> {
+        for _ in 0..PEER_SELECT_RETRY_ATTEMPTS {
+            let table = self.peer_table.lock().await;
+            table.show_peer_stats();
+            if let Some(channels) = table.get_peer_channels(capability.clone()) {
+                return Some(channels);
+            };
+            // drop the lock early to not block the rest of the processes
+            drop(table);
+            info!("[Sync] No peers available, retrying
in 10 sec"); + // This is the unlikely case where we just started the node and don't have peers, wait a bit and try again + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + } + None + } + + /// Requests block headers from any suitable peer, starting from the `start` block hash towards either older or newer blocks depending on the order + /// Returns the block headers or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_block_headers( + &self, + start: H256, + order: BlockRequestOrder, + ) -> Option> { + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders { + id: request_id, + startblock: start.into(), + limit: BLOCK_HEADER_LIMIT, + skip: 0, + reverse: matches!(order, BlockRequestOrder::NewToOld), + }); + let peer = self.get_peer_channel_with_retry(Capability::Eth).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some(block_headers) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers })) + if id == request_id => + { + return Some(block_headers) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + .and_then(|headers| (!headers.is_empty()).then_some(headers)) + { + return Some(block_headers); + } + } + None + } + + /// Requests block bodies from any suitable peer given their block hashes + /// Returns the block bodies or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_block_bodies(&self, block_hashes: Vec) -> Option> { + let block_hashes_len = block_hashes.len(); + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetBlockBodies(GetBlockBodies { + id: request_id, + block_hashes: block_hashes.clone(), + }); + let peer = self.get_peer_channel_with_retry(Capability::Eth).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some(block_bodies) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::BlockBodies(BlockBodies { id, block_bodies })) + if id == request_id => + { + return Some(block_bodies) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + .and_then(|bodies| { + // Check that the response is not empty and does not contain more bodies than the ones requested + (!bodies.is_empty() && bodies.len() <= block_hashes_len).then_some(bodies) + }) { + return Some(block_bodies); + } + } + None + } + + /// Requests all receipts in a set of blocks from any suitable peer given their block hashes + /// Returns the lists of receipts or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_receipts(&self, block_hashes: Vec) -> Option>> { + let block_hashes_len = 
block_hashes.len(); + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetReceipts(GetReceipts { + id: request_id, + block_hashes: block_hashes.clone(), + }); + let peer = self.get_peer_channel_with_retry(Capability::Eth).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some(receipts) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::Receipts(Receipts { id, receipts })) + if id == request_id => + { + return Some(receipts) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + .and_then(|receipts| + // Check that the response is not empty and does not contain more bodies than the ones requested + (!receipts.is_empty() && receipts.len() <= block_hashes_len).then_some(receipts)) + { + return Some(receipts); + } + } + None + } + + /// Requests an account range from any suitable peer given the state trie's root and the starting hash (the limit hash will be the maximum value of H256) + /// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie + /// Returns the account range or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_account_range( + &self, + state_root: H256, + start: H256, + ) -> Option<(Vec, Vec, bool)> { + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetAccountRange(GetAccountRange { + id: request_id, + root_hash: state_root, + starting_hash: start, + limit_hash: HASH_MAX, + response_bytes: MAX_RESPONSE_BYTES, + }); + let peer = self.get_peer_channel_with_retry(Capability::Snap).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some((accounts, proof)) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::AccountRange(AccountRange { + id, + accounts, + proof, + })) if id == request_id => return Some((accounts, proof)), + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + { + // Unzip & validate response + let proof = encodable_to_proof(&proof); + let (account_hashes, accounts): (Vec<_>, Vec<_>) = accounts + .into_iter() + .map(|unit| (unit.hash, AccountState::from(unit.account))) + .unzip(); + let encoded_accounts = accounts + .iter() + .map(|acc| acc.encode_to_vec()) + .collect::>(); + if let Ok(should_continue) = verify_range( + state_root, + &start, + &account_hashes, + &encoded_accounts, + &proof, + ) { + return Some((account_hashes, accounts, should_continue)); + } + } + } + None + } + + /// Requests bytecodes for the given code hashes + /// Returns the bytecodes or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_bytecodes(&self, hashes: Vec) -> Option> { + let hashes_len = hashes.len(); + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetByteCodes(GetByteCodes { + id: request_id, + hashes: 
hashes.clone(), + bytes: MAX_RESPONSE_BYTES, + }); + let peer = self.get_peer_channel_with_retry(Capability::Snap).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some(codes) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::ByteCodes(ByteCodes { id, codes })) + if id == request_id => + { + return Some(codes) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + .and_then(|codes| (!codes.is_empty() && codes.len() <= hashes_len).then_some(codes)) + { + return Some(codes); + } + } + None + } + + /// Requests storage ranges for accounts given their hashed address and storage roots, and the root of their state trie + /// account_hashes & storage_roots must have the same length + /// storage_roots must not contain empty trie hashes, we will treat empty ranges as invalid responses + /// Returns true if the last account's storage was not completely fetched by the request + /// Returns the list of hashed storage keys and values for each account's storage or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_storage_ranges( + &self, + state_root: H256, + mut storage_roots: Vec, + account_hashes: Vec, + start: H256, + ) -> Option<(Vec>, Vec>, bool)> { + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetStorageRanges(GetStorageRanges { + id: request_id, + root_hash: state_root, + account_hashes: account_hashes.clone(), + starting_hash: start, + limit_hash: HASH_MAX, + response_bytes: MAX_RESPONSE_BYTES, + }); + let peer = self.get_peer_channel_with_retry(Capability::Snap).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some((mut slots, proof)) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof })) + if id == request_id => + { + return Some((slots, proof)) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + { + // Check we got a reasonable amount of storage ranges + if slots.len() > storage_roots.len() || slots.is_empty() { + return None; + } + // Unzip & validate response + let proof = encodable_to_proof(&proof); + let mut storage_keys = vec![]; + let mut storage_values = vec![]; + let mut should_continue = false; + // Validate each storage range + while !slots.is_empty() { + let (hahsed_keys, values): (Vec<_>, Vec<_>) = slots + .remove(0) + .into_iter() + .map(|slot| (slot.hash, slot.data)) + .unzip(); + // We won't accept empty storage ranges + if hahsed_keys.is_empty() { + continue; + } + let encoded_values = values + .iter() + .map(|val| val.encode_to_vec()) + .collect::>(); + let storage_root = storage_roots.remove(0); + + // The proof corresponds to the last slot, for the previous ones the slot must be the full range without edge proofs + if slots.is_empty() && !proof.is_empty() { + let Ok(sc) = verify_range( + storage_root, + &start, + &hahsed_keys, + &encoded_values, + &proof, + ) else { + continue; + }; + should_continue = sc; + } else if 
verify_range(storage_root, &start, &hahsed_keys, &encoded_values, &[]) + .is_err() + { + continue; + } + + storage_keys.push(hahsed_keys); + storage_values.push(values); + } + return Some((storage_keys, storage_values, should_continue)); + } + } + None + } + + /// Requests state trie nodes given the root of the trie where they are contained and their path (be them full or partial) + /// Returns the nodes or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_state_trienodes( + &self, + state_root: H256, + paths: Vec, + ) -> Option> { + let expected_nodes = paths.len(); + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetTrieNodes(GetTrieNodes { + id: request_id, + root_hash: state_root, + // [acc_path, acc_path,...] -> [[acc_path], [acc_path]] + paths: paths + .iter() + .map(|vec| vec![Bytes::from(vec.encode_compact())]) + .collect(), + bytes: MAX_RESPONSE_BYTES, + }); + let peer = self.get_peer_channel_with_retry(Capability::Snap).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some(nodes) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) + if id == request_id => + { + return Some(nodes) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + .and_then(|nodes| { + (!nodes.is_empty() && nodes.len() <= expected_nodes) + .then(|| { + nodes + .iter() + .map(|node| Node::decode_raw(node)) + .collect::, _>>() + .ok() + }) + .flatten() + }) { + return Some(nodes); + } + } + None + } + + /// Requests storage trie nodes given the root of the state trie where they are contained and + /// a hashmap mapping the path to the account in the state trie (aka hashed address) to the paths to the nodes in its storage trie (can be full or partial) + /// Returns the nodes or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_storage_trienodes( + &self, + state_root: H256, + paths: BTreeMap>, + ) -> Option> { + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let expected_nodes = paths.iter().fold(0, |acc, item| acc + item.1.len()); + let request = RLPxMessage::GetTrieNodes(GetTrieNodes { + id: request_id, + root_hash: state_root, + // {acc_path: [path, path, ...]} -> [[acc_path, path, path, ...]] + paths: paths + .iter() + .map(|(acc_path, paths)| { + [ + vec![Bytes::from(acc_path.0.to_vec())], + paths + .iter() + .map(|path| Bytes::from(path.encode_compact())) + .collect(), + ] + .concat() + }) + .collect(), + bytes: MAX_RESPONSE_BYTES, + }); + let peer = self.get_peer_channel_with_retry(Capability::Snap).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some(nodes) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes })) + if id == request_id => + { + return Some(nodes) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + 
} + }) + .await + .ok() + .flatten() + .and_then(|nodes| { + (!nodes.is_empty() && nodes.len() <= expected_nodes) + .then(|| { + nodes + .iter() + .map(|node| Node::decode_raw(node)) + .collect::, _>>() + .ok() + }) + .flatten() + }) { + return Some(nodes); + } + } + None + } + + /// Requests a single storage range for an accouns given its hashed address and storage root, and the root of its state trie + /// This is a simplified version of `request_storage_range` meant to be used for large tries that require their own single requests + /// account_hashes & storage_roots must have the same length + /// storage_root must not be an empty trie hash, we will treat empty ranges as invalid responses + /// Returns true if the account's storage was not completely fetched by the request + /// Returns the list of hashed storage keys and values for the account's storage or None if: + /// - There are no available peers (the node just started up or was rejected by all other nodes) + /// - No peer returned a valid response in the given time and retry limits + pub async fn request_storage_range( + &self, + state_root: H256, + storage_root: H256, + account_hash: H256, + start: H256, + ) -> Option<(Vec, Vec, bool)> { + for _ in 0..REQUEST_RETRY_ATTEMPTS { + let request_id = rand::random(); + let request = RLPxMessage::GetStorageRanges(GetStorageRanges { + id: request_id, + root_hash: state_root, + account_hashes: vec![account_hash], + starting_hash: start, + limit_hash: HASH_MAX, + response_bytes: MAX_RESPONSE_BYTES, + }); + let peer = self.get_peer_channel_with_retry(Capability::Snap).await?; + let mut receiver = peer.receiver.lock().await; + peer.sender.send(request).await.ok()?; + if let Some((mut slots, proof)) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move { + loop { + match receiver.recv().await { + Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof })) + if id == request_id => + { + return Some((slots, proof)) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok() + .flatten() + { + // Check we got a reasonable amount of storage ranges + if slots.len() != 1 { + return None; + } + // Unzip & validate response + let proof = encodable_to_proof(&proof); + let (storage_keys, storage_values): (Vec, Vec) = slots + .remove(0) + .into_iter() + .map(|slot| (slot.hash, slot.data)) + .unzip(); + let encoded_values = storage_values + .iter() + .map(|val| val.encode_to_vec()) + .collect::>(); + // Verify storage range + if let Ok(should_continue) = + verify_range(storage_root, &start, &storage_keys, &encoded_values, &proof) + { + return Some((storage_keys, storage_values, should_continue)); + } + } + } + None + } +} diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 2c00d377fa..45d7300ac0 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -1,5 +1,5 @@ use crate::{ - peer_channels::PeerChannels, + kademlia::PeerChannels, rlpx::{ error::RLPxError, eth::{ diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 9c9f68281a..b9d7fb07af 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -16,15 +16,11 @@ use tokio::{ }; use tracing::{debug, info, warn}; -use crate::{kademlia::KademliaTable, peer_channels::BlockRequestOrder}; use crate::{ - peer_channels::{PeerChannels, HASH_MAX}, - rlpx::p2p::Capability, + kademlia::KademliaTable, + 
peer_handler::{BlockRequestOrder, PeerHandler, HASH_MAX}, }; -/// Maximum amount of times we will ask a peer for an account/storage range -/// If the max amount of retries is exceeded we will asume that the state we are requesting is old and no longer available -const MAX_RETRIES: usize = 5; /// The minimum amount of blocks from the head that we want to full sync during a snap sync const MIN_FULL_BLOCKS: usize = 64; /// Max size of a bach to stat a fetch request in queues @@ -43,7 +39,7 @@ pub enum SyncMode { #[derive(Debug)] pub struct SyncManager { sync_mode: SyncMode, - peers: Arc>, + peers: PeerHandler, /// The last block number used as a pivot for snap-sync /// Syncing beyond this pivot should re-enable snap-sync (as we will not have that state stored) /// TODO: Reorgs @@ -51,10 +47,10 @@ pub struct SyncManager { } impl SyncManager { - pub fn new(peers: Arc>, sync_mode: SyncMode) -> Self { + pub fn new(peer_table: Arc>, sync_mode: SyncMode) -> Self { Self { sync_mode, - peers, + peers: PeerHandler::new(peer_table), last_snap_pivot: 0, } } @@ -65,7 +61,7 @@ impl SyncManager { let dummy_peer_table = Arc::new(Mutex::new(KademliaTable::new(Default::default()))); Self { sync_mode: SyncMode::Full, - peers: dummy_peer_table, + peers: PeerHandler::new(dummy_peer_table), last_snap_pivot: 0, } } @@ -113,65 +109,63 @@ impl SyncManager { current_head = last_header; } } - let mut retry_count = 0; - while retry_count <= MAX_RETRIES { - let peer = get_peer_channel_with_retry(self.peers.clone(), Capability::Eth).await; + loop { debug!("Requesting Block Headers from {current_head}"); // Request Block Headers from Peer - if let Some(mut block_headers) = peer + match self + .peers .request_block_headers(current_head, BlockRequestOrder::OldToNew) .await { - retry_count = 0; - debug!( - "Received {} block headers| Last Number: {}", - block_headers.len(), - block_headers.last().as_ref().unwrap().number - ); - let mut block_hashes = block_headers - .iter() - .map(|header| header.compute_block_hash()) - .collect::>(); - // Check if we already found the sync head - let sync_head_found = block_hashes.contains(&sync_head); - // Update current fetch head if needed - if !sync_head_found { - current_head = *block_hashes.last().unwrap(); - } - if matches!(self.sync_mode, SyncMode::Snap) { + Some(mut block_headers) => { + debug!( + "Received {} block headers| Last Number: {}", + block_headers.len(), + block_headers.last().as_ref().unwrap().number + ); + let mut block_hashes = block_headers + .iter() + .map(|header| header.compute_block_hash()) + .collect::>(); + // Check if we already found the sync head + let sync_head_found = block_hashes.contains(&sync_head); + // Update current fetch head if needed if !sync_head_found { - // Update snap state - store.set_header_download_checkpoint(current_head)?; - } else { - // If the sync head is less than 64 blocks away from our current head switch to full-sync - let last_header_number = block_headers.last().unwrap().number; - let latest_block_number = store.get_latest_block_number()?; - if last_header_number.saturating_sub(latest_block_number) - < MIN_FULL_BLOCKS as u64 - { - // Too few blocks for a snap sync, switching to full sync - store.clear_snap_state()?; - self.sync_mode = SyncMode::Full + current_head = *block_hashes.last().unwrap(); + } + if matches!(self.sync_mode, SyncMode::Snap) { + if !sync_head_found { + // Update snap state + store.set_header_download_checkpoint(current_head)?; + } else { + // If the sync head is less than 64 blocks away from our current head 
switch to full-sync + let last_header_number = block_headers.last().unwrap().number; + let latest_block_number = store.get_latest_block_number()?; + if last_header_number.saturating_sub(latest_block_number) + < MIN_FULL_BLOCKS as u64 + { + // Too few blocks for a snap sync, switching to full sync + store.clear_snap_state()?; + self.sync_mode = SyncMode::Full + } } } - } - // Discard the first header as we already have it - block_hashes.remove(0); - block_headers.remove(0); - // Store headers and save hashes for full block retrieval - all_block_hashes.extend_from_slice(&block_hashes[..]); - store.add_block_headers(block_hashes, block_headers)?; + // Discard the first header as we already have it + block_hashes.remove(0); + block_headers.remove(0); + // Store headers and save hashes for full block retrieval + all_block_hashes.extend_from_slice(&block_hashes[..]); + store.add_block_headers(block_hashes, block_headers)?; - if sync_head_found { - // No more headers to request - break; + if sync_head_found { + // No more headers to request + break; + } + } + _ => { + warn!("Sync failed to find target block header, aborting"); + return Ok(()); } - } else { - retry_count += 1; - } - if retry_count > MAX_RETRIES { - warn!("Sync failed to find target block header, aborting"); - return Ok(()); } } // We finished fetching all headers, now we can process them @@ -232,13 +226,12 @@ impl SyncManager { /// Returns an error if there was a problem while executing or validating the blocks async fn download_and_run_blocks( mut block_hashes: Vec, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result<(), SyncError> { loop { - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Eth).await; debug!("Requesting Block Bodies "); - if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone()).await { + if let Some(block_bodies) = peers.request_block_bodies(block_hashes.clone()).await { let block_bodies_len = block_bodies.len(); debug!("Received {} Block Bodies", block_bodies_len); // Execute and store blocks @@ -271,13 +264,12 @@ async fn download_and_run_blocks( /// Fetches all block bodies for the given block hashes via p2p and stores them async fn store_block_bodies( mut block_hashes: Vec, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result<(), SyncError> { loop { - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Eth).await; debug!("Requesting Block Headers "); - if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone()).await { + if let Some(block_bodies) = peers.request_block_bodies(block_hashes.clone()).await { debug!(" Received {} Block Bodies", block_bodies.len()); // Track which bodies we have already fetched let current_block_hashes = block_hashes.drain(..block_bodies.len()); @@ -300,13 +292,12 @@ async fn store_block_bodies( #[allow(unused)] async fn store_receipts( mut block_hashes: Vec, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result<(), SyncError> { loop { - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Eth).await; debug!("Requesting Block Headers "); - if let Some(receipts) = peer.request_receipts(block_hashes.clone()).await { + if let Some(receipts) = peers.request_receipts(block_hashes.clone()).await { debug!(" Received {} Receipts", receipts.len()); // Track which blocks we have already fetched receipts for for (block_hash, receipts) in block_hashes.drain(0..receipts.len()).zip(receipts) { @@ -327,7 +318,7 @@ async fn store_receipts( /// Returns true if all state was fetched or false 
if the block is too old and the state is no longer available async fn rebuild_state_trie( state_root: H256, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result { // Spawn storage & bytecode fetchers @@ -355,12 +346,12 @@ async fn rebuild_state_trie( debug!("Starting/Resuming state trie download from key {start_account_hash}"); // Fetch Account Ranges // If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available - let mut retry_count = 0; let mut progress_timer = Instant::now(); let initial_timestamp = Instant::now(); let initial_account_hash = start_account_hash.into_uint(); + let mut stale = false; const PROGRESS_OUTPUT_TIMER: std::time::Duration = std::time::Duration::from_secs(30); - while retry_count <= MAX_RETRIES { + loop { // Show Progress stats (this task is not vital so we can detach it) if Instant::now().duration_since(progress_timer) >= PROGRESS_OUTPUT_TIMER { progress_timer = Instant::now(); @@ -370,15 +361,12 @@ async fn rebuild_state_trie( initial_timestamp, )); } - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}"); - if let Some((account_hashes, accounts, should_continue)) = peer + if let Some((account_hashes, accounts, should_continue)) = peers .request_account_range(state_root, start_account_hash) .await { debug!("Received {} account ranges", accounts.len()); - // Reset retry counter - retry_count = 0; // Update starting hash for next batch if should_continue { start_account_hash = *account_hashes.last().unwrap(); @@ -425,12 +413,13 @@ async fn rebuild_state_trie( break; } } else { - retry_count += 1; + stale = true; + break; } } // Store current checkpoint store.set_state_trie_root_checkpoint(current_state_root)?; - if retry_count > MAX_RETRIES { + if stale { store.set_state_trie_key_checkpoint(start_account_hash)?; } else { // Set highest key value so we know state sync is already complete on the next cycle @@ -453,7 +442,7 @@ async fn rebuild_state_trie( ); store.set_pending_storage_heal_accounts(stored_pending_storages)?; } - if retry_count > MAX_RETRIES || pending_storages { + if stale || pending_storages { // Skip healing and return stale status return Ok(false); } @@ -480,7 +469,7 @@ async fn rebuild_state_trie( /// Waits for incoming code hashes from the receiver channel endpoint, queues them, and fetches and stores their bytecodes in batches async fn bytecode_fetcher( mut receiver: Receiver>, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result<(), SyncError> { let mut pending_bytecodes: Vec = vec![]; @@ -511,21 +500,18 @@ async fn bytecode_fetcher( /// Receives a batch of code hahses, fetches their respective bytecodes via p2p and returns a list of the code hashes that couldn't be fetched in the request (if applicable) async fn fetch_bytecode_batch( mut batch: Vec, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result, StoreError> { - loop { - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; - if let Some(bytecodes) = peer.request_bytecodes(batch.clone()).await { - debug!("Received {} bytecodes", bytecodes.len()); - // Store the bytecodes - for code in bytecodes.into_iter() { - store.add_account_code(batch.remove(0), code)?; - } - // Return remaining code hashes in the batch if we couldn't fetch all of them - return Ok(batch); + if let Some(bytecodes) = peers.request_bytecodes(batch.clone()).await { + 
debug!("Received {} bytecodes", bytecodes.len()); + // Store the bytecodes + for code in bytecodes.into_iter() { + store.add_account_code(batch.remove(0), code)?; } } + // Return remaining code hashes in the batch if we couldn't fetch all of them + Ok(batch) } /// Waits for incoming account hashes & storage roots from the receiver channel endpoint, queues them, and fetches and stores their bytecodes in batches @@ -533,7 +519,7 @@ async fn fetch_bytecode_batch( /// In the last case, the fetcher will return the account hashes of the accounts in the queue async fn storage_fetcher( mut receiver: Receiver>, - peers: Arc>, + peers: PeerHandler, store: Store, state_root: H256, ) -> Result, SyncError> { @@ -603,7 +589,7 @@ async fn storage_fetcher( async fn fetch_storage_batch( mut batch: Vec<(H256, H256)>, state_root: H256, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result<(Vec<(H256, H256)>, bool), SyncError> { debug!( @@ -611,56 +597,53 @@ async fn fetch_storage_batch( batch.first().unwrap().0, batch.last().unwrap().0 ); - for _ in 0..MAX_RETRIES { - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; - let (batch_hahses, batch_roots) = batch.clone().into_iter().unzip(); - if let Some((mut keys, mut values, incomplete)) = peer - .request_storage_ranges(state_root, batch_roots, batch_hahses, H256::zero()) - .await - { - debug!("Received {} storage ranges", keys.len(),); - // Handle incomplete ranges - if incomplete { - // An incomplete range cannot be empty - let (last_keys, last_values) = (keys.pop().unwrap(), values.pop().unwrap()); - // If only one incomplete range is returned then it must belong to a trie that is too big to fit into one request - // We will handle this large trie separately - if keys.is_empty() { - debug!("Large storage trie encountered, handling separately"); - let (account_hash, storage_root) = batch.remove(0); - if handle_large_storage_range( - state_root, - account_hash, - storage_root, - last_keys, - last_values, - peers.clone(), - store.clone(), - ) - .await? - { - // Pivot became stale - // Add trie back to the queue and return stale pivot status - batch.push((account_hash, storage_root)); - return Ok((batch, true)); - } - } - // The incomplete range is not the first, we cannot asume it is a large trie, so lets add it back to the queue - } - // Store the storage ranges & rebuild the storage trie for each account - for (keys, values) in keys.into_iter().zip(values.into_iter()) { + let (batch_hahses, batch_roots) = batch.clone().into_iter().unzip(); + if let Some((mut keys, mut values, incomplete)) = peers + .request_storage_ranges(state_root, batch_roots, batch_hahses, H256::zero()) + .await + { + debug!("Received {} storage ranges", keys.len(),); + // Handle incomplete ranges + if incomplete { + // An incomplete range cannot be empty + let (last_keys, last_values) = (keys.pop().unwrap(), values.pop().unwrap()); + // If only one incomplete range is returned then it must belong to a trie that is too big to fit into one request + // We will handle this large trie separately + if keys.is_empty() { + debug!("Large storage trie encountered, handling separately"); let (account_hash, storage_root) = batch.remove(0); - let mut trie = store.open_storage_trie(account_hash, *EMPTY_TRIE_HASH); - for (key, value) in keys.into_iter().zip(values.into_iter()) { - trie.insert(key.0.to_vec(), value.encode_to_vec())?; - } - if trie.hash()? 
!= storage_root { - warn!("State sync failed for storage root {storage_root}"); + if handle_large_storage_range( + state_root, + account_hash, + storage_root, + last_keys, + last_values, + peers.clone(), + store.clone(), + ) + .await? + { + // Pivot became stale + // Add trie back to the queue and return stale pivot status + batch.push((account_hash, storage_root)); + return Ok((batch, true)); } } - // Return remaining code hashes in the batch if we couldn't fetch all of them - return Ok((batch, false)); + // The incomplete range is not the first, we cannot asume it is a large trie, so lets add it back to the queue } + // Store the storage ranges & rebuild the storage trie for each account + for (keys, values) in keys.into_iter().zip(values.into_iter()) { + let (account_hash, storage_root) = batch.remove(0); + let mut trie = store.open_storage_trie(account_hash, *EMPTY_TRIE_HASH); + for (key, value) in keys.into_iter().zip(values.into_iter()) { + trie.insert(key.0.to_vec(), value.encode_to_vec())?; + } + if trie.hash()? != storage_root { + warn!("State sync failed for storage root {storage_root}"); + } + } + // Return remaining code hashes in the batch if we couldn't fetch all of them + return Ok((batch, false)); } // Pivot became stale Ok((batch, true)) @@ -677,7 +660,7 @@ async fn handle_large_storage_range( storage_root: H256, keys: Vec, values: Vec, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result { // First process the initial range @@ -693,33 +676,29 @@ async fn handle_large_storage_range( }; let mut should_continue = true; // Fetch the remaining range - let mut retry_count = 0; while should_continue { - while retry_count <= MAX_RETRIES { - debug!("Fetching large storage trie, current key: {}", next_key); - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; - if let Some((keys, values, incomplete)) = peer - .request_storage_range(state_root, storage_root, account_hash, next_key) - .await - { - next_key = *keys.last().unwrap(); - should_continue = incomplete; - let mut trie = store.open_storage_trie(account_hash, current_root); - for (key, value) in keys.into_iter().zip(values.into_iter()) { - trie.insert(key.0.to_vec(), value.encode_to_vec())?; - } - // Compute current root so we can extend this trie later - current_root = trie.hash()?; - break; - } else { - retry_count += 1; + debug!("Fetching large storage trie, current key: {}", next_key); + + if let Some((keys, values, incomplete)) = peers + .request_storage_range(state_root, storage_root, account_hash, next_key) + .await + { + next_key = *keys.last().unwrap(); + should_continue = incomplete; + let mut trie = store.open_storage_trie(account_hash, current_root); + for (key, value) in keys.into_iter().zip(values.into_iter()) { + trie.insert(key.0.to_vec(), value.encode_to_vec())?; } + // Compute current root so we can extend this trie later + current_root = trie.hash()?; + } else { + return Ok(true); } } - if current_root != storage_root && retry_count <= MAX_RETRIES { + if current_root != storage_root { warn!("State sync failed for storage root {storage_root}"); } - Ok(retry_count > MAX_RETRIES) + Ok(false) } /// Heals the trie given its state_root by fetching any missing nodes in it via p2p @@ -729,7 +708,7 @@ async fn heal_state_trie( state_root: H256, mut current_root: H256, store: Store, - peers: Arc>, + peers: PeerHandler, ) -> Result { // Spawn a storage healer for this blocks's storage let (storage_sender, storage_receiver) = mpsc::channel::>(500); @@ -749,21 +728,17 @@ async fn 
heal_state_trie( } // Begin by requesting the root node let mut paths = vec![Nibbles::default()]; - // Count the number of request retries so we don't get stuck requesting old state - let mut retry_count = 0; - while !paths.is_empty() && retry_count < MAX_RETRIES { + while !paths.is_empty() { // Fetch the latests paths first to prioritize reaching leaves as soon as possible let batch: Vec = paths .drain(paths.len().saturating_sub(NODE_BATCH_SIZE)..) .collect(); - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; - if let Some(nodes) = peer + + if let Some(nodes) = peers .request_state_trienodes(state_root, batch.clone()) .await { debug!("Received {} state nodes", nodes.len()); - // Reset retry counter for next request - retry_count = 0; let mut hahsed_addresses = vec![]; let mut code_hashes = vec![]; // For each fetched node: @@ -810,7 +785,7 @@ async fn heal_state_trie( bytecode_sender.send(code_hashes).await?; } } else { - retry_count += 1; + break; } } debug!("State Healing stopped, signaling storage healer"); @@ -823,7 +798,7 @@ async fn heal_state_trie( if !storage_healing_succesful { store.set_pending_storage_heal_accounts(pending_storage_heal_accounts)?; } - Ok(retry_count < MAX_RETRIES && storage_healing_succesful) + Ok(paths.is_empty() && storage_healing_succesful) } /// Waits for incoming hashed addresses from the receiver channel endpoint and queues the associated root nodes for state retrieval @@ -832,7 +807,7 @@ async fn heal_state_trie( async fn storage_healer( state_root: H256, mut receiver: Receiver>, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result, SyncError> { // Pending list of storages to fetch @@ -885,45 +860,42 @@ async fn storage_healer( async fn heal_storage_batch( state_root: H256, mut batch: BTreeMap)>, - peers: Arc>, + peers: PeerHandler, store: Store, ) -> Result<(BTreeMap)>, bool), SyncError> { - for _ in 0..MAX_RETRIES { - let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; - let req_batch = batch.iter().map(|(k, v)| (*k, v.1.clone())).collect(); - if let Some(mut nodes) = peer.request_storage_trienodes(state_root, req_batch).await { - debug!("Received {} nodes", nodes.len()); - // Process the nodes for each account path - for (acc_path, (root, paths)) in batch.iter_mut() { - let mut trie = store.open_storage_trie(*acc_path, *root); - // Get the corresponding nodes - for node in nodes.drain(..paths.len().min(nodes.len())) { - let path = paths.remove(0); - // Add children to batch - let children = node_missing_children(&node, &path, trie.state())?; - paths.extend(children); - // If it is a leaf node, insert values into the trie - if let Node::Leaf(leaf) = node { - let path = &path.concat(leaf.partial.clone()).to_bytes(); - if path.len() != 32 { - // Something went wrong - return Err(SyncError::CorruptPath); - } - trie.insert(path.to_vec(), leaf.value.encode_to_vec())?; + let req_batch = batch.iter().map(|(k, v)| (*k, v.1.clone())).collect(); + if let Some(mut nodes) = peers.request_storage_trienodes(state_root, req_batch).await { + debug!("Received {} nodes", nodes.len()); + // Process the nodes for each account path + for (acc_path, (root, paths)) in batch.iter_mut() { + let mut trie = store.open_storage_trie(*acc_path, *root); + // Get the corresponding nodes + for node in nodes.drain(..paths.len().min(nodes.len())) { + let path = paths.remove(0); + // Add children to batch + let children = node_missing_children(&node, &path, trie.state())?; + paths.extend(children); + // If it is a leaf 
node, insert values into the trie + if let Node::Leaf(leaf) = node { + let path = &path.concat(leaf.partial.clone()).to_bytes(); + if path.len() != 32 { + // Something went wrong + return Err(SyncError::CorruptPath); } + trie.insert(path.to_vec(), leaf.value.encode_to_vec())?; } - // Update current root - *root = trie.hash()?; - // Cut the loop if we ran out of nodes - if nodes.is_empty() { - break; - } } - // Return remaining and added paths to be added to the queue - // Filter out the storages we completely fetched - batch.retain(|_, v| !v.1.is_empty()); - return Ok((batch, false)); + // Update current root + *root = trie.hash()?; + // Cut the loop if we ran out of nodes + if nodes.is_empty() { + break; + } } + // Return remaining and added paths to be added to the queue + // Filter out the storages we completely fetched + batch.retain(|_, v| !v.1.is_empty()); + return Ok((batch, false)); } // Pivot became stale, lets inform the fetcher Ok((batch, true)) @@ -989,27 +961,6 @@ fn seconds_to_readable(seconds: U512) -> String { } format!("{hours}h{minutes}m{seconds}s") } -/// Returns the channel ends to an active peer connection that supports the given capability -/// The peer is selected randomly, and doesn't guarantee that the selected peer is not currently busy -/// If no peer is found, this method will try again after 10 seconds -async fn get_peer_channel_with_retry( - table: Arc>, - capability: Capability, -) -> PeerChannels { - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - loop { - let table = table.lock().await; - table.show_peer_stats(); - if let Some(channels) = table.get_peer_channels(capability.clone()) { - return channels; - }; - // drop the lock early to no block the rest of processes - drop(table); - info!("[Sync] No peers available, retrying in 10 sec"); - // This is the unlikely case where we just started the node and don't have peers, wait a bit and try again - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; - } -} #[derive(thiserror::Error, Debug)] enum SyncError {