diff --git a/Cargo.lock b/Cargo.lock index d47b980e7..5d5378a6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7785,6 +7785,7 @@ dependencies = [ "reth-tasks", "reth-tracing", "reth-trie", + "reth-trie-parallel", "thiserror", "tokio", "tracing", diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index fd4cf26a9..56f8d53bf 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -83,7 +83,7 @@ where ) -> Self { let downloader = BasicBlockDownloader::new(client, consensus.clone()); - let persistence_handle = PersistenceHandle::spawn_service(provider, pruner); + let persistence_handle = PersistenceHandle::spawn_service(provider.clone(), pruner); let payload_validator = ExecutionPayloadValidator::new(chain_spec); let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); @@ -97,6 +97,7 @@ where payload_builder, canonical_in_memory_state, tree_config, + provider, ); let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index edc8fa2c2..4f8d0074f 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -34,6 +34,7 @@ reth-rpc-types.workspace = true reth-stages-api.workspace = true reth-tasks.workspace = true reth-trie.workspace = true +reth-trie-parallel = { workspace = true, features = ["parallel"] } # common futures.workspace = true diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 8eebf960b..323503ac0 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -16,6 +16,7 @@ use reth_chain_state::{ CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain, }; use reth_consensus::{Consensus, PostExecutionInput}; +use reth_db_api::database::Database; use reth_engine_primitives::EngineTypes; use reth_errors::{ConsensusError, ProviderResult}; use reth_evm::execute::{BlockExecutorProvider, Executor}; @@ -27,8 +28,8 @@ use reth_primitives::{ SealedHeader, B256, U256, }; use reth_provider::{ - BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory, - StateRootProvider, + providers::ConsistentDbView, BlockReader, ExecutionOutcome, ProviderError, ProviderFactory, + StateProviderBox, StateProviderFactory, StateRootProvider, }; use reth_revm::database::StateProviderDatabase; use reth_rpc_types::{ @@ -40,6 +41,7 @@ use reth_rpc_types::{ }; use reth_stages_api::ControlFlow; use reth_trie::HashedPostState; +use reth_trie_parallel::parallel_root::ParallelStateRoot; use std::{ collections::{BTreeMap, HashMap, HashSet}, ops::Bound, @@ -379,8 +381,9 @@ pub enum TreeAction { /// This type is responsible for processing engine API requests, maintaining the canonical state and /// emitting events. #[derive(Debug)] -pub struct EngineApiTreeHandler { +pub struct EngineApiTreeHandler { provider: P, + provider_factory: ProviderFactory, executor_provider: E, consensus: Arc, payload_validator: ExecutionPayloadValidator, @@ -417,11 +420,12 @@ pub struct EngineApiTreeHandler { metrics: EngineApiMetrics, } -impl EngineApiTreeHandler +impl EngineApiTreeHandler where P: BlockReader + StateProviderFactory + Clone + 'static, E: BlockExecutorProvider, T: EngineTypes, + DB: Database + 'static, { /// Creates a new [`EngineApiTreeHandler`]. #[allow(clippy::too_many_arguments)] @@ -437,6 +441,7 @@ where persistence_state: PersistenceState, payload_builder: PayloadBuilderHandle, config: TreeConfig, + provider_factory: ProviderFactory, ) -> Self { let (incoming_tx, incoming) = std::sync::mpsc::channel(); Self { @@ -455,6 +460,7 @@ where config, metrics: Default::default(), incoming_tx, + provider_factory, } } @@ -473,6 +479,7 @@ where payload_builder: PayloadBuilderHandle, canonical_in_memory_state: CanonicalInMemoryState, config: TreeConfig, + provider_factory: ProviderFactory, ) -> (Sender>>, UnboundedReceiver) { let best_block_number = provider.best_block_number().unwrap_or(0); let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); @@ -502,6 +509,7 @@ where persistence_state, payload_builder, config, + provider_factory, ); let incoming = task.incoming_tx.clone(); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap(); @@ -1774,11 +1782,24 @@ where PostExecutionInput::new(&output.receipts, &output.requests), )?; - let hashed_state = HashedPostState::from_bundle_state(&output.state.state); + let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider_factory.clone())?; + let mut hashed_state = HashedPostState::from_bundle_state(&output.state.state); + + if let Some((_, executed_blocks)) = + self.state.tree_state.blocks_by_hash(block.block.header.parent_hash) + { + for block in &executed_blocks { + hashed_state.extend(block.hashed_state.as_ref().clone()) + } + } let root_time = Instant::now(); let (state_root, trie_output) = - state_provider.state_root_with_updates(hashed_state.clone())?; + ParallelStateRoot::new(consistent_view, hashed_state.clone()) + .incremental_root_with_updates() + .map(|(root, updates)| (root, Some(updates))) + .map_err(ProviderError::from)?; + if state_root != block.state_root { return Err(ConsensusError::BodyStateRootDiff( GotExpected { got: state_root, expected: block.state_root }.into(), @@ -1793,7 +1814,7 @@ where senders: Arc::new(block.senders), execution_output: Arc::new(ExecutionOutcome::from((output, block_number))), hashed_state: Arc::new(hashed_state), - trie: Arc::new(trie_output), + trie: Arc::new(trie_output.unwrap_or_default()), }; if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash {