Skip to content

Commit

Permalink
feat: enable parallel root calculation on new engine live sync
Browse files Browse the repository at this point in the history
  • Loading branch information
j75689 committed Sep 10, 2024
1 parent afa32b5 commit aeec2f3
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where
) -> Self {
let downloader = BasicBlockDownloader::new(client, consensus.clone());

let persistence_handle = PersistenceHandle::spawn_service(provider, pruner);
let persistence_handle = PersistenceHandle::spawn_service(provider.clone(), pruner);
let payload_validator = ExecutionPayloadValidator::new(chain_spec);

let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
Expand All @@ -97,6 +97,7 @@ where
payload_builder,
canonical_in_memory_state,
tree_config,
provider,
);

let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
Expand Down
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ reth-rpc-types.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-trie.workspace = true
reth-trie-parallel = { workspace = true, features = ["parallel"] }

# common
futures.workspace = true
Expand Down
35 changes: 28 additions & 7 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use reth_chain_state::{
CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_db_api::database::Database;
use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::execute::{BlockExecutorProvider, Executor};
Expand All @@ -27,8 +28,8 @@ use reth_primitives::{
SealedHeader, B256, U256,
};
use reth_provider::{
BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
StateRootProvider,
providers::ConsistentDbView, BlockReader, ExecutionOutcome, ProviderError, ProviderFactory,
StateProviderBox, StateProviderFactory, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
Expand All @@ -40,6 +41,7 @@ use reth_rpc_types::{
};
use reth_stages_api::ControlFlow;
use reth_trie::HashedPostState;
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
collections::{BTreeMap, HashMap, HashSet},
ops::Bound,
Expand Down Expand Up @@ -379,8 +381,9 @@ pub enum TreeAction {
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
#[derive(Debug)]
pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
pub struct EngineApiTreeHandler<P, E, T: EngineTypes, DB> {
provider: P,
provider_factory: ProviderFactory<DB>,
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
Expand Down Expand Up @@ -417,11 +420,12 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
metrics: EngineApiMetrics,
}

impl<P, E, T> EngineApiTreeHandler<P, E, T>
impl<P, E, T, DB> EngineApiTreeHandler<P, E, T, DB>
where
P: BlockReader + StateProviderFactory + Clone + 'static,
E: BlockExecutorProvider,
T: EngineTypes,
DB: Database + 'static,
{
/// Creates a new [`EngineApiTreeHandler`].
#[allow(clippy::too_many_arguments)]
Expand All @@ -437,6 +441,7 @@ where
persistence_state: PersistenceState,
payload_builder: PayloadBuilderHandle<T>,
config: TreeConfig,
provider_factory: ProviderFactory<DB>,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
Self {
Expand All @@ -455,6 +460,7 @@ where
config,
metrics: Default::default(),
incoming_tx,
provider_factory,
}
}

Expand All @@ -473,6 +479,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
canonical_in_memory_state: CanonicalInMemoryState,
config: TreeConfig,
provider_factory: ProviderFactory<DB>,
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
Expand Down Expand Up @@ -502,6 +509,7 @@ where
persistence_state,
payload_builder,
config,
provider_factory,
);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
Expand Down Expand Up @@ -1774,11 +1782,24 @@ where
PostExecutionInput::new(&output.receipts, &output.requests),
)?;

let hashed_state = HashedPostState::from_bundle_state(&output.state.state);
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider_factory.clone())?;
let mut hashed_state = HashedPostState::from_bundle_state(&output.state.state);

if let Some((_, executed_blocks)) =
self.state.tree_state.blocks_by_hash(block.block.header.parent_hash)
{
for block in &executed_blocks {
hashed_state.extend(block.hashed_state.as_ref().clone())
}
}

let root_time = Instant::now();
let (state_root, trie_output) =
state_provider.state_root_with_updates(hashed_state.clone())?;
ParallelStateRoot::new(consistent_view, hashed_state.clone())
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?;

if state_root != block.state_root {
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.state_root }.into(),
Expand All @@ -1793,7 +1814,7 @@ where
senders: Arc::new(block.senders),
execution_output: Arc::new(ExecutionOutcome::from((output, block_number))),
hashed_state: Arc::new(hashed_state),
trie: Arc::new(trie_output),
trie: Arc::new(trie_output.unwrap_or_default()),
};

if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash {
Expand Down

0 comments on commit aeec2f3

Please sign in to comment.