Skip to content

Commit

Permalink
feat: enable prefetch on the new engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Keefe Liu committed Oct 21, 2024
1 parent 01ee568 commit 331b982
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 11 additions & 17 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use std::{
clone::Clone,
collections::{BTreeMap, HashMap},
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};

Expand Down Expand Up @@ -231,14 +230,9 @@ impl AppendableChain {

let initial_execution_outcome = ExecutionOutcome::from((state, block.number));

// stop the prefetch task.
if let Some(interrupt_tx) = interrupt_tx {
let _ = interrupt_tx.send(());
}

// check state root if the block extends the canonical chain __and__ if state root
// validation was requested.
if block_validation_kind.is_exhaustive() {
let result = if block_validation_kind.is_exhaustive() {
// calculate and check state root
let start = Instant::now();
let (state_root, trie_updates) = if block_attachment.is_canonical() {
Expand Down Expand Up @@ -283,7 +277,14 @@ impl AppendableChain {
Ok((initial_execution_outcome, trie_updates))
} else {
Ok((initial_execution_outcome, None))
}
};

// stop the prefetch task.
if let Some(interrupt_tx) = interrupt_tx {
let _ = interrupt_tx.send(());
};

result
}

/// Validate and execute the given block, and append it to this chain.
Expand Down Expand Up @@ -356,18 +357,11 @@ impl AppendableChain {
let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let consistent_view = if let Ok(view) =
ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())
{
view
} else {
tracing::debug!("Failed to create consistent view for trie prefetch");
return (None, None)
};
let provider_factory = externals.provider_factory.clone();

tokio::spawn({
async move {
trie_prefetch.run(Arc::new(consistent_view), prefetch_rx, interrupt_rx).await;
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
}
});

Expand Down
3 changes: 3 additions & 0 deletions crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook>,
sync_metrics_tx: MetricEventsSender,
skip_state_root_validation: bool,
enable_prefetch: bool,
) -> Self {
let downloader = BasicBlockDownloader::new(client, consensus.clone());

Expand All @@ -99,6 +100,7 @@ where
tree_config,
invalid_block_hook,
skip_state_root_validation,
enable_prefetch,
);

let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
Expand Down Expand Up @@ -212,6 +214,7 @@ mod tests {
Box::new(NoopInvalidBlockHook::default()),
sync_metrics_tx,
false,
false,
);
}
}
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ reth-tasks.workspace = true
reth-node-types.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true
reth-trie-prefetch.workspace = true

# common
futures.workspace = true
Expand Down
58 changes: 53 additions & 5 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
Block, BlockNumHash, BlockNumber, GotExpected, Header, SealedBlock, SealedBlockWithSenders,
SealedHeader, B256, U256,
revm_primitives::EvmState, Block, BlockNumHash, BlockNumber, GotExpected, Header, SealedBlock,
SealedBlockWithSenders, SealedHeader, B256, U256,
};
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
Expand All @@ -42,6 +42,7 @@ use reth_rpc_types::{
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -497,6 +498,8 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
invalid_block_hook: Box<dyn InvalidBlockHook>,
/// Flag indicating whether the state root validation should be skipped.
skip_state_root_validation: bool,
/// Flag indicating whether to enable prefetch.
enable_prefetch: bool,
}

impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTreeHandler<P, E, T> {
Expand All @@ -516,6 +519,8 @@ impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTr
.field("config", &self.config)
.field("metrics", &self.metrics)
.field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
.field("skip_state_root_validation", &self.skip_state_root_validation)
.field("enable_prefetch", &self.enable_prefetch)
.finish()
}
}
Expand All @@ -542,6 +547,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
config: TreeConfig,
skip_state_root_validation: bool,
enable_prefetch: bool,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
Self {
Expand All @@ -562,6 +568,7 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
skip_state_root_validation,
enable_prefetch,
}
}

Expand All @@ -587,6 +594,7 @@ where
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook>,
skip_state_root_validation: bool,
enable_prefetch: bool,
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
Expand Down Expand Up @@ -618,10 +626,20 @@ where
payload_builder,
config,
skip_state_root_validation,
enable_prefetch,
);
task.set_invalid_block_hook(invalid_block_hook);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
std::thread::Builder::new()
.name("Tree Task".to_string())
.spawn(move || {
let runtime =
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(async {
task.run();
});
})
.unwrap();
(incoming, outgoing)
}

Expand Down Expand Up @@ -2145,8 +2163,16 @@ where
return Err(e.into())
}

let executor =
self.executor_provider.executor(StateProviderDatabase::new(&state_provider), None);
let (prefetch_tx, interrupt_tx) =
if self.enable_prefetch && !self.skip_state_root_validation {
self.setup_prefetch()
} else {
(None, None)
};

let executor = self
.executor_provider
.executor(StateProviderDatabase::new(&state_provider), prefetch_tx);

let block_number = block.number;
let block_hash = block.hash();
Expand Down Expand Up @@ -2218,6 +2244,11 @@ where
state_provider.state_root_with_updates(hashed_state.clone())?
};

// stop the prefetch task.
if let Some(interrupt_tx) = interrupt_tx {
let _ = interrupt_tx.send(());
};

if state_root != block.state_root {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
Expand Down Expand Up @@ -2531,6 +2562,22 @@ where
);
Ok(())
}

fn setup_prefetch(&self) -> (Option<UnboundedSender<EvmState>>, Option<oneshot::Sender<()>>) {
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
let (interrupt_tx, interrupt_rx) = oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = self.provider.clone();

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
}
}

/// This is an error that can come from advancing persistence. Either this can be a
Expand Down Expand Up @@ -2726,6 +2773,7 @@ mod tests {
payload_builder,
TreeConfig::default(),
false,
false,
);

let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
Expand Down
2 changes: 2 additions & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ where
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_prefetch,
);
eth_service
}
Expand Down Expand Up @@ -274,6 +275,7 @@ where
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_prefetch,
);
eth_service
}
Expand Down
8 changes: 7 additions & 1 deletion crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,17 @@ where
hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?,
);

let account_tree_start = std::time::Instant::now();
let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
let mut account_rlp = Vec::with_capacity(128);
while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
match node {
TrieElement::Branch(node) => {
tracker.inc_branch();
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
TrieElement::Leaf(hashed_address, account) => {
tracker.inc_leaf();
let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
Some(result) => result,
// Since we do not store all intermediate nodes in the database, there might
Expand Down Expand Up @@ -181,15 +184,18 @@ where
prefix_sets.destroyed_accounts,
);

let account_tree_duration = account_tree_start.elapsed();
let stats = tracker.finish();

#[cfg(feature = "metrics")]
self.metrics.record_state_trie(stats);

trace!(
debug!(
target: "trie::parallel_state_root",
%root,
duration = ?stats.duration(),
account_tree_duration = ?account_tree_duration,
storage_trees_duration = ?(stats.duration() - account_tree_duration),
branches_added = stats.branches_added(),
leaves_added = stats.leaves_added(),
missed_leaves = stats.missed_leaves(),
Expand Down
7 changes: 5 additions & 2 deletions crates/trie/prefetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

pub use prefetch::TriePrefetch;
pub use reth_trie_parallel::StorageRootTargets;

/// Trie prefetch stats.
pub mod stats;

/// Implementation of trie prefetch.
mod prefetch;
pub use prefetch::TriePrefetch;
pub mod prefetch;
Loading

0 comments on commit 331b982

Please sign in to comment.