Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable prefetch on the new engine #164

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 11 additions & 17 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use std::{
clone::Clone,
collections::{BTreeMap, HashMap},
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};

Expand Down Expand Up @@ -231,14 +230,9 @@ impl AppendableChain {

let initial_execution_outcome = ExecutionOutcome::from((state, block.number));

// stop the prefetch task.
if let Some(interrupt_tx) = interrupt_tx {
let _ = interrupt_tx.send(());
}

// check state root if the block extends the canonical chain __and__ if state root
// validation was requested.
if block_validation_kind.is_exhaustive() {
let result = if block_validation_kind.is_exhaustive() {
// calculate and check state root
let start = Instant::now();
let (state_root, trie_updates) = if block_attachment.is_canonical() {
Expand Down Expand Up @@ -283,7 +277,14 @@ impl AppendableChain {
Ok((initial_execution_outcome, trie_updates))
} else {
Ok((initial_execution_outcome, None))
}
};

// stop the prefetch task.
if let Some(interrupt_tx) = interrupt_tx {
let _ = interrupt_tx.send(());
};

result
}

/// Validate and execute the given block, and append it to this chain.
Expand Down Expand Up @@ -356,18 +357,11 @@ impl AppendableChain {
let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let consistent_view = if let Ok(view) =
ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())
{
view
} else {
tracing::debug!("Failed to create consistent view for trie prefetch");
return (None, None)
};
let provider_factory = externals.provider_factory.clone();

tokio::spawn({
async move {
trie_prefetch.run(Arc::new(consistent_view), prefetch_rx, interrupt_rx).await;
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
}
});

Expand Down
3 changes: 3 additions & 0 deletions crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook>,
sync_metrics_tx: MetricEventsSender,
skip_state_root_validation: bool,
enable_prefetch: bool,
) -> Self {
let downloader = BasicBlockDownloader::new(client, consensus.clone());

Expand All @@ -99,6 +100,7 @@ where
tree_config,
invalid_block_hook,
skip_state_root_validation,
enable_prefetch,
);

let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
Expand Down Expand Up @@ -212,6 +214,7 @@ mod tests {
Box::new(NoopInvalidBlockHook::default()),
sync_metrics_tx,
false,
false,
);
}
}
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ reth-tasks.workspace = true
reth-node-types.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true
reth-trie-prefetch.workspace = true

# common
futures.workspace = true
Expand Down
58 changes: 53 additions & 5 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
Block, BlockNumHash, BlockNumber, GotExpected, Header, SealedBlock, SealedBlockWithSenders,
SealedHeader, B256, U256,
revm_primitives::EvmState, Block, BlockNumHash, BlockNumber, GotExpected, Header, SealedBlock,
SealedBlockWithSenders, SealedHeader, B256, U256,
};
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
Expand All @@ -42,6 +42,7 @@ use reth_rpc_types::{
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -497,6 +498,8 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
invalid_block_hook: Box<dyn InvalidBlockHook>,
/// Flag indicating whether the state root validation should be skipped.
skip_state_root_validation: bool,
/// Flag indicating whether to enable prefetch.
enable_prefetch: bool,
}

impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTreeHandler<P, E, T> {
Expand All @@ -516,6 +519,8 @@ impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTr
.field("config", &self.config)
.field("metrics", &self.metrics)
.field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
.field("skip_state_root_validation", &self.skip_state_root_validation)
.field("enable_prefetch", &self.enable_prefetch)
.finish()
}
}
Expand All @@ -542,6 +547,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
config: TreeConfig,
skip_state_root_validation: bool,
enable_prefetch: bool,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
Self {
Expand All @@ -562,6 +568,7 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
skip_state_root_validation,
enable_prefetch,
}
}

Expand All @@ -587,6 +594,7 @@ where
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook>,
skip_state_root_validation: bool,
enable_prefetch: bool,
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
Expand Down Expand Up @@ -618,10 +626,20 @@ where
payload_builder,
config,
skip_state_root_validation,
enable_prefetch,
);
task.set_invalid_block_hook(invalid_block_hook);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
std::thread::Builder::new()
.name("Tree Task".to_string())
.spawn(move || {
let runtime =
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(async {
task.run();
});
})
.unwrap();
(incoming, outgoing)
}

Expand Down Expand Up @@ -2145,8 +2163,16 @@ where
return Err(e.into())
}

let executor =
self.executor_provider.executor(StateProviderDatabase::new(&state_provider), None);
let (prefetch_tx, interrupt_tx) =
if self.enable_prefetch && !self.skip_state_root_validation {
self.setup_prefetch()
} else {
(None, None)
};

let executor = self
.executor_provider
.executor(StateProviderDatabase::new(&state_provider), prefetch_tx);

let block_number = block.number;
let block_hash = block.hash();
Expand Down Expand Up @@ -2218,6 +2244,11 @@ where
state_provider.state_root_with_updates(hashed_state.clone())?
};

// stop the prefetch task.
if let Some(interrupt_tx) = interrupt_tx {
let _ = interrupt_tx.send(());
};

if state_root != block.state_root {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
Expand Down Expand Up @@ -2531,6 +2562,22 @@ where
);
Ok(())
}

fn setup_prefetch(&self) -> (Option<UnboundedSender<EvmState>>, Option<oneshot::Sender<()>>) {
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
let (interrupt_tx, interrupt_rx) = oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = self.provider.clone();

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
}
}

/// This is an error that can come from advancing persistence. Either this can be a
Expand Down Expand Up @@ -2726,6 +2773,7 @@ mod tests {
payload_builder,
TreeConfig::default(),
false,
false,
);

let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
Expand Down
2 changes: 2 additions & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ where
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_prefetch,
);
eth_service
}
Expand Down Expand Up @@ -274,6 +275,7 @@ where
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_prefetch,
);
eth_service
}
Expand Down
8 changes: 7 additions & 1 deletion crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,17 @@ where
hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?,
);

let account_tree_start = std::time::Instant::now();
let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
let mut account_rlp = Vec::with_capacity(128);
while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
match node {
TrieElement::Branch(node) => {
tracker.inc_branch();
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
TrieElement::Leaf(hashed_address, account) => {
tracker.inc_leaf();
let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
Some(result) => result,
// Since we do not store all intermediate nodes in the database, there might
Expand Down Expand Up @@ -181,15 +184,18 @@ where
prefix_sets.destroyed_accounts,
);

let account_tree_duration = account_tree_start.elapsed();
let stats = tracker.finish();

#[cfg(feature = "metrics")]
self.metrics.record_state_trie(stats);

trace!(
debug!(
target: "trie::parallel_state_root",
%root,
duration = ?stats.duration(),
account_tree_duration = ?account_tree_duration,
storage_trees_duration = ?(stats.duration() - account_tree_duration),
branches_added = stats.branches_added(),
leaves_added = stats.leaves_added(),
missed_leaves = stats.missed_leaves(),
Expand Down
7 changes: 5 additions & 2 deletions crates/trie/prefetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

pub use prefetch::TriePrefetch;
pub use reth_trie_parallel::StorageRootTargets;

/// Trie prefetch stats.
pub mod stats;

/// Implementation of trie prefetch.
mod prefetch;
pub use prefetch::TriePrefetch;
pub mod prefetch;
Loading
Loading