feat(l1): skip state sync if it was already completed (#1829)
**Motivation**
There is no reason to attempt to resume a state sync if the previous
cycle already moved on to healing; doing so could corrupt state, since the
trie is currently being healed and is in an inconsistent state.

**Description**
* Skip to healing if a previous cycle already completed state sync (see the sketch below)


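To make the mechanism concrete, here is a minimal, self-contained sketch of the completion check, not the actual ethrex code: the state-trie key checkpoint doubles as a completion flag, with the finishing cycle persisting `HASH_MAX` once every account range has been fetched so the next cycle can skip straight to healing. Only `HASH_MAX` and the checkpoint comparison mirror the diff below; the `Checkpoint` store and the stub functions are simplified stand-ins.

```rust
/// All-ones account hash; stored as the key checkpoint to mark that state
/// sync already finished in a previous cycle (mirrors `HASH_MAX` in the diff).
const HASH_MAX: [u8; 32] = [0xFF; 32];

/// Simplified stand-in for the node's checkpoint store.
struct Checkpoint {
    state_trie_key: Option<[u8; 32]>,
}

impl Checkpoint {
    fn get_state_trie_key(&self) -> Option<[u8; 32]> {
        self.state_trie_key
    }
    fn set_state_trie_key(&mut self, key: [u8; 32]) {
        self.state_trie_key = Some(key);
    }
}

/// One sync cycle: resume or start state sync unless a previous cycle already
/// completed it, in which case go straight to healing.
fn run_cycle(store: &mut Checkpoint) {
    // Resume from the stored checkpoint, or start from the zero key.
    let start_account_hash = store.get_state_trie_key().unwrap_or([0u8; 32]);

    if start_account_hash != HASH_MAX {
        // State sync not finished yet: fetch the remaining account ranges.
        let finished = download_account_ranges(start_account_hash);
        if finished {
            // Mark completion so the next cycle skips state sync entirely.
            store.set_state_trie_key(HASH_MAX);
            println!("Healing Start");
        } else {
            // Stale state: keep the checkpoint and retry on the next cycle.
            store.set_state_trie_key(start_account_hash);
            return;
        }
    } else {
        // A previous cycle already moved on to healing; re-running state sync
        // here could corrupt the trie that healing is repairing.
        println!("Resuming healing");
    }
    heal_state_trie();
}

// Illustrative stubs; the real code talks to snap-capable peers and the store.
fn download_account_ranges(_start: [u8; 32]) -> bool {
    true
}
fn heal_state_trie() {}

fn main() {
    let mut store = Checkpoint { state_trie_key: None };
    run_cycle(&mut store); // first cycle: state sync, then healing starts
    run_cycle(&mut store); // next cycle: checkpoint is HASH_MAX, skip to healing
}
```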
fmoletta authored Jan 30, 2025
1 parent 4d26c98 commit 00fffd9
Showing 1 changed file with 113 additions and 102 deletions.

crates/networking/p2p/sync.rs
@@ -17,7 +17,10 @@ use tokio::{
use tracing::{debug, info, warn};

use crate::{kademlia::KademliaTable, peer_channels::BlockRequestOrder};
use crate::{peer_channels::PeerChannels, rlpx::p2p::Capability};
use crate::{
peer_channels::{PeerChannels, HASH_MAX},
rlpx::p2p::Capability,
};

/// Maximum amount of times we will ask a peer for an account/storage range
/// If the max amount of retries is exceeded we will assume that the state we are requesting is old and no longer available
@@ -327,130 +330,138 @@ async fn rebuild_state_trie(
peers: Arc<Mutex<KademliaTable>>,
store: Store,
) -> Result<bool, SyncError> {
debug!("Rebuilding State Trie");
// Spawn storage & bytecode fetchers
let (bytecode_sender, bytecode_receiver) = mpsc::channel::<Vec<H256>>(500);
let (storage_sender, storage_receiver) = mpsc::channel::<Vec<(H256, H256)>>(500);
let bytecode_fetcher_handle = tokio::spawn(bytecode_fetcher(
bytecode_receiver,
peers.clone(),
store.clone(),
));
let storage_fetcher_handle = tokio::spawn(storage_fetcher(
storage_receiver,
peers.clone(),
store.clone(),
state_root,
));
// Resume download from checkpoint if available or start from an empty trie
// We cannot keep an open trie here so we will track the root between lookups
let mut current_state_root = store
.get_state_trie_root_checkpoint()?
.unwrap_or(*EMPTY_TRIE_HASH);
let mut start_account_hash = store.get_state_trie_key_checkpoint()?.unwrap_or_default();
debug!("Starting/Resuming state trie download from key {start_account_hash}");
// Fetch Account Ranges
// If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available
let mut retry_count = 0;
let mut progress_timer = Instant::now();
let initial_timestamp = Instant::now();
let initial_account_hash = start_account_hash.into_uint();
const PROGRESS_OUTPUT_TIMER: std::time::Duration = std::time::Duration::from_secs(30);
while retry_count <= MAX_RETRIES {
// Show Progress stats (this task is not vital so we can detach it)
if Instant::now().duration_since(progress_timer) >= PROGRESS_OUTPUT_TIMER {
progress_timer = Instant::now();
tokio::spawn(show_progress(
start_account_hash,
initial_account_hash,
initial_timestamp,
));
}
let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await;
debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}");
if let Some((account_hashes, accounts, should_continue)) = peer
.request_account_range(state_root, start_account_hash)
.await
{
debug!("Received {} account ranges", accounts.len());
// Reset retry counter
retry_count = 0;
// Update starting hash for next batch
if should_continue {
start_account_hash = *account_hashes.last().unwrap();
// Skip state sync if we are already on healing
if start_account_hash != HASH_MAX {
let (storage_sender, storage_receiver) = mpsc::channel::<Vec<(H256, H256)>>(500);
let storage_fetcher_handle = tokio::spawn(storage_fetcher(
storage_receiver,
peers.clone(),
store.clone(),
state_root,
));
debug!("Starting/Resuming state trie download from key {start_account_hash}");
// Fetch Account Ranges
// If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available
let mut retry_count = 0;
let mut progress_timer = Instant::now();
let initial_timestamp = Instant::now();
let initial_account_hash = start_account_hash.into_uint();
const PROGRESS_OUTPUT_TIMER: std::time::Duration = std::time::Duration::from_secs(30);
while retry_count <= MAX_RETRIES {
// Show Progress stats (this task is not vital so we can detach it)
if Instant::now().duration_since(progress_timer) >= PROGRESS_OUTPUT_TIMER {
progress_timer = Instant::now();
tokio::spawn(show_progress(
start_account_hash,
initial_account_hash,
initial_timestamp,
));
}
// Fetch Account Storage & Bytecode
let mut code_hashes = vec![];
let mut account_hashes_and_storage_roots = vec![];
for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
// Build the batch of code hashes to send to the bytecode fetcher
// Ignore accounts without code / code we already have stored
if account.code_hash != *EMPTY_KECCACK_HASH
&& store.get_account_code(account.code_hash)?.is_none()
{
code_hashes.push(account.code_hash)
let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await;
debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}");
if let Some((account_hashes, accounts, should_continue)) = peer
.request_account_range(state_root, start_account_hash)
.await
{
debug!("Received {} account ranges", accounts.len());
// Reset retry counter
retry_count = 0;
// Update starting hash for next batch
if should_continue {
start_account_hash = *account_hashes.last().unwrap();
}
// Build the batch of hashes and roots to send to the storage fetcher
// Ignore accounts without storage and accounts whose storage hasn't changed from our current stored state
if account.storage_root != *EMPTY_TRIE_HASH
&& !store.contains_storage_node(*account_hash, account.storage_root)?
{
account_hashes_and_storage_roots.push((*account_hash, account.storage_root));
// Fetch Account Storage & Bytecode
let mut code_hashes = vec![];
let mut account_hashes_and_storage_roots = vec![];
for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
// Build the batch of code hashes to send to the bytecode fetcher
// Ignore accounts without code / code we already have stored
if account.code_hash != *EMPTY_KECCACK_HASH
&& store.get_account_code(account.code_hash)?.is_none()
{
code_hashes.push(account.code_hash)
}
// Build the batch of hashes and roots to send to the storage fetcher
// Ignore accounts without storage and accounts whose storage hasn't changed from our current stored state
if account.storage_root != *EMPTY_TRIE_HASH
&& !store.contains_storage_node(*account_hash, account.storage_root)?
{
account_hashes_and_storage_roots
.push((*account_hash, account.storage_root));
}
}
}
// Send code hash batch to the bytecode fetcher
if !code_hashes.is_empty() {
bytecode_sender.send(code_hashes).await?;
}
// Send hash and root batch to the storage fetcher
if !account_hashes_and_storage_roots.is_empty() {
storage_sender
.send(account_hashes_and_storage_roots)
.await?;
}
// Update trie
let mut trie = store.open_state_trie(current_state_root);
for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
}
current_state_root = trie.hash()?;
// Send code hash batch to the bytecode fetcher
if !code_hashes.is_empty() {
bytecode_sender.send(code_hashes).await?;
}
// Send hash and root batch to the storage fetcher
if !account_hashes_and_storage_roots.is_empty() {
storage_sender
.send(account_hashes_and_storage_roots)
.await?;
}
// Update trie
let mut trie = store.open_state_trie(current_state_root);
for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
}
current_state_root = trie.hash()?;

if !should_continue {
// All accounts fetched!
break;
if !should_continue {
// All accounts fetched!
break;
}
} else {
retry_count += 1;
}
} else {
retry_count += 1;
}
}
if retry_count > MAX_RETRIES {
// Store current checkpoint
store.set_state_trie_root_checkpoint(current_state_root)?;
store.set_state_trie_key_checkpoint(start_account_hash)?;
}
debug!("Account Trie Fetching ended, signaling storage fetcher process");
// Send empty batch to signal that no more batches are incoming
storage_sender.send(vec![]).await?;
let pending_storage_accounts = storage_fetcher_handle.await??;
let pending_storages = !pending_storage_accounts.is_empty();
// Next cycle may have different storage roots for these accounts so we will leave them to healing
if pending_storages {
let mut stored_pending_storages = store
.get_pending_storage_heal_accounts()?
.unwrap_or_default();
stored_pending_storages.extend(pending_storage_accounts);
debug!(
"Current pending storage accounts: {}",
stored_pending_storages.len()
);
store.set_pending_storage_heal_accounts(stored_pending_storages)?;
}
if retry_count > MAX_RETRIES || pending_storages {
// Skip healing and return stale status
return Ok(false);
if retry_count > MAX_RETRIES {
store.set_state_trie_key_checkpoint(start_account_hash)?;
} else {
// Set highest key value so we know state sync is already complete on the next cycle
store.set_state_trie_key_checkpoint(HASH_MAX)?;
}
debug!("Account Trie Fetching ended, signaling storage fetcher process");
// Send empty batch to signal that no more batches are incoming
storage_sender.send(vec![]).await?;
let pending_storage_accounts = storage_fetcher_handle.await??;
let pending_storages = !pending_storage_accounts.is_empty();
// Next cycle may have different storage roots for these accounts so we will leave them to healing
if pending_storages {
let mut stored_pending_storages = store
.get_pending_storage_heal_accounts()?
.unwrap_or_default();
stored_pending_storages.extend(pending_storage_accounts);
info!(
"Current pending storage accounts: {}",
stored_pending_storages.len()
);
store.set_pending_storage_heal_accounts(stored_pending_storages)?;
}
if retry_count > MAX_RETRIES || pending_storages {
// Skip healing and return stale status
return Ok(false);
}
info!("Healing Start")
} else {
info!("Resuming healing")
}
// Perform state healing to fix inconsistencies with older state
info!("Starting state healing");
let res = heal_state_trie(
bytecode_sender.clone(),
state_root,
