From 331b9824f29a10985a64bac3e838be865689aad4 Mon Sep 17 00:00:00 2001 From: Keefe Liu Date: Mon, 21 Oct 2024 18:28:21 +0800 Subject: [PATCH] feat: enable prefetch on the new engine --- Cargo.lock | 1 + crates/blockchain-tree/src/chain.rs | 28 ++--- crates/engine/service/src/service.rs | 3 + crates/engine/tree/Cargo.toml | 1 + crates/engine/tree/src/tree/mod.rs | 58 +++++++++- crates/node/builder/src/launch/engine.rs | 2 + crates/trie/parallel/src/parallel_root.rs | 8 +- crates/trie/prefetch/src/lib.rs | 7 +- crates/trie/prefetch/src/prefetch.rs | 125 +++++++++++++++------- crates/trie/prefetch/src/stats.rs | 45 ++++++++ 10 files changed, 216 insertions(+), 62 deletions(-) create mode 100644 crates/trie/prefetch/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index d014e3b166..cb8df23d68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7914,6 +7914,7 @@ dependencies = [ "reth-tracing", "reth-trie", "reth-trie-parallel", + "reth-trie-prefetch", "thiserror", "tokio", "tracing", diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index b158530f4a..97ee4c083a 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -30,7 +30,6 @@ use std::{ clone::Clone, collections::{BTreeMap, HashMap}, ops::{Deref, DerefMut}, - sync::Arc, time::Instant, }; @@ -231,14 +230,9 @@ impl AppendableChain { let initial_execution_outcome = ExecutionOutcome::from((state, block.number)); - // stop the prefetch task. - if let Some(interrupt_tx) = interrupt_tx { - let _ = interrupt_tx.send(()); - } - // check state root if the block extends the canonical chain __and__ if state root // validation was requested. - if block_validation_kind.is_exhaustive() { + let result = if block_validation_kind.is_exhaustive() { // calculate and check state root let start = Instant::now(); let (state_root, trie_updates) = if block_attachment.is_canonical() { @@ -283,7 +277,14 @@ impl AppendableChain { Ok((initial_execution_outcome, trie_updates)) } else { Ok((initial_execution_outcome, None)) - } + }; + + // stop the prefetch task. + if let Some(interrupt_tx) = interrupt_tx { + let _ = interrupt_tx.send(()); + }; + + result } /// Validate and execute the given block, and append it to this chain. 
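Note on the chain.rs hunk above: the patch reorders the shutdown of the prefetch task so that the oneshot interrupt is sent only after the state-root check has produced its result, letting the prefetcher keep warming trie caches while the root is being computed. Below is a minimal, self-contained sketch of that pattern using plain tokio primitives; the names (`run_prefetch_loop`, the sleep-based "work") are illustrative stand-ins, not reth APIs.

```rust
use std::time::Duration;
use tokio::sync::oneshot;

/// Stand-in for the prefetch task: keeps doing background work until interrupted.
async fn run_prefetch_loop(mut interrupt_rx: oneshot::Receiver<()>) {
    loop {
        tokio::select! {
            // Stop as soon as the caller signals that the state root is done.
            _ = &mut interrupt_rx => break,
            // Placeholder for one unit of prefetch work.
            _ = tokio::time::sleep(Duration::from_millis(10)) => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let (interrupt_tx, interrupt_rx) = oneshot::channel();
    let prefetch = tokio::spawn(run_prefetch_loop(interrupt_rx));

    // Simulated state-root computation; the prefetcher stays alive for its duration.
    let result: Result<(), ()> = {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Ok(())
    };

    // Only now interrupt the prefetch task, mirroring the reordered hunk above.
    let _ = interrupt_tx.send(());
    prefetch.await.unwrap();
    let _ = result;
}
```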
@@ -356,18 +357,11 @@ impl AppendableChain { let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel(); let mut trie_prefetch = TriePrefetch::new(); - let consistent_view = if let Ok(view) = - ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone()) - { - view - } else { - tracing::debug!("Failed to create consistent view for trie prefetch"); - return (None, None) - }; + let provider_factory = externals.provider_factory.clone(); tokio::spawn({ async move { - trie_prefetch.run(Arc::new(consistent_view), prefetch_rx, interrupt_rx).await; + trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await; } }); diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 3ee9504536..1097cd2e00 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -79,6 +79,7 @@ where invalid_block_hook: Box, sync_metrics_tx: MetricEventsSender, skip_state_root_validation: bool, + enable_prefetch: bool, ) -> Self { let downloader = BasicBlockDownloader::new(client, consensus.clone()); @@ -99,6 +100,7 @@ where tree_config, invalid_block_hook, skip_state_root_validation, + enable_prefetch, ); let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); @@ -212,6 +214,7 @@ mod tests { Box::new(NoopInvalidBlockHook::default()), sync_metrics_tx, false, + false, ); } } diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 486f103c81..b938969f73 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -34,6 +34,7 @@ reth-tasks.workspace = true reth-node-types.workspace = true reth-trie.workspace = true reth-trie-parallel.workspace = true +reth-trie-prefetch.workspace = true # common futures.workspace = true diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index d5612b3025..3c57a74a47 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -23,8 +23,8 @@ use reth_payload_builder::PayloadBuilderHandle; use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes}; use reth_payload_validator::ExecutionPayloadValidator; use reth_primitives::{ - Block, BlockNumHash, BlockNumber, GotExpected, Header, SealedBlock, SealedBlockWithSenders, - SealedHeader, B256, U256, + revm_primitives::EvmState, Block, BlockNumHash, BlockNumber, GotExpected, Header, SealedBlock, + SealedBlockWithSenders, SealedHeader, B256, U256, }; use reth_provider::{ providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome, @@ -42,6 +42,7 @@ use reth_rpc_types::{ use reth_stages_api::ControlFlow; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie_parallel::parallel_root::ParallelStateRoot; +use reth_trie_prefetch::TriePrefetch; use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque}, @@ -497,6 +498,8 @@ pub struct EngineApiTreeHandler { invalid_block_hook: Box, /// Flag indicating whether the state root validation should be skipped. skip_state_root_validation: bool, + /// Flag indicating whether to enable prefetch. 
+ enable_prefetch: bool, } impl std::fmt::Debug for EngineApiTreeHandler { @@ -516,6 +519,8 @@ impl std::fmt::Debug for EngineApiTr .field("config", &self.config) .field("metrics", &self.metrics) .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook)) + .field("skip_state_root_validation", &self.skip_state_root_validation) + .field("enable_prefetch", &self.enable_prefetch) .finish() } } @@ -542,6 +547,7 @@ where payload_builder: PayloadBuilderHandle, config: TreeConfig, skip_state_root_validation: bool, + enable_prefetch: bool, ) -> Self { let (incoming_tx, incoming) = std::sync::mpsc::channel(); Self { @@ -562,6 +568,7 @@ where incoming_tx, invalid_block_hook: Box::new(NoopInvalidBlockHook), skip_state_root_validation, + enable_prefetch, } } @@ -587,6 +594,7 @@ where config: TreeConfig, invalid_block_hook: Box, skip_state_root_validation: bool, + enable_prefetch: bool, ) -> (Sender>>, UnboundedReceiver) { let best_block_number = provider.best_block_number().unwrap_or(0); let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); @@ -618,10 +626,20 @@ where payload_builder, config, skip_state_root_validation, + enable_prefetch, ); task.set_invalid_block_hook(invalid_block_hook); let incoming = task.incoming_tx.clone(); - std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap(); + std::thread::Builder::new() + .name("Tree Task".to_string()) + .spawn(move || { + let runtime = + tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + runtime.block_on(async { + task.run(); + }); + }) + .unwrap(); (incoming, outgoing) } @@ -2145,8 +2163,16 @@ where return Err(e.into()) } - let executor = - self.executor_provider.executor(StateProviderDatabase::new(&state_provider), None); + let (prefetch_tx, interrupt_tx) = + if self.enable_prefetch && !self.skip_state_root_validation { + self.setup_prefetch() + } else { + (None, None) + }; + + let executor = self + .executor_provider + .executor(StateProviderDatabase::new(&state_provider), prefetch_tx); let block_number = block.number; let block_hash = block.hash(); @@ -2218,6 +2244,11 @@ where state_provider.state_root_with_updates(hashed_state.clone())? }; + // stop the prefetch task. + if let Some(interrupt_tx) = interrupt_tx { + let _ = interrupt_tx.send(()); + }; + if state_root != block.state_root { // call post-block hook self.invalid_block_hook.on_invalid_block( @@ -2531,6 +2562,22 @@ where ); Ok(()) } + + fn setup_prefetch(&self) -> (Option>, Option>) { + let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel(); + let (interrupt_tx, interrupt_rx) = oneshot::channel(); + + let mut trie_prefetch = TriePrefetch::new(); + let provider_factory = self.provider.clone(); + + tokio::spawn({ + async move { + trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await; + } + }); + + (Some(prefetch_tx), Some(interrupt_tx)) + } } /// This is an error that can come from advancing persistence. 
Either this can be a @@ -2726,6 +2773,7 @@ mod tests { payload_builder, TreeConfig::default(), false, + false, ); let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone()); diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 7c27442848..ab621a61f4 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -241,6 +241,7 @@ where ctx.invalid_block_hook()?, ctx.sync_metrics_tx(), ctx.node_config().skip_state_root_validation, + ctx.node_config().enable_prefetch, ); eth_service } @@ -274,6 +275,7 @@ where ctx.invalid_block_hook()?, ctx.sync_metrics_tx(), ctx.node_config().skip_state_root_validation, + ctx.node_config().enable_prefetch, ); eth_service } diff --git a/crates/trie/parallel/src/parallel_root.rs b/crates/trie/parallel/src/parallel_root.rs index e63c3f1a17..369eaca2bf 100644 --- a/crates/trie/parallel/src/parallel_root.rs +++ b/crates/trie/parallel/src/parallel_root.rs @@ -136,14 +136,17 @@ where hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, ); + let account_tree_start = std::time::Instant::now(); let mut hash_builder = HashBuilder::default().with_updates(retain_updates); let mut account_rlp = Vec::with_capacity(128); while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? { match node { TrieElement::Branch(node) => { + tracker.inc_branch(); hash_builder.add_branch(node.key, node.value, node.children_are_in_trie); } TrieElement::Leaf(hashed_address, account) => { + tracker.inc_leaf(); let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) { Some(result) => result, // Since we do not store all intermediate nodes in the database, there might @@ -181,15 +184,18 @@ where prefix_sets.destroyed_accounts, ); + let account_tree_duration = account_tree_start.elapsed(); let stats = tracker.finish(); #[cfg(feature = "metrics")] self.metrics.record_state_trie(stats); - trace!( + debug!( target: "trie::parallel_state_root", %root, duration = ?stats.duration(), + account_tree_duration = ?account_tree_duration, + storage_trees_duration = ?(stats.duration() - account_tree_duration), branches_added = stats.branches_added(), leaves_added = stats.leaves_added(), missed_leaves = stats.missed_leaves(), diff --git a/crates/trie/prefetch/src/lib.rs b/crates/trie/prefetch/src/lib.rs index 20e9d984a3..c3e9e92d41 100644 --- a/crates/trie/prefetch/src/lib.rs +++ b/crates/trie/prefetch/src/lib.rs @@ -7,8 +7,11 @@ )] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +pub use prefetch::TriePrefetch; pub use reth_trie_parallel::StorageRootTargets; +/// Trie prefetch stats. +pub mod stats; + /// Implementation of trie prefetch. 
-mod prefetch; -pub use prefetch::TriePrefetch; +pub mod prefetch; diff --git a/crates/trie/prefetch/src/prefetch.rs b/crates/trie/prefetch/src/prefetch.rs index 24a4d368d9..b27da82d43 100644 --- a/crates/trie/prefetch/src/prefetch.rs +++ b/crates/trie/prefetch/src/prefetch.rs @@ -1,3 +1,5 @@ +use std::{collections::HashMap, sync::Arc}; + use rayon::prelude::*; use reth_execution_errors::StorageRootError; use reth_primitives::{revm_primitives::EvmState, B256}; @@ -15,14 +17,15 @@ use reth_trie::{ }; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use reth_trie_parallel::{parallel_root::ParallelStateRootError, StorageRootTargets}; -use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use tokio::{ - sync::{mpsc::UnboundedReceiver, oneshot::Receiver}, + sync::{mpsc::UnboundedReceiver, oneshot::Receiver, Mutex}, task::JoinSet, }; use tracing::{debug, trace}; +use crate::stats::TriePrefetchTracker; + /// Prefetch trie storage when executing transactions. #[derive(Debug, Clone)] pub struct TriePrefetch { @@ -55,31 +58,53 @@ impl TriePrefetch { /// Run the prefetching task. pub async fn run( &mut self, - consistent_view: Arc>, + provider_factory: Factory, mut prefetch_rx: UnboundedReceiver, mut interrupt_rx: Receiver<()>, ) where - Factory: DatabaseProviderFactory + Send + Sync + 'static, + Factory: DatabaseProviderFactory + Clone + 'static, { let mut join_set = JoinSet::new(); + let arc_tracker = Arc::new(Mutex::new(TriePrefetchTracker::default())); loop { tokio::select! { state = prefetch_rx.recv() => { if let Some(state) = state { - let consistent_view = Arc::clone(&consistent_view); let hashed_state = self.deduplicate_and_update_cached(state); let self_clone = Arc::new(self.clone()); + let consistent_view = ConsistentDbView::new_with_latest_tip(provider_factory.clone()).unwrap(); + let hashed_state_clone = hashed_state.clone(); + let arc_tracker_clone = Arc::clone(&arc_tracker); + join_set.spawn(async move { + if let Err(e) = self_clone.prefetch_accounts::(consistent_view, hashed_state_clone, arc_tracker_clone).await { + debug!(target: "trie::trie_prefetch", ?e, "Error while prefetching account trie storage"); + }; + }); + + let self_clone = Arc::new(self.clone()); + let consistent_view = ConsistentDbView::new_with_latest_tip(provider_factory.clone()).unwrap(); join_set.spawn(async move { - if let Err(e) = self_clone.prefetch_once(consistent_view, hashed_state).await { - debug!(target: "trie::trie_prefetch", ?e, "Error while prefetching trie storage"); + if let Err(e) = self_clone.prefetch_storages::(consistent_view, hashed_state).await { + debug!(target: "trie::trie_prefetch", ?e, "Error while prefetching storage trie storage"); }; }); } } + _ = &mut interrupt_rx => { - debug!(target: "trie::trie_prefetch", "Interrupted trie prefetch task. 
Unprocessed tx {:?}", prefetch_rx.len()); + let stat = arc_tracker.lock().await.finish(); + debug!( + target: "trie::trie_prefetch", + unprocessed_tx = prefetch_rx.len(), + accounts_cached = self.cached_accounts.len(), + storages_cached = self.cached_storages.len(), + branches_prefetched = stat.branches_prefetched(), + leaves_prefetched = stat.leaves_prefetched(), + "trie prefetch interrupted" + ); + join_set.abort_all(); return } @@ -94,9 +119,7 @@ impl TriePrefetch { // deduplicate accounts if their keys are not present in storages for (address, account) in &hashed_state.accounts { - if !hashed_state.storages.contains_key(address) && - !self.cached_accounts.contains_key(address) - { + if !self.cached_accounts.contains_key(address) { self.cached_accounts.insert(*address, true); new_hashed_state.accounts.insert(*address, *account); } @@ -137,14 +160,15 @@ impl TriePrefetch { new_hashed_state } - /// Prefetch trie storage for the given hashed state. - pub async fn prefetch_once( + /// Prefetch account trie nodes for the given hashed state. + pub async fn prefetch_accounts( self: Arc, - consistent_view: Arc>, + consistent_view: ConsistentDbView, hashed_state: HashedPostState, + arc_prefetch_tracker: Arc>, ) -> Result<(), TriePrefetchError> where - Factory: DatabaseProviderFactory + Send + Sync + 'static, + Factory: DatabaseProviderFactory, { let mut tracker = TrieTracker::default(); @@ -155,28 +179,10 @@ impl TriePrefetch { ); let hashed_state_sorted = hashed_state.into_sorted(); - trace!(target: "trie::trie_prefetch", "start prefetching trie storages"); + // Solely for marking storage roots, so precise calculations are not necessary. let mut storage_roots = storage_root_targets .into_par_iter() - .map(|(hashed_address, prefix_set)| { - let provider_ro = consistent_view.provider_ro()?; - let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref()); - let hashed_cursor_factory = HashedPostStateCursorFactory::new( - DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), - &hashed_state_sorted, - ); - let storage_root_result = StorageRoot::new_hashed( - trie_cursor_factory, - hashed_cursor_factory, - hashed_address, - #[cfg(feature = "metrics")] - self.metrics.clone(), - ) - .with_prefix_set(prefix_set) - .prefetch(); - - Ok((hashed_address, storage_root_result?)) - }) + .map(|(hashed_address, _)| Ok((hashed_address, 1))) .collect::, ParallelStateRootError>>()?; trace!(target: "trie::trie_prefetch", "prefetching account tries"); @@ -204,6 +210,7 @@ impl TriePrefetch { tracker.inc_branch(); } TrieElement::Leaf(hashed_address, _) => { + tracker.inc_leaf(); match storage_roots.remove(&hashed_address) { Some(result) => result, // Since we do not store all intermediate nodes in the database, there might @@ -219,17 +226,19 @@ impl TriePrefetch { .ok() .unwrap_or_default(), }; - tracker.inc_leaf(); } } } let stats = tracker.finish(); + let mut prefetch_tracker = arc_prefetch_tracker.lock().await; + prefetch_tracker.inc_branches(stats.branches_added()); + prefetch_tracker.inc_leaves(stats.leaves_added()); #[cfg(feature = "metrics")] self.metrics.record(stats); - trace!( + debug!( target: "trie::trie_prefetch", duration = ?stats.duration(), branches_added = stats.branches_added(), @@ -239,6 +248,48 @@ impl TriePrefetch { Ok(()) } + + /// Prefetch storage trie nodes for the given hashed state. 
+ pub async fn prefetch_storages( + self: Arc, + consistent_view: ConsistentDbView, + hashed_state: HashedPostState, + ) -> Result<(), TriePrefetchError> + where + Factory: DatabaseProviderFactory, + { + let prefix_sets = hashed_state.construct_prefix_sets().freeze(); + let storage_root_targets = StorageRootTargets::new( + hashed_state.accounts.keys().copied(), + prefix_sets.storage_prefix_sets, + ); + let hashed_state_sorted = hashed_state.into_sorted(); + + storage_root_targets + .into_par_iter() + .map(|(hashed_address, prefix_set)| { + let provider_ro = consistent_view.provider_ro()?; + let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref()); + let hashed_cursor_factory = HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + &hashed_state_sorted, + ); + let storage_root_result = StorageRoot::new_hashed( + trie_cursor_factory, + hashed_cursor_factory, + hashed_address, + #[cfg(feature = "metrics")] + self.metrics.clone(), + ) + .with_prefix_set(prefix_set) + .prefetch(); + + Ok((hashed_address, storage_root_result?)) + }) + .collect::, ParallelStateRootError>>()?; + + Ok(()) + } } /// Error during prefetching trie storage. diff --git a/crates/trie/prefetch/src/stats.rs b/crates/trie/prefetch/src/stats.rs new file mode 100644 index 0000000000..1921cde060 --- /dev/null +++ b/crates/trie/prefetch/src/stats.rs @@ -0,0 +1,45 @@ +/// Trie stats. +#[derive(Clone, Copy, Debug)] +pub struct TriePrefetchStats { + branches_prefetched: u64, + leaves_prefetched: u64, +} + +impl TriePrefetchStats { + /// The number of added branch nodes for which we prefetched. + pub const fn branches_prefetched(&self) -> u64 { + self.branches_prefetched + } + + /// The number of added leaf nodes for which we prefetched. + pub const fn leaves_prefetched(&self) -> u64 { + self.leaves_prefetched + } +} + +/// Trie metrics tracker. +#[derive(Default, Debug, Clone, Copy)] +pub struct TriePrefetchTracker { + branches_prefetched: u64, + leaves_prefetched: u64, +} + +impl TriePrefetchTracker { + /// Increment the number of branches prefetched. + pub fn inc_branches(&mut self, num: u64) { + self.branches_prefetched += num; + } + + /// Increment the number of leaves prefetched. + pub fn inc_leaves(&mut self, num: u64) { + self.leaves_prefetched += num; + } + + /// Called when prefetch is finished to return trie prefetch statistics. + pub const fn finish(self) -> TriePrefetchStats { + TriePrefetchStats { + branches_prefetched: self.branches_prefetched, + leaves_prefetched: self.leaves_prefetched, + } + } +}
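For context on how the pieces added in prefetch.rs and stats.rs fit together, here is a self-contained sketch of the run-loop shape that `TriePrefetch::run` takes after this patch: work items arrive on an unbounded channel, each item is prefetched on a `JoinSet` task that updates a shared tracker behind a `tokio::sync::Mutex`, and the oneshot interrupt reports the accumulated stats, aborts outstanding tasks, and returns. The `PrefetchItem` type and the per-item counting are simplified stand-ins for `EvmState` and the real trie walks; the surrounding names are illustrative, not reth APIs.

```rust
use std::{sync::Arc, time::Duration};
use tokio::{
    sync::{mpsc, oneshot, Mutex},
    task::JoinSet,
};

/// Simplified stand-in for the hashed state delivered per executed transaction.
struct PrefetchItem {
    branches: u64,
    leaves: u64,
}

/// Mirrors the shape of `TriePrefetchTracker` from stats.rs.
#[derive(Default)]
struct Tracker {
    branches_prefetched: u64,
    leaves_prefetched: u64,
}

async fn run_prefetch(
    mut prefetch_rx: mpsc::UnboundedReceiver<PrefetchItem>,
    mut interrupt_rx: oneshot::Receiver<()>,
) {
    let mut join_set = JoinSet::new();
    let tracker = Arc::new(Mutex::new(Tracker::default()));

    loop {
        tokio::select! {
            item = prefetch_rx.recv() => {
                if let Some(item) = item {
                    let tracker = Arc::clone(&tracker);
                    // One task per incoming state update, as in the patched run loop.
                    join_set.spawn(async move {
                        let mut t = tracker.lock().await;
                        t.branches_prefetched += item.branches;
                        t.leaves_prefetched += item.leaves;
                    });
                }
            }
            _ = &mut interrupt_rx => {
                // Report accumulated stats, then cancel any in-flight prefetches.
                let t = tracker.lock().await;
                println!(
                    "prefetch interrupted: branches={} leaves={}",
                    t.branches_prefetched, t.leaves_prefetched
                );
                join_set.abort_all();
                return;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let (interrupt_tx, interrupt_rx) = oneshot::channel();
    let handle = tokio::spawn(run_prefetch(rx, interrupt_rx));

    // The executor would send one item per executed transaction.
    let _ = tx.send(PrefetchItem { branches: 3, leaves: 7 });
    let _ = tx.send(PrefetchItem { branches: 1, leaves: 2 });

    // Give the prefetcher a moment, then interrupt it once the state root is done.
    tokio::time::sleep(Duration::from_millis(50)).await;
    let _ = interrupt_tx.send(());
    handle.await.unwrap();
}
```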