diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 3b0497c38c..dfee07eb5b 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -17,7 +17,10 @@ use tokio::{
 use tracing::{debug, info, warn};
 
 use crate::{kademlia::KademliaTable, peer_channels::BlockRequestOrder};
-use crate::{peer_channels::PeerChannels, rlpx::p2p::Capability};
+use crate::{
+    peer_channels::{PeerChannels, HASH_MAX},
+    rlpx::p2p::Capability,
+};
 
 /// Maximum amount of times we will ask a peer for an account/storage range
 /// If the max amount of retries is exceeded we will asume that the state we are requesting is old and no longer available
@@ -327,130 +330,138 @@ async fn rebuild_state_trie(
     peers: Arc<Mutex<KademliaTable>>,
     store: Store,
 ) -> Result<bool, SyncError> {
-    debug!("Rebuilding State Trie");
     // Spawn storage & bytecode fetchers
     let (bytecode_sender, bytecode_receiver) = mpsc::channel::<Vec<H256>>(500);
-    let (storage_sender, storage_receiver) = mpsc::channel::<Vec<(H256, H256)>>(500);
     let bytecode_fetcher_handle = tokio::spawn(bytecode_fetcher(
         bytecode_receiver,
         peers.clone(),
         store.clone(),
     ));
-    let storage_fetcher_handle = tokio::spawn(storage_fetcher(
-        storage_receiver,
-        peers.clone(),
-        store.clone(),
-        state_root,
-    ));
     // Resume download from checkpoint if available or start from an empty trie
     // We cannot keep an open trie here so we will track the root between lookups
     let mut current_state_root = store
         .get_state_trie_root_checkpoint()?
         .unwrap_or(*EMPTY_TRIE_HASH);
     let mut start_account_hash = store.get_state_trie_key_checkpoint()?.unwrap_or_default();
-    debug!("Starting/Resuming state trie download from key {start_account_hash}");
-    // Fetch Account Ranges
-    // If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available
-    let mut retry_count = 0;
-    let mut progress_timer = Instant::now();
-    let initial_timestamp = Instant::now();
-    let initial_account_hash = start_account_hash.into_uint();
-    const PROGRESS_OUTPUT_TIMER: std::time::Duration = std::time::Duration::from_secs(30);
-    while retry_count <= MAX_RETRIES {
-        // Show Progress stats (this task is not vital so we can detach it)
-        if Instant::now().duration_since(progress_timer) >= PROGRESS_OUTPUT_TIMER {
-            progress_timer = Instant::now();
-            tokio::spawn(show_progress(
-                start_account_hash,
-                initial_account_hash,
-                initial_timestamp,
-            ));
-        }
-        let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await;
-        debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}");
-        if let Some((account_hashes, accounts, should_continue)) = peer
-            .request_account_range(state_root, start_account_hash)
-            .await
-        {
-            debug!("Received {} account ranges", accounts.len());
-            // Reset retry counter
-            retry_count = 0;
-            // Update starting hash for next batch
-            if should_continue {
-                start_account_hash = *account_hashes.last().unwrap();
+    // Skip state sync if we are already on healing
+    if start_account_hash != HASH_MAX {
+        let (storage_sender, storage_receiver) = mpsc::channel::<Vec<(H256, H256)>>(500);
+        let storage_fetcher_handle = tokio::spawn(storage_fetcher(
+            storage_receiver,
+            peers.clone(),
+            store.clone(),
+            state_root,
+        ));
+        debug!("Starting/Resuming state trie download from key {start_account_hash}");
+        // Fetch Account Ranges
+        // If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available
+        let mut retry_count = 0;
+        let mut progress_timer = Instant::now();
+        let initial_timestamp = Instant::now();
+        let initial_account_hash = start_account_hash.into_uint();
+        const PROGRESS_OUTPUT_TIMER: std::time::Duration = std::time::Duration::from_secs(30);
+        while retry_count <= MAX_RETRIES {
+            // Show Progress stats (this task is not vital so we can detach it)
+            if Instant::now().duration_since(progress_timer) >= PROGRESS_OUTPUT_TIMER {
+                progress_timer = Instant::now();
+                tokio::spawn(show_progress(
+                    start_account_hash,
+                    initial_account_hash,
+                    initial_timestamp,
+                ));
             }
-            // Fetch Account Storage & Bytecode
-            let mut code_hashes = vec![];
-            let mut account_hashes_and_storage_roots = vec![];
-            for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
-                // Build the batch of code hashes to send to the bytecode fetcher
-                // Ignore accounts without code / code we already have stored
-                if account.code_hash != *EMPTY_KECCACK_HASH
-                    && store.get_account_code(account.code_hash)?.is_none()
-                {
-                    code_hashes.push(account.code_hash)
+            let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await;
+            debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}");
+            if let Some((account_hashes, accounts, should_continue)) = peer
+                .request_account_range(state_root, start_account_hash)
+                .await
+            {
+                debug!("Received {} accounts", accounts.len());
+                // Reset retry counter
+                retry_count = 0;
+                // Update starting hash for next batch
+                if should_continue {
+                    start_account_hash = *account_hashes.last().unwrap();
                 }
-                // Build the batch of hashes and roots to send to the storage fetcher
-                // Ignore accounts without storage and account's which storage hasn't changed from our current stored state
-                if account.storage_root != *EMPTY_TRIE_HASH
-                    && !store.contains_storage_node(*account_hash, account.storage_root)?
-                {
-                    account_hashes_and_storage_roots.push((*account_hash, account.storage_root));
+                // Fetch Account Storage & Bytecode
+                let mut code_hashes = vec![];
+                let mut account_hashes_and_storage_roots = vec![];
+                for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
+                    // Build the batch of code hashes to send to the bytecode fetcher
+                    // Ignore accounts without code / code we already have stored
+                    if account.code_hash != *EMPTY_KECCACK_HASH
+                        && store.get_account_code(account.code_hash)?.is_none()
+                    {
+                        code_hashes.push(account.code_hash)
+                    }
+                    // Build the batch of hashes and roots to send to the storage fetcher
+                    // Ignore accounts without storage and accounts whose storage hasn't changed from our current stored state
+                    if account.storage_root != *EMPTY_TRIE_HASH
+                        && !store.contains_storage_node(*account_hash, account.storage_root)?
+                    {
+                        account_hashes_and_storage_roots
+                            .push((*account_hash, account.storage_root));
+                    }
                 }
-            }
-            // Send code hash batch to the bytecode fetcher
-            if !code_hashes.is_empty() {
-                bytecode_sender.send(code_hashes).await?;
-            }
-            // Send hash and root batch to the storage fetcher
-            if !account_hashes_and_storage_roots.is_empty() {
-                storage_sender
-                    .send(account_hashes_and_storage_roots)
-                    .await?;
-            }
-            // Update trie
-            let mut trie = store.open_state_trie(current_state_root);
-            for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
-                trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
-            }
-            current_state_root = trie.hash()?;
+                // Send code hash batch to the bytecode fetcher
+                if !code_hashes.is_empty() {
+                    bytecode_sender.send(code_hashes).await?;
+                }
+                // Send hash and root batch to the storage fetcher
+                if !account_hashes_and_storage_roots.is_empty() {
+                    storage_sender
+                        .send(account_hashes_and_storage_roots)
+                        .await?;
+                }
+                // Update trie
+                let mut trie = store.open_state_trie(current_state_root);
+                for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
+                    trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
+                }
+                current_state_root = trie.hash()?;
 
-            if !should_continue {
-                // All accounts fetched!
-                break;
+                if !should_continue {
+                    // All accounts fetched!
+                    break;
+                }
+            } else {
+                retry_count += 1;
             }
-        } else {
-            retry_count += 1;
         }
-    }
-    if retry_count > MAX_RETRIES {
         // Store current checkpoint
         store.set_state_trie_root_checkpoint(current_state_root)?;
-        store.set_state_trie_key_checkpoint(start_account_hash)?;
-    }
-    debug!("Account Trie Fetching ended, signaling storage fetcher process");
-    // Send empty batch to signal that no more batches are incoming
-    storage_sender.send(vec![]).await?;
-    let pending_storage_accounts = storage_fetcher_handle.await??;
-    let pending_storages = !pending_storage_accounts.is_empty();
-    // Next cycle may have different storage roots for these accounts so we will leave them to healing
-    if pending_storages {
-        let mut stored_pending_storages = store
-            .get_pending_storage_heal_accounts()?
-            .unwrap_or_default();
-        stored_pending_storages.extend(pending_storage_accounts);
-        debug!(
-            "Current pending storage accounts: {}",
-            stored_pending_storages.len()
-        );
-        store.set_pending_storage_heal_accounts(stored_pending_storages)?;
-    }
-    if retry_count > MAX_RETRIES || pending_storages {
-        // Skip healing and return stale status
-        return Ok(false);
+        if retry_count > MAX_RETRIES {
+            store.set_state_trie_key_checkpoint(start_account_hash)?;
+        } else {
+            // Set highest key value so we know state sync is already complete on the next cycle
+            store.set_state_trie_key_checkpoint(HASH_MAX)?;
+        }
+        debug!("Account Trie Fetching ended, signaling storage fetcher process");
+        // Send empty batch to signal that no more batches are incoming
+        storage_sender.send(vec![]).await?;
+        let pending_storage_accounts = storage_fetcher_handle.await??;
+        let pending_storages = !pending_storage_accounts.is_empty();
+        // Next cycle may have different storage roots for these accounts so we will leave them to healing
+        if pending_storages {
+            let mut stored_pending_storages = store
                .get_pending_storage_heal_accounts()?
+                .unwrap_or_default();
+            stored_pending_storages.extend(pending_storage_accounts);
+            info!(
+                "Current pending storage accounts: {}",
+                stored_pending_storages.len()
+            );
+            store.set_pending_storage_heal_accounts(stored_pending_storages)?;
+        }
+        if retry_count > MAX_RETRIES || pending_storages {
+            // Skip healing and return stale status
+            return Ok(false);
+        }
+        info!("Starting healing");
+    } else {
+        info!("Resuming healing");
     }
     // Perform state healing to fix inconsistencies with older state
-    info!("Starting state healing");
     let res = heal_state_trie(
         bytecode_sender.clone(),
         state_root,
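
The core mechanism in this change is the HASH_MAX sentinel: once every account range has been downloaded, the state-trie key checkpoint is set to the highest possible account hash, so the next sync cycle can tell a finished state sync apart from an interrupted one and jump straight to healing. The sketch below illustrates that decision only; it is not the ethrex API. It assumes HASH_MAX is the all-0xFF H256 (the real constant is imported from peer_channels and may be defined differently) and uses a hypothetical next_phase helper in place of the store's checkpoint accessors.

    // Sketch only: `HASH_MAX` and `next_phase` are illustrative stand-ins.
    use ethereum_types::H256;

    /// Assumed definition: the highest possible account hash, used as a
    /// sentinel meaning "state sync finished, go straight to healing".
    const HASH_MAX: H256 = H256([0xFF; 32]);

    fn next_phase(checkpoint: Option<H256>) -> &'static str {
        match checkpoint {
            // No checkpoint stored: fresh sync, start from the zero hash.
            None => "start state sync from H256::zero()",
            // Sentinel stored: a previous cycle fetched every account range.
            Some(key) if key == HASH_MAX => "skip state sync, resume healing",
            // Any other key: resume state sync from the last fetched hash.
            Some(_) => "resume state sync from checkpoint",
        }
    }

    fn main() {
        assert_eq!(next_phase(None), "start state sync from H256::zero()");
        assert_eq!(next_phase(Some(HASH_MAX)), "skip state sync, resume healing");
    }

Reusing the existing key checkpoint as the completion flag avoids adding a separate "state sync done" marker to the store, and it is what lets the storage fetcher setup move inside the `if`: when the sentinel is present, the storage channel is never created at all.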
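The shutdown handshake with the storage fetcher is also worth noting: the producer sends an empty batch as an in-band end-of-stream marker, then awaits the fetcher's join handle to collect the accounts that still need healing. Below is a self-contained sketch of that convention with tokio's mpsc channel, using (u64, u64) pairs in place of the real (H256, H256) account-hash/storage-root tuples.

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // Same channel shape as the diff: batches of pairs, capacity 500.
        let (tx, mut rx) = mpsc::channel::<Vec<(u64, u64)>>(500);

        // Stand-in for storage_fetcher: drain batches until the empty marker.
        let fetcher = tokio::spawn(async move {
            let mut processed = 0usize;
            while let Some(batch) = rx.recv().await {
                if batch.is_empty() {
                    // Empty batch == "no more batches are incoming".
                    break;
                }
                processed += batch.len();
            }
            processed
        });

        tx.send(vec![(1, 2), (3, 4)]).await.unwrap();
        tx.send(vec![]).await.unwrap(); // signal completion, as in the diff
        assert_eq!(fetcher.await.unwrap(), 2);
    }

Dropping the sender would also close the channel, but the explicit empty batch keeps the completion signal visible in the protocol between the two tasks and does not depend on every clone of the sender going out of scope first.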