From a2f77e5c203bd45dfc229852f1b2ce8867f79b77 Mon Sep 17 00:00:00 2001 From: Keefe Liu Date: Tue, 27 Aug 2024 15:35:00 +0800 Subject: [PATCH 1/5] feat: add trie prefetch --- Cargo.lock | 34 ++ Cargo.toml | 2 + Makefile | 12 +- bin/reth/Cargo.toml | 2 + .../src/commands/debug_cmd/build_block.rs | 4 +- .../commands/debug_cmd/in_memory_merkle.rs | 6 +- crates/blockchain-tree/Cargo.toml | 2 + crates/blockchain-tree/src/blockchain_tree.rs | 2 +- crates/blockchain-tree/src/chain.rs | 42 ++- crates/blockchain-tree/src/shareable.rs | 6 +- crates/bsc/evm/Cargo.toml | 4 + crates/bsc/evm/src/execute.rs | 84 ++++- crates/consensus/auto-seal/src/lib.rs | 2 +- crates/engine/tree/src/tree/mod.rs | 3 +- crates/ethereum/evm/Cargo.toml | 7 + crates/ethereum/evm/src/execute.rs | 80 ++++- crates/evm/Cargo.toml | 2 + crates/evm/src/either.rs | 12 +- crates/evm/src/execute.rs | 16 +- crates/evm/src/noop.rs | 4 +- crates/evm/src/test_utils.rs | 4 +- crates/exex/exex/src/backfill/job.rs | 9 +- crates/exex/exex/src/backfill/mod.rs | 11 +- crates/optimism/evm/Cargo.toml | 4 + crates/optimism/evm/src/execute.rs | 73 ++++- crates/trie/prefetch/Cargo.toml | 55 ++++ crates/trie/prefetch/src/lib.rs | 14 + crates/trie/prefetch/src/prefetch.rs | 305 ++++++++++++++++++ crates/trie/trie/src/state.rs | 22 +- crates/trie/trie/src/trie.rs | 50 +++ 30 files changed, 795 insertions(+), 78 deletions(-) create mode 100644 crates/trie/prefetch/Cargo.toml create mode 100644 crates/trie/prefetch/src/lib.rs create mode 100644 crates/trie/prefetch/src/prefetch.rs diff --git a/Cargo.lock b/Cargo.lock index 71201ffd2..0b80b93df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7292,6 +7292,7 @@ dependencies = [ "reth-testing-utils", "reth-trie", "reth-trie-parallel", + "reth-trie-prefetch", "tokio", "tracing", ] @@ -8133,8 +8134,10 @@ dependencies = [ "reth-primitives", "reth-prune-types", "reth-storage-errors", + "reth-trie", "revm", "revm-primitives", + "tokio", ] [[package]] @@ -8155,8 +8158,10 @@ dependencies = [ 
"reth-provider", "reth-prune-types", "reth-revm", + "reth-trie", "revm-primitives", "thiserror", + "tokio", "tracing", ] @@ -8175,9 +8180,12 @@ dependencies = [ "reth-prune-types", "reth-revm", "reth-testing-utils", + "reth-trie", "revm-primitives", "secp256k1", "serde_json", + "tokio", + "tracing", ] [[package]] @@ -8194,9 +8202,11 @@ dependencies = [ "reth-primitives", "reth-prune-types", "reth-revm", + "reth-trie", "revm", "revm-primitives", "thiserror", + "tokio", "tracing", ] @@ -9792,6 +9802,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "reth-trie-prefetch" +version = "1.0.3" +dependencies = [ + "alloy-rlp", + "criterion", + "derive_more", + "metrics", + "proptest", + "rand 0.8.5", + "rayon", + "reth-db", + "reth-execution-errors", + "reth-metrics", + "reth-primitives", + "reth-provider", + "reth-tasks", + "reth-trie", + "reth-trie-parallel", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "revm" version = "12.1.0" diff --git a/Cargo.toml b/Cargo.toml index 24c55325c..374a28416 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,6 +119,7 @@ members = [ "crates/transaction-pool/", "crates/trie/common", "crates/trie/parallel/", + "crates/trie/prefetch/", "crates/trie/trie", "crates/bsc/node/", "crates/bsc/engine/", @@ -384,6 +385,7 @@ reth-trie = { path = "crates/trie/trie" } reth-trie-common = { path = "crates/trie/common" } reth-trie-db = { path = "crates/trie/db" } reth-trie-parallel = { path = "crates/trie/parallel" } +reth-trie-prefetch = { path = "crates/trie/prefetch" } # revm revm = { version = "12.1.0", features = [ diff --git a/Makefile b/Makefile index 12d37f0e2..98aed53b4 100644 --- a/Makefile +++ b/Makefile @@ -53,14 +53,14 @@ install: ## Build and install the reth binary under `~/.cargo/bin`. .PHONY: install-op install-op: ## Build and install the op-reth binary under `~/.cargo/bin`. 
cargo install --path bin/reth --bin op-reth --force --locked \ - --features "optimism opbnb $(FEATURES)" \ + --features "optimism,opbnb,$(FEATURES)" \ --profile "$(PROFILE)" \ $(CARGO_INSTALL_EXTRA_FLAGS) .PHONY: install-bsc install-bsc: ## Build and install the bsc-reth binary under `~/.cargo/bin`. cargo install --path bin/reth --bin bsc-reth --force --locked \ - --features "bsc $(FEATURES)" \ + --features "bsc,prefetch,$(FEATURES)" \ --profile "$(PROFILE)" \ $(CARGO_INSTALL_EXTRA_FLAGS) @@ -70,11 +70,11 @@ build: ## Build the reth binary into `target` directory. .PHONY: build-op build-op: ## Build the op-reth binary into `target` directory. - cargo build --bin op-reth --features "optimism opbnb $(FEATURES)" --profile "$(PROFILE)" + cargo build --bin op-reth --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)" .PHONY: build-bsc build-bsc: ## Build the bsc-reth binary into `target` directory. - cargo build --bin bsc-reth --features "bsc $(FEATURES)" --profile "$(PROFILE)" + cargo build --bin bsc-reth --features "bsc,prefetch,$(FEATURES)" --profile "$(PROFILE)" # Builds the reth binary natively. build-native-%: @@ -84,7 +84,7 @@ op-build-native-%: cargo build --bin op-reth --target $* --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)" bsc-build-native-%: - cargo build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)" + cargo build --bin bsc-reth --target $* --features "bsc,prefetch,$(FEATURES)" --profile "$(PROFILE)" # The following commands use `cross` to build a cross-compile. # @@ -127,7 +127,7 @@ op-build-%: bsc-build-%: RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \ - cross build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)" + cross build --bin bsc-reth --target $* --features "bsc,prefetch,$(FEATURES)" --profile "$(PROFILE)" # Unfortunately we can't easily use cross to build for Darwin because of licensing issues. 
# If we wanted to, we would need to build a custom Docker image with the SDK available. diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 300e74b4f..7fac4b833 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -164,6 +164,8 @@ bsc = [ "reth-beacon-consensus/bsc", ] +prefetch = ["reth-blockchain-tree/prefetch"] + # no-op feature flag for switching between the `optimism` and default functionality in CI matrices ethereum = [] diff --git a/bin/reth/src/commands/debug_cmd/build_block.rs b/bin/reth/src/commands/debug_cmd/build_block.rs index b674324aa..2772fd144 100644 --- a/bin/reth/src/commands/debug_cmd/build_block.rs +++ b/bin/reth/src/commands/debug_cmd/build_block.rs @@ -275,9 +275,9 @@ impl Command { #[cfg(feature = "bsc")] let executor = block_executor!(provider_factory.chain_spec(), provider_factory.clone()) - .executor(db); + .executor(db, None); #[cfg(not(feature = "bsc"))] - let executor = block_executor!(provider_factory.chain_spec()).executor(db); + let executor = block_executor!(provider_factory.chain_spec()).executor(db, None); let BlockExecutionOutput { state, receipts, requests, .. 
} = executor .execute((&block_with_senders.clone().unseal(), U256::MAX, None).into())?; diff --git a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs index aa202004c..9950004f9 100644 --- a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs @@ -130,10 +130,10 @@ impl Command { )); #[cfg(feature = "bsc")] - let executor = - block_executor!(provider_factory.chain_spec(), provider_factory.clone()).executor(db); + let executor = block_executor!(provider_factory.chain_spec(), provider_factory.clone()) + .executor(db, None); #[cfg(not(feature = "bsc"))] - let executor = block_executor!(provider_factory.chain_spec()).executor(db); + let executor = block_executor!(provider_factory.chain_spec()).executor(db, None); let merkle_block_td = provider.header_td_by_number(merkle_block_number)?.unwrap_or_default(); diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index b3679677a..6b2336485 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -28,6 +28,7 @@ reth-trie = { workspace = true, features = ["metrics"] } reth-trie-parallel = { workspace = true, features = ["parallel"] } reth-network.workspace = true reth-consensus.workspace = true +reth-trie-prefetch = { workspace = true, optional = true } # common parking_lot.workspace = true @@ -59,3 +60,4 @@ alloy-genesis.workspace = true [features] test-utils = [] optimism = ["reth-primitives/optimism", "reth-provider/optimism"] +prefetch = ["dep:reth-trie-prefetch"] diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 501ce4381..49bc97ebe 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -90,7 +90,7 @@ impl BlockchainTree { impl BlockchainTree where - DB: Database + Clone, + DB: Database + Clone + 'static, E: 
BlockExecutorProvider, { /// Builds the blockchain tree for the node. diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index 7dfdce696..a769d176a 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -25,6 +25,10 @@ use reth_provider::{ use reth_revm::database::StateProviderDatabase; use reth_trie::updates::TrieUpdates; use reth_trie_parallel::parallel_root::ParallelStateRoot; +#[cfg(feature = "prefetch")] +use reth_trie_prefetch::TriePrefetch; +#[cfg(feature = "prefetch")] +use std::sync::Arc; use std::{ collections::{BTreeMap, HashMap}, ops::{Deref, DerefMut}, @@ -77,7 +81,7 @@ impl AppendableChain { block_validation_kind: BlockValidationKind, ) -> Result where - DB: Database + Clone, + DB: Database + Clone + 'static, E: BlockExecutorProvider, { let execution_outcome = ExecutionOutcome::default(); @@ -116,7 +120,7 @@ impl AppendableChain { block_validation_kind: BlockValidationKind, ) -> Result where - DB: Database + Clone, + DB: Database + Clone + 'static, E: BlockExecutorProvider, { let parent_number = @@ -181,7 +185,7 @@ impl AppendableChain { ) -> Result<(ExecutionOutcome, Option), BlockExecutionError> where EDP: FullExecutionDataProvider, - DB: Database + Clone, + DB: Database + Clone + 'static, E: BlockExecutorProvider, { // some checks are done before blocks comes here. 
@@ -208,11 +212,35 @@ impl AppendableChain { let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider); + #[cfg(feature = "prefetch")] + let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel(); + let db = StateProviderDatabase::new(&provider); - let executor = externals.executor_factory.executor(db); + #[cfg(feature = "prefetch")] + let executor = externals.executor_factory.executor(db, Some(prefetch_tx)); + #[cfg(not(feature = "prefetch"))] + let executor = externals.executor_factory.executor(db, None); + let block_hash = block.hash(); let block = block.unseal(); + #[cfg(feature = "prefetch")] + let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel(); + + #[cfg(feature = "prefetch")] + { + let mut trie_prefetch = TriePrefetch::new(); + let consistent_view = Arc::new(ConsistentDbView::new_with_latest_tip( + externals.provider_factory.clone(), + )?); + + tokio::spawn({ + async move { + trie_prefetch.run::(consistent_view, prefetch_rx, interrupt_rx).await; + } + }); + } + let state = executor.execute((&block, U256::MAX, ancestor_blocks).into())?; let BlockExecutionOutput { state, receipts, requests, .. } = state; externals @@ -222,6 +250,10 @@ impl AppendableChain { let initial_execution_outcome = ExecutionOutcome::new(state, receipts.into(), block.number, vec![requests.into()]); + // stop the prefetch task. + #[cfg(feature = "prefetch")] + let _ = interrupt_tx.send(()); + // check state root if the block extends the canonical chain __and__ if state root // validation was requested. 
if block_validation_kind.is_exhaustive() { @@ -284,7 +316,7 @@ impl AppendableChain { block_validation_kind: BlockValidationKind, ) -> Result<(), InsertBlockErrorKind> where - DB: Database + Clone, + DB: Database + Clone + 'static, E: BlockExecutorProvider, { let parent_block = self.chain.tip(); diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs index 0f7903cf0..c106d0ad3 100644 --- a/crates/blockchain-tree/src/shareable.rs +++ b/crates/blockchain-tree/src/shareable.rs @@ -37,7 +37,7 @@ impl ShareableBlockchainTree { impl BlockchainTreeEngine for ShareableBlockchainTree where - DB: Database + Clone, + DB: Database + Clone + 'static, E: BlockExecutorProvider, { fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> { @@ -108,7 +108,7 @@ where impl BlockchainTreeViewer for ShareableBlockchainTree where - DB: Database + Clone, + DB: Database + Clone + 'static, E: BlockExecutorProvider, { fn header_by_hash(&self, hash: BlockHash) -> Option { @@ -171,7 +171,7 @@ where impl BlockchainTreePendingStateProvider for ShareableBlockchainTree where - DB: Database + Clone, + DB: Database + Clone + 'static, E: BlockExecutorProvider, { fn find_pending_state_provider( diff --git a/crates/bsc/evm/Cargo.toml b/crates/bsc/evm/Cargo.toml index 6446157a6..bc713c816 100644 --- a/crates/bsc/evm/Cargo.toml +++ b/crates/bsc/evm/Cargo.toml @@ -21,6 +21,7 @@ reth-prune-types.workspace = true reth-revm.workspace = true reth-provider.workspace = true reth-bsc-consensus.workspace = true +reth-trie.workspace = true # Revm revm-primitives.workspace = true @@ -34,6 +35,9 @@ bitset = "0.1.2" lru = "0.12.3" blst = "0.3.12" +# async +tokio = { workspace = true, features = ["sync", "time"] } + [dev-dependencies] reth-revm = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } diff --git a/crates/bsc/evm/src/execute.rs b/crates/bsc/evm/src/execute.rs index 
121193135..1994bb5ff 100644 --- a/crates/bsc/evm/src/execute.rs +++ b/crates/bsc/evm/src/execute.rs @@ -28,11 +28,13 @@ use reth_revm::{ db::states::bundle_state::BundleRetention, Evm, State, }; +use reth_trie::HashedPostState; use revm_primitives::{ db::{Database, DatabaseCommit}, BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, ResultAndState, TransactTo, }; use std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Instant}; +use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, warn}; const SNAP_CACHE_NUM: usize = 2048; @@ -80,17 +82,40 @@ where P: Clone, EvmConfig: ConfigureEvm, { - fn bsc_executor(&self, db: DB) -> BscBlockExecutor + fn bsc_executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> BscBlockExecutor where DB: Database + std::fmt::Display>, { - BscBlockExecutor::new( - self.chain_spec.clone(), - self.evm_config.clone(), - self.parlia_config.clone(), - State::builder().with_database(db).with_bundle_update().without_state_clear().build(), - self.provider.clone(), - ) + if let Some(tx) = prefetch_tx { + BscBlockExecutor::new_with_prefetch_tx( + self.chain_spec.clone(), + self.evm_config.clone(), + self.parlia_config.clone(), + State::builder() + .with_database(db) + .with_bundle_update() + .without_state_clear() + .build(), + self.provider.clone(), + tx, + ) + } else { + BscBlockExecutor::new( + self.chain_spec.clone(), + self.evm_config.clone(), + self.parlia_config.clone(), + State::builder() + .with_database(db) + .with_bundle_update() + .without_state_clear() + .build(), + self.provider.clone(), + ) + } } } @@ -105,18 +130,22 @@ where type BatchExecutor + std::fmt::Display>> = BscBatchExecutor; - fn executor(&self, db: DB) -> Self::Executor + fn executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> Self::Executor where DB: Database + std::fmt::Display>, { - self.bsc_executor(db) + self.bsc_executor(db, prefetch_tx) } fn batch_executor(&self, db: DB) -> Self::BatchExecutor where DB: Database + 
std::fmt::Display>, { - let executor = self.bsc_executor(db); + let executor = self.bsc_executor(db, None); BscBatchExecutor { executor, batch_record: BlockBatchRecord::default(), @@ -158,6 +187,7 @@ where &self, block: &BlockWithSenders, mut evm: Evm<'_, Ext, &mut State>, + tx: Option>, ) -> Result<(Vec, Vec, u64), BlockExecutionError> where DB: Database + std::fmt::Display>, @@ -210,6 +240,13 @@ where } })?; + if let Some(tx) = tx.as_ref() { + let post_state = HashedPostState::from_state(state.clone()); + tx.send(post_state).unwrap_or_else(|err| { + debug!(target: "evm_executor", ?err, "Failed to send post state to prefetch channel") + }); + } + evm.db_mut().commit(state); self.patch_mainnet_after_tx(transaction, evm.db_mut()); @@ -250,6 +287,8 @@ pub struct BscBlockExecutor { pub(crate) provider: Arc

, /// Parlia consensus instance pub(crate) parlia: Arc, + /// Prefetch channel + prefetch_tx: Option>, } impl BscBlockExecutor { @@ -268,6 +307,27 @@ impl BscBlockExecutor { state, provider: shared_provider, parlia, + prefetch_tx: None, + } + } + + /// Creates a new BSC block executor with a prefetch channel. + pub fn new_with_prefetch_tx( + chain_spec: Arc, + evm_config: EvmConfig, + parlia_config: ParliaConfig, + state: State, + provider: P, + tx: UnboundedSender, + ) -> Self { + let parlia = Arc::new(Parlia::new(Arc::clone(&chain_spec), parlia_config)); + let shared_provider = Arc::new(provider); + Self { + executor: BscEvmExecutor { chain_spec, evm_config }, + state, + provider: shared_provider, + parlia, + prefetch_tx: Some(tx), } } @@ -347,7 +407,7 @@ where let (mut system_txs, mut receipts, mut gas_used) = { let evm = self.executor.evm_config.evm_with_env(&mut self.state, env.clone()); - self.executor.execute_pre_and_transactions(block, evm) + self.executor.execute_pre_and_transactions(block, evm, self.prefetch_tx.clone()) }?; // 5. apply post execution changes diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs index 3274abb25..878cf5382 100644 --- a/crates/consensus/auto-seal/src/lib.rs +++ b/crates/consensus/auto-seal/src/lib.rs @@ -388,7 +388,7 @@ impl StorageInner { requests: block_execution_requests, gas_used, .. 
- } = executor.executor(&mut db).execute((&block, U256::ZERO, None).into())?; + } = executor.executor(&mut db, None).execute((&block, U256::ZERO, None).into())?; let execution_outcome = ExecutionOutcome::new( state, receipts.into(), diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 4de11fc9c..0ead876ca 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -653,7 +653,8 @@ where self.validate_block(&block)?; let state_provider = self.state_provider(block.parent_hash).unwrap(); - let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider)); + let executor = + self.executor_provider.executor(StateProviderDatabase::new(&state_provider), None); let block_number = block.number; let block_hash = block.hash(); diff --git a/crates/ethereum/evm/Cargo.toml b/crates/ethereum/evm/Cargo.toml index 7ea2e4b58..0beeca65a 100644 --- a/crates/ethereum/evm/Cargo.toml +++ b/crates/ethereum/evm/Cargo.toml @@ -20,6 +20,7 @@ reth-revm.workspace = true reth-ethereum-consensus.workspace = true reth-prune-types.workspace = true reth-execution-types.workspace = true +reth-trie.workspace = true # Ethereum revm-primitives.workspace = true @@ -28,6 +29,12 @@ revm-primitives.workspace = true alloy-eips.workspace = true alloy-sol-types.workspace = true +# misc +tracing.workspace = true + +# async +tokio = { workspace = true, features = ["sync", "time"] } + [dev-dependencies] reth-testing-utils.workspace = true reth-revm = { workspace = true, features = ["test-utils"] } diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index 7617d09ef..aae646798 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -28,10 +28,13 @@ use reth_revm::{ state_change::{apply_blockhashes_update, post_block_balance_increments}, Evm, State, }; +use reth_trie::HashedPostState; use revm_primitives::{ db::{Database, DatabaseCommit}, BlockEnv, 
CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, ResultAndState, }; +use tokio::sync::mpsc::UnboundedSender; +use tracing::debug; #[cfg(feature = "std")] use std::{fmt::Display, sync::Arc, vec, vec::Vec}; @@ -65,15 +68,36 @@ impl EthExecutorProvider where EvmConfig: ConfigureEvm, { - fn eth_executor(&self, db: DB) -> EthBlockExecutor + fn eth_executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> EthBlockExecutor where DB: Database>, { - EthBlockExecutor::new( - self.chain_spec.clone(), - self.evm_config.clone(), - State::builder().with_database(db).with_bundle_update().without_state_clear().build(), - ) + if let Some(tx) = prefetch_tx { + EthBlockExecutor::new_with_prefetch_tx( + self.chain_spec.clone(), + self.evm_config.clone(), + State::builder() + .with_database(db) + .with_bundle_update() + .without_state_clear() + .build(), + tx, + ) + } else { + EthBlockExecutor::new( + self.chain_spec.clone(), + self.evm_config.clone(), + State::builder() + .with_database(db) + .with_bundle_update() + .without_state_clear() + .build(), + ) + } } } @@ -87,18 +111,22 @@ where type BatchExecutor + Display>> = EthBatchExecutor; - fn executor(&self, db: DB) -> Self::Executor + fn executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> Self::Executor where DB: Database + Display>, { - self.eth_executor(db) + self.eth_executor(db, prefetch_tx) } fn batch_executor(&self, db: DB) -> Self::BatchExecutor where DB: Database + Display>, { - let executor = self.eth_executor(db); + let executor = self.eth_executor(db, None); EthBatchExecutor { executor, batch_record: BlockBatchRecord::default(), @@ -142,6 +170,7 @@ where &self, block: &BlockWithSenders, mut evm: Evm<'_, Ext, &mut State>, + tx: Option>, ) -> Result where DB: Database, @@ -196,6 +225,14 @@ where error: Box::new(new_err), } })?; + + if let Some(tx) = tx.as_ref() { + let post_state = HashedPostState::from_state(state.clone()); + tx.send(post_state).unwrap_or_else(|err| { + debug!(target: "evm_executor", ?err, 
"Failed to send post state to prefetch channel") + }); + } + evm.db_mut().commit(state); // append gas used @@ -250,12 +287,24 @@ pub struct EthBlockExecutor { executor: EthEvmExecutor, /// The state to use for execution state: State, + /// Prefetch channel + prefetch_tx: Option>, } impl EthBlockExecutor { /// Creates a new Ethereum block executor. pub const fn new(chain_spec: Arc, evm_config: EvmConfig, state: State) -> Self { - Self { executor: EthEvmExecutor { chain_spec, evm_config }, state } + Self { executor: EthEvmExecutor { chain_spec, evm_config }, state, prefetch_tx: None } + } + + /// Creates a new Ethereum block executor with a prefetch channel. + pub const fn new_with_prefetch_tx( + chain_spec: Arc, + evm_config: EvmConfig, + state: State, + tx: UnboundedSender, + ) -> Self { + Self { executor: EthEvmExecutor { chain_spec, evm_config }, state, prefetch_tx: Some(tx) } } #[inline] @@ -312,7 +361,7 @@ where let env = self.evm_env_for_block(&block.header, total_difficulty); let output = { let evm = self.executor.evm_config.evm_with_env(&mut self.state, env); - self.executor.execute_state_transitions(block, evm) + self.executor.execute_state_transitions(block, evm, self.prefetch_tx.clone()) }?; // 3. 
apply post execution changes @@ -552,7 +601,7 @@ mod tests { // attempt to execute a block without parent beacon block root, expect err let err = provider - .executor(StateProviderDatabase::new(&db)) + .executor(StateProviderDatabase::new(&db), None) .execute( ( &BlockWithSenders { @@ -583,7 +632,7 @@ mod tests { // fix header, set a gas limit header.parent_beacon_block_root = Some(B256::with_last_byte(0x69)); - let mut executor = provider.executor(StateProviderDatabase::new(&db)); + let mut executor = provider.executor(StateProviderDatabase::new(&db), None); // Now execute a block with the fixed header, ensure that it does not fail executor @@ -1322,7 +1371,7 @@ mod tests { let provider = executor_provider(chain_spec); - let executor = provider.executor(StateProviderDatabase::new(&db)); + let executor = provider.executor(StateProviderDatabase::new(&db), None); let BlockExecutionOutput { receipts, requests, .. } = executor .execute( @@ -1412,7 +1461,8 @@ mod tests { ); // Create an executor from the state provider - let executor = executor_provider(chain_spec).executor(StateProviderDatabase::new(&db)); + let executor = + executor_provider(chain_spec).executor(StateProviderDatabase::new(&db), None); // Execute the block and capture the result let exec_result = executor.execute( diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index ab3383719..3bcf13ef2 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -19,12 +19,14 @@ revm-primitives.workspace = true reth-prune-types.workspace = true reth-storage-errors.workspace = true reth-execution-types.workspace = true +reth-trie.workspace = true revm.workspace = true alloy-eips.workspace = true auto_impl.workspace = true futures-util.workspace = true parking_lot = { workspace = true, optional = true } +tokio = { workspace = true, features = ["sync", "time"] } [dev-dependencies] parking_lot.workspace = true diff --git a/crates/evm/src/either.rs b/crates/evm/src/either.rs index def3dcc65..a2abf3c59 
100644 --- a/crates/evm/src/either.rs +++ b/crates/evm/src/either.rs @@ -14,6 +14,8 @@ use revm_primitives::db::Database; // re-export Either pub use futures_util::future::Either; +use reth_trie::HashedPostState; +use tokio::sync::mpsc::UnboundedSender; impl BlockExecutorProvider for Either where @@ -26,13 +28,17 @@ where type BatchExecutor + Display>> = Either, B::BatchExecutor>; - fn executor(&self, db: DB) -> Self::Executor + fn executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> Self::Executor where DB: Database + Display>, { match self { - Self::Left(a) => Either::Left(a.executor(db)), - Self::Right(b) => Either::Right(b.executor(db)), + Self::Left(a) => Either::Left(a.executor(db, prefetch_tx)), + Self::Right(b) => Either::Right(b.executor(db, prefetch_tx)), } } diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index c7af37527..cfa87fbd0 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -5,9 +5,11 @@ use reth_primitives::{ parlia::Snapshot, BlockNumber, BlockWithSenders, Header, Receipt, Request, B256, U256, }; use reth_prune_types::PruneModes; +use reth_trie::HashedPostState; use revm::db::BundleState; use revm_primitives::db::Database; use std::fmt::Display; +use tokio::sync::mpsc::UnboundedSender; #[cfg(not(feature = "std"))] use alloc::vec::Vec; @@ -187,7 +189,11 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// Creates a new executor for single block execution. /// /// This is used to execute a single block and get the changed state. 
- fn executor(&self, db: DB) -> Self::Executor + fn executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> Self::Executor where DB: Database + Display>; @@ -214,7 +220,11 @@ mod tests { type Executor + Display>> = TestExecutor; type BatchExecutor + Display>> = TestExecutor; - fn executor(&self, _db: DB) -> Self::Executor + fn executor( + &self, + _db: DB, + _prefetch_tx: Option>, + ) -> Self::Executor where DB: Database + Display>, { @@ -271,7 +281,7 @@ mod tests { fn test_provider() { let provider = TestExecutorProvider; let db = CacheDB::>::default(); - let executor = provider.executor(db); + let executor = provider.executor(db, None); let block = Block { header: Default::default(), body: vec![], diff --git a/crates/evm/src/noop.rs b/crates/evm/src/noop.rs index 72c0ed9d9..f5d5d5e1b 100644 --- a/crates/evm/src/noop.rs +++ b/crates/evm/src/noop.rs @@ -7,7 +7,9 @@ use reth_execution_types::ExecutionOutcome; use reth_primitives::{BlockNumber, BlockWithSenders, Header, Receipt}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; +use reth_trie::HashedPostState; use revm_primitives::db::Database; +use tokio::sync::mpsc::UnboundedSender; use crate::execute::{ BatchExecutor, BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, @@ -25,7 +27,7 @@ impl BlockExecutorProvider for NoopBlockExecutorProvider { type BatchExecutor + Display>> = Self; - fn executor(&self, _: DB) -> Self::Executor + fn executor(&self, _: DB, _: Option>) -> Self::Executor where DB: Database + Display>, { diff --git a/crates/evm/src/test_utils.rs b/crates/evm/src/test_utils.rs index 76f4f6521..4772ced4c 100644 --- a/crates/evm/src/test_utils.rs +++ b/crates/evm/src/test_utils.rs @@ -9,8 +9,10 @@ use reth_execution_types::ExecutionOutcome; use reth_primitives::{BlockNumber, BlockWithSenders, Header, Receipt}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; +use reth_trie::HashedPostState; use
revm_primitives::db::Database; use std::{fmt::Display, sync::Arc}; +use tokio::sync::mpsc::UnboundedSender; /// A [`BlockExecutorProvider`] that returns mocked execution results. #[derive(Clone, Debug, Default)] @@ -30,7 +32,7 @@ impl BlockExecutorProvider for MockExecutorProvider { type BatchExecutor + Display>> = Self; - fn executor(&self, _: DB) -> Self::Executor + fn executor(&self, _: DB, _: Option>) -> Self::Executor where DB: Database + Display>, { diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index 892f63efa..fbf7056da 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -55,9 +55,12 @@ where .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; // Configure the executor to use the previous block's state. - let executor = self.executor.executor(StateProviderDatabase::new( - self.provider.history_by_block_number(block_number.saturating_sub(1))?, - )); + let executor = self.executor.executor( + StateProviderDatabase::new( + self.provider.history_by_block_number(block_number.saturating_sub(1))?, + ), + None, + ); trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block"); diff --git a/crates/exex/exex/src/backfill/mod.rs b/crates/exex/exex/src/backfill/mod.rs index d53d021df..0be6e8820 100644 --- a/crates/exex/exex/src/backfill/mod.rs +++ b/crates/exex/exex/src/backfill/mod.rs @@ -283,10 +283,13 @@ mod tests { // Execute the block to produce a block execution output let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec) - .executor(StateProviderDatabase::new(LatestStateProviderRef::new( - provider.tx_ref(), - provider.static_file_provider().clone(), - ))) + .executor( + StateProviderDatabase::new(LatestStateProviderRef::new( + provider.tx_ref(), + provider.static_file_provider().clone(), + )), + None, + ) .execute(BlockExecutionInput { block, total_difficulty: U256::ZERO, diff --git 
a/crates/optimism/evm/Cargo.toml b/crates/optimism/evm/Cargo.toml index aa365ab83..a68e98209 100644 --- a/crates/optimism/evm/Cargo.toml +++ b/crates/optimism/evm/Cargo.toml @@ -21,6 +21,7 @@ reth-execution-errors.workspace = true reth-execution-types.workspace = true reth-prune-types.workspace = true reth-consensus-common.workspace = true +reth-trie.workspace = true # Optimism reth-optimism-consensus.workspace = true @@ -33,6 +34,9 @@ revm-primitives.workspace = true thiserror.workspace = true tracing.workspace = true +# async +tokio = { workspace = true, features = ["sync", "time"] } + [dev-dependencies] reth-revm = { workspace = true, features = ["test-utils"] } diff --git a/crates/optimism/evm/src/execute.rs b/crates/optimism/evm/src/execute.rs index 1f32449b8..c4001022c 100644 --- a/crates/optimism/evm/src/execute.rs +++ b/crates/optimism/evm/src/execute.rs @@ -22,13 +22,15 @@ use reth_revm::{ state_change::post_block_balance_increments, Evm, State, }; +use reth_trie::HashedPostState; use revm::db::states::StorageSlot; use revm_primitives::{ db::{Database, DatabaseCommit}, BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, ResultAndState, }; use std::{collections::HashMap, str::FromStr, sync::Arc}; -use tracing::trace; +use tokio::sync::mpsc::UnboundedSender; +use tracing::{debug, trace}; /// Provides executors to execute regular ethereum blocks #[derive(Debug, Clone)] @@ -55,15 +57,36 @@ impl OpExecutorProvider where EvmConfig: ConfigureEvm, { - fn op_executor(&self, db: DB) -> OpBlockExecutor + fn op_executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> OpBlockExecutor where DB: Database + std::fmt::Display>, { - OpBlockExecutor::new( - self.chain_spec.clone(), - self.evm_config.clone(), - State::builder().with_database(db).with_bundle_update().without_state_clear().build(), - ) + if let Some(tx) = prefetch_tx { + OpBlockExecutor::new_with_prefetch_tx( + self.chain_spec.clone(), + self.evm_config.clone(), + State::builder() + 
.with_database(db) + .with_bundle_update() + .without_state_clear() + .build(), + tx, + ) + } else { + OpBlockExecutor::new( + self.chain_spec.clone(), + self.evm_config.clone(), + State::builder() + .with_database(db) + .with_bundle_update() + .without_state_clear() + .build(), + ) + } } } @@ -76,18 +99,22 @@ where type BatchExecutor + std::fmt::Display>> = OpBatchExecutor; - fn executor(&self, db: DB) -> Self::Executor + fn executor( + &self, + db: DB, + prefetch_tx: Option>, + ) -> Self::Executor where DB: Database + std::fmt::Display>, { - self.op_executor(db) + self.op_executor(db, prefetch_tx) } fn batch_executor(&self, db: DB) -> Self::BatchExecutor where DB: Database + std::fmt::Display>, { - let executor = self.op_executor(db); + let executor = self.op_executor(db, None); OpBatchExecutor { executor, batch_record: BlockBatchRecord::default(), @@ -120,6 +147,7 @@ where &self, block: &BlockWithSenders, mut evm: Evm<'_, Ext, &mut State>, + tx: Option>, ) -> Result<(Vec, u64), BlockExecutionError> where DB: Database + std::fmt::Display>, @@ -204,6 +232,13 @@ where "Executed transaction" ); + if let Some(tx) = tx.as_ref() { + let post_state = HashedPostState::from_state(state.clone()); + tx.send(post_state).unwrap_or_else(|err| { + debug!(target: "evm_executor", ?err, "Failed to send post state to prefetch channel") + }); + } + evm.db_mut().commit(state); // append gas used @@ -244,12 +279,24 @@ pub struct OpBlockExecutor { executor: OpEvmExecutor, /// The state to use for execution state: State, + /// Prefetch channel + prefetch_tx: Option>, } impl OpBlockExecutor { - /// Creates a new Ethereum block executor. + /// Creates a new Optimism block executor. pub const fn new(chain_spec: Arc, evm_config: EvmConfig, state: State) -> Self { - Self { executor: OpEvmExecutor { chain_spec, evm_config }, state } + Self { executor: OpEvmExecutor { chain_spec, evm_config }, state, prefetch_tx: None } + } + + /// Creates a new Optimism block executor with a prefetch channel. 
+ pub const fn new_with_prefetch_tx( + chain_spec: Arc, + evm_config: EvmConfig, + state: State, + tx: UnboundedSender, + ) -> Self { + Self { executor: OpEvmExecutor { chain_spec, evm_config }, state, prefetch_tx: Some(tx) } } #[inline] @@ -304,7 +351,7 @@ where let (receipts, gas_used) = { let evm = self.executor.evm_config.evm_with_env(&mut self.state, env); - self.executor.execute_pre_and_transactions(block, evm) + self.executor.execute_pre_and_transactions(block, evm, self.prefetch_tx.clone()) }?; // 3. apply post execution changes diff --git a/crates/trie/prefetch/Cargo.toml b/crates/trie/prefetch/Cargo.toml new file mode 100644 index 000000000..1fbc385c9 --- /dev/null +++ b/crates/trie/prefetch/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "reth-trie-prefetch" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Prefetch trie storages when executing block" + +[lints] +workspace = true + +[dependencies] +# reth +reth-primitives.workspace = true +reth-db.workspace = true +reth-trie.workspace = true +reth-provider.workspace = true +reth-trie-parallel.workspace = true +reth-tasks.workspace = true +reth-execution-errors.workspace = true + +# alloy +alloy-rlp.workspace = true + +# tracing +tracing.workspace = true + +# misc +thiserror.workspace = true +derive_more.workspace = true +rayon.workspace = true + +# async +tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] } + +# `metrics` feature +reth-metrics = { workspace = true, optional = true } +metrics = { workspace = true, optional = true } + +[dev-dependencies] +# reth +reth-primitives = { workspace = true, features = ["test-utils", "arbitrary"] } +reth-provider = { workspace = true, features = ["test-utils"] } +reth-trie = { workspace = true, features = ["test-utils"] } + +# misc +rand.workspace = true +criterion = { workspace = true, features = 
["async_tokio"] } +proptest.workspace = true + +[features] +default = ["metrics"] +metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics"] diff --git a/crates/trie/prefetch/src/lib.rs b/crates/trie/prefetch/src/lib.rs new file mode 100644 index 000000000..20e9d984a --- /dev/null +++ b/crates/trie/prefetch/src/lib.rs @@ -0,0 +1,14 @@ +//! Implementation of exotic trie prefetch approaches. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +pub use reth_trie_parallel::StorageRootTargets; + +/// Implementation of trie prefetch. +mod prefetch; +pub use prefetch::TriePrefetch; diff --git a/crates/trie/prefetch/src/prefetch.rs b/crates/trie/prefetch/src/prefetch.rs new file mode 100644 index 000000000..888d0573e --- /dev/null +++ b/crates/trie/prefetch/src/prefetch.rs @@ -0,0 +1,305 @@ +use rayon::prelude::*; +use reth_db::database::Database; +use reth_execution_errors::StorageRootError; +use reth_primitives::B256; +use reth_provider::{providers::ConsistentDbView, ProviderError, ProviderFactory}; +use reth_trie::{ + hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, + metrics::TrieRootMetrics, + node_iter::{TrieElement, TrieNodeIter}, + stats::TrieTracker, + trie_cursor::TrieCursorFactory, + walker::TrieWalker, + HashedPostState, HashedStorage, StorageRoot, +}; +use reth_trie_parallel::{parallel_root::ParallelStateRootError, StorageRootTargets}; +use std::{collections::HashMap, sync::Arc}; +use thiserror::Error; +use tokio::sync::{mpsc::UnboundedReceiver, oneshot::Receiver, watch}; +use tracing::{debug, trace}; + +/// Prefetch trie storage when executing transactions. +#[derive(Debug, Clone)] +pub struct TriePrefetch { + /// Cached accounts. 
+ cached_accounts: HashMap, + /// Cached storages. + cached_storages: HashMap>, + /// State trie metrics. + #[cfg(feature = "metrics")] + metrics: TrieRootMetrics, +} + +impl Default for TriePrefetch { + fn default() -> Self { + Self::new() + } +} + +impl TriePrefetch { + /// Create new `TriePrefetch` instance. + pub fn new() -> Self { + Self { + cached_accounts: HashMap::new(), + cached_storages: HashMap::new(), + #[cfg(feature = "metrics")] + metrics: TrieRootMetrics::default(), + } + } + + /// Run the prefetching task. + pub async fn run( + &mut self, + consistent_view: Arc>>, + mut prefetch_rx: UnboundedReceiver, + mut interrupt_rx: Receiver<()>, + ) where + DB: Database + 'static, + { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let mut count = 0u64; + loop { + tokio::select! { + hashed_state = prefetch_rx.recv() => { + if let Some(hashed_state) = hashed_state { + count += 1; + + let consistent_view = Arc::clone(&consistent_view); + let hashed_state = self.deduplicate_and_update_cached(&hashed_state); + + let self_clone = Arc::new(self.clone()); + let mut shutdown_rx = shutdown_rx.clone(); + tokio::spawn(async move { + if let Err(e) = self_clone.prefetch_once::(consistent_view, hashed_state, &mut shutdown_rx).await { + debug!(target: "trie::trie_prefetch", ?e, "Error while prefetching trie storage"); + }; + }); + } + } + _ = &mut interrupt_rx => { + debug!(target: "trie::trie_prefetch", "Interrupted trie prefetch task. Processed {:?}, left {:?}", count, prefetch_rx.len()); + let _ = shutdown_tx.send(true); + return + } + } + } + } + + /// Deduplicate `hashed_state` based on `cached` and update `cached`. 
+ fn deduplicate_and_update_cached(&mut self, hashed_state: &HashedPostState) -> HashedPostState { + let mut new_hashed_state = HashedPostState::default(); + + // deduplicate accounts if their keys are not present in storages + for (address, account) in &hashed_state.accounts { + if !hashed_state.storages.contains_key(address) && + !self.cached_accounts.contains_key(address) + { + self.cached_accounts.insert(*address, true); + new_hashed_state.accounts.insert(*address, *account); + } + } + + // deduplicate storages + for (address, storage) in &hashed_state.storages { + let cached_entry = self.cached_storages.entry(*address).or_default(); + + // Collect the keys to be added to `new_storage` after filtering + let keys_to_add: Vec<_> = storage + .storage + .iter() + .filter(|(slot, _)| !cached_entry.contains_key(*slot)) + .map(|(slot, _)| *slot) + .collect(); + + // Iterate over `keys_to_add` to update `cached_entry` and `new_storage` + let new_storage: HashMap<_, _> = keys_to_add + .into_iter() + .map(|slot| { + cached_entry.insert(slot, true); + (slot, *storage.storage.get(&slot).unwrap()) + }) + .collect(); + + if !new_storage.is_empty() { + new_hashed_state + .storages + .insert(*address, HashedStorage::from_iter(false, new_storage.into_iter())); + + if let Some(account) = hashed_state.accounts.get(address) { + new_hashed_state.accounts.insert(*address, *account); + } + } + } + + new_hashed_state + } + + /// Prefetch trie storage for the given hashed state. 
+ pub async fn prefetch_once( + self: Arc, + consistent_view: Arc>>, + hashed_state: HashedPostState, + shutdown_rx: &mut watch::Receiver, + ) -> Result<(), TriePrefetchError> + where + DB: Database, + { + let mut tracker = TrieTracker::default(); + + let prefix_sets = hashed_state.construct_prefix_sets().freeze(); + let storage_root_targets = StorageRootTargets::new( + hashed_state.accounts.keys().copied(), + prefix_sets.storage_prefix_sets, + ); + let hashed_state_sorted = hashed_state.into_sorted(); + + trace!(target: "trie::trie_prefetch", "start prefetching trie storages"); + let mut storage_roots = storage_root_targets + .into_par_iter() + .map(|(hashed_address, prefix_set)| { + if *shutdown_rx.borrow() { + return Ok((B256::ZERO, 0)); // return early if shutdown + } + + let provider_ro = consistent_view.provider_ro()?; + + let storage_root_result = StorageRoot::new_hashed( + provider_ro.tx_ref(), + HashedPostStateCursorFactory::new(provider_ro.tx_ref(), &hashed_state_sorted), + hashed_address, + #[cfg(feature = "metrics")] + self.metrics.clone(), + ) + .with_prefix_set(prefix_set) + .prefetch(); + + Ok((hashed_address, storage_root_result?)) + }) + .collect::, ParallelStateRootError>>()?; + + if *shutdown_rx.borrow() { + return Ok(()); // return early if shutdown + } + + trace!(target: "trie::trie_prefetch", "prefetching account tries"); + let provider_ro = consistent_view.provider_ro()?; + let hashed_cursor_factory = + HashedPostStateCursorFactory::new(provider_ro.tx_ref(), &hashed_state_sorted); + let trie_cursor_factory = provider_ro.tx_ref(); + + let walker = TrieWalker::new( + trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, + prefix_sets.account_prefix_set, + ) + .with_deletions_retained(false); + let mut account_node_iter = TrieNodeIter::new( + walker, + hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, + ); + + while let Some(node) = 
account_node_iter.try_next().map_err(ProviderError::Database)? { + if *shutdown_rx.borrow() { + return Ok(()); // return early if shutdown + } + + match node { + TrieElement::Branch(_) => { + tracker.inc_branch(); + } + TrieElement::Leaf(hashed_address, _) => { + match storage_roots.remove(&hashed_address) { + Some(result) => result, + // Since we do not store all intermediate nodes in the database, there might + // be a possibility of re-adding a non-modified leaf to the hash builder. + None => StorageRoot::new_hashed( + trie_cursor_factory, + hashed_cursor_factory.clone(), + hashed_address, + #[cfg(feature = "metrics")] + self.metrics.clone(), + ) + .prefetch() + .ok() + .unwrap_or_default(), + }; + tracker.inc_leaf(); + } + } + } + + let stats = tracker.finish(); + + #[cfg(feature = "metrics")] + self.metrics.record(stats); + + trace!( + target: "trie::trie_prefetch", + duration = ?stats.duration(), + branches_added = stats.branches_added(), + leaves_added = stats.leaves_added(), + "prefetched account trie" + ); + + Ok(()) + } +} + +/// Error during prefetching trie storage. +#[derive(Error, Debug)] +pub enum TriePrefetchError { + /// Error while calculating storage root. + #[error(transparent)] + StorageRoot(#[from] StorageRootError), + /// Error while calculating parallel storage root. + #[error(transparent)] + ParallelStateRoot(#[from] ParallelStateRootError), + /// Provider error. 
+ #[error(transparent)] + Provider(#[from] ProviderError), +} + +impl From for ProviderError { + fn from(error: TriePrefetchError) -> Self { + match error { + TriePrefetchError::Provider(error) => error, + TriePrefetchError::StorageRoot(StorageRootError::DB(error)) => Self::Database(error), + TriePrefetchError::ParallelStateRoot(error) => error.into(), + } + } +} + +#[cfg(test)] +mod tests { + use tokio::time; + + #[tokio::test] + async fn test_channel() { + let (prefetch_tx, mut prefetch_rx) = tokio::sync::mpsc::unbounded_channel(); + let (interrupt_tx, mut interrupt_rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = prefetch_rx.recv() => { + println!("got message"); + time::sleep(time::Duration::from_secs(3)).await; + } + _ = &mut interrupt_rx => { + println!("left items in channel: {}" ,prefetch_rx.len()); + break; + } + } + } + }); + + for _ in 0..10 { + prefetch_tx.send(()).unwrap(); + } + + time::sleep(time::Duration::from_secs(3)).await; + + interrupt_tx.send(()).unwrap(); + + time::sleep(time::Duration::from_secs(10)).await; + } +} diff --git a/crates/trie/trie/src/state.rs b/crates/trie/trie/src/state.rs index 84bfb8fd6..4ec468819 100644 --- a/crates/trie/trie/src/state.rs +++ b/crates/trie/trie/src/state.rs @@ -16,7 +16,7 @@ use reth_db_api::{ use reth_execution_errors::StateRootError; use reth_primitives::{keccak256, Account, Address, BlockNumber, B256, U256}; use reth_trie_common::AccountProof; -use revm::db::BundleAccount; +use revm::{db::BundleAccount, primitives::AccountStatus}; use std::{ collections::{hash_map, HashMap, HashSet}, ops::RangeInclusive, @@ -62,6 +62,26 @@ impl HashedPostState { Self { accounts, storages } } + /// Initialize [`HashedPostState`] from evm state. 
+ pub fn from_state( + changes: HashMap, + ) -> Self { + let mut this = Self::default(); + for (address, account) in changes { + let hashed_address = keccak256(address); + this.accounts.insert(hashed_address, Some(account.info.clone().into())); + + let hashed_storage = HashedStorage::from_iter( + account.status == AccountStatus::SelfDestructed, + account.storage.iter().map(|(key, value)| { + (keccak256(B256::new(key.to_be_bytes())), value.present_value) + }), + ); + this.storages.insert(hashed_address, hashed_storage); + } + this + } + /// Initialize [`HashedPostState`] from revert range. /// Iterate over state reverts in the specified block range and /// apply them to hashed state in reverse. diff --git a/crates/trie/trie/src/trie.rs b/crates/trie/trie/src/trie.rs index c444a3056..1e7d854a9 100644 --- a/crates/trie/trie/src/trie.rs +++ b/crates/trie/trie/src/trie.rs @@ -535,6 +535,56 @@ where let storage_slots_walked = stats.leaves_added() as usize; Ok((root, storage_slots_walked, trie_updates)) } + + /// Walks the hashed storage table entries for a given address to prefetch the storage tries. + /// + /// # Returns + /// + /// The number of walked entries. + pub fn prefetch(self) -> Result { + trace!(target: "trie::storage_root", hashed_address = ?self.hashed_address, "prefetching storage tries"); + + let mut hashed_storage_cursor = + self.hashed_cursor_factory.hashed_storage_cursor(self.hashed_address)?; + + // short circuit on empty storage + if hashed_storage_cursor.is_storage_empty()? { + return Ok(0) + } + + let mut tracker = TrieTracker::default(); + let trie_cursor = self.trie_cursor_factory.storage_trie_cursor(self.hashed_address)?; + let walker = TrieWalker::new(trie_cursor, self.prefix_set).with_deletions_retained(false); + + let mut storage_node_iter = TrieNodeIter::new(walker, hashed_storage_cursor); + while let Some(node) = storage_node_iter.try_next()? 
{ + match node { + TrieElement::Branch(_) => { + tracker.inc_branch(); + } + TrieElement::Leaf(_, _) => { + tracker.inc_leaf(); + } + } + } + + let stats = tracker.finish(); + + #[cfg(feature = "metrics")] + self.metrics.record(stats); + + trace!( + target: "trie::storage_root", + hashed_address = %self.hashed_address, + duration = ?stats.duration(), + branches_added = stats.branches_added(), + leaves_added = stats.leaves_added(), + "prefetched storage tries" + ); + + let storage_slots_walked = stats.leaves_added() as usize; + Ok(storage_slots_walked) + } } #[cfg(test)] From 0d2bee2b2f5d60d2b16a4360658f7da42447ad2d Mon Sep 17 00:00:00 2001 From: Keefe Liu Date: Wed, 28 Aug 2024 10:40:47 +0800 Subject: [PATCH 2/5] optimize prefetch --- Cargo.lock | 6 +--- crates/bsc/evm/Cargo.toml | 1 - crates/bsc/evm/src/execute.rs | 18 +++++------ crates/bsc/evm/src/lib.rs | 1 + crates/ethereum/evm/Cargo.toml | 1 - crates/ethereum/evm/src/execute.rs | 16 +++++----- crates/evm/Cargo.toml | 1 - crates/evm/src/either.rs | 7 ++--- crates/evm/src/execute.rs | 9 +++--- crates/evm/src/noop.rs | 5 ++- crates/evm/src/test_utils.rs | 5 ++- crates/optimism/evm/Cargo.toml | 1 - crates/optimism/evm/src/execute.rs | 16 +++++----- crates/trie/prefetch/src/prefetch.rs | 46 ++++++++++------------------ 14 files changed, 53 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b80b93df..adc4905f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8134,7 +8134,6 @@ dependencies = [ "reth-primitives", "reth-prune-types", "reth-storage-errors", - "reth-trie", "revm", "revm-primitives", "tokio", @@ -8158,7 +8157,6 @@ dependencies = [ "reth-provider", "reth-prune-types", "reth-revm", - "reth-trie", "revm-primitives", "thiserror", "tokio", @@ -8180,7 +8178,6 @@ dependencies = [ "reth-prune-types", "reth-revm", "reth-testing-utils", - "reth-trie", "revm-primitives", "secp256k1", "serde_json", @@ -8202,7 +8199,6 @@ dependencies = [ "reth-primitives", "reth-prune-types", "reth-revm", - 
"reth-trie", "revm", "revm-primitives", "thiserror", @@ -9804,7 +9800,7 @@ dependencies = [ [[package]] name = "reth-trie-prefetch" -version = "1.0.3" +version = "1.0.2" dependencies = [ "alloy-rlp", "criterion", diff --git a/crates/bsc/evm/Cargo.toml b/crates/bsc/evm/Cargo.toml index bc713c816..5903dade5 100644 --- a/crates/bsc/evm/Cargo.toml +++ b/crates/bsc/evm/Cargo.toml @@ -21,7 +21,6 @@ reth-prune-types.workspace = true reth-revm.workspace = true reth-provider.workspace = true reth-bsc-consensus.workspace = true -reth-trie.workspace = true # Revm revm-primitives.workspace = true diff --git a/crates/bsc/evm/src/execute.rs b/crates/bsc/evm/src/execute.rs index 1994bb5ff..255ed80f0 100644 --- a/crates/bsc/evm/src/execute.rs +++ b/crates/bsc/evm/src/execute.rs @@ -28,10 +28,10 @@ use reth_revm::{ db::states::bundle_state::BundleRetention, Evm, State, }; -use reth_trie::HashedPostState; use revm_primitives::{ db::{Database, DatabaseCommit}, - BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, ResultAndState, TransactTo, + BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, EvmState, ResultAndState, + TransactTo, }; use std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Instant}; use tokio::sync::mpsc::UnboundedSender; @@ -85,7 +85,7 @@ where fn bsc_executor( &self, db: DB, - prefetch_tx: Option>, + prefetch_tx: Option>, ) -> BscBlockExecutor where DB: Database + std::fmt::Display>, @@ -133,7 +133,7 @@ where fn executor( &self, db: DB, - prefetch_tx: Option>, + prefetch_tx: Option>, ) -> Self::Executor where DB: Database + std::fmt::Display>, @@ -187,7 +187,7 @@ where &self, block: &BlockWithSenders, mut evm: Evm<'_, Ext, &mut State>, - tx: Option>, + tx: Option>, ) -> Result<(Vec, Vec, u64), BlockExecutionError> where DB: Database + std::fmt::Display>, @@ -241,8 +241,8 @@ where })?; if let Some(tx) = tx.as_ref() { - let post_state = HashedPostState::from_state(state.clone()); - tx.send(post_state).unwrap_or_else(|err| { + // let 
post_state = HashedPostState::from_state(state.clone()); + tx.send(state.clone()).unwrap_or_else(|err| { debug!(target: "evm_executor", ?err, "Failed to send post state to prefetch channel") }); } @@ -288,7 +288,7 @@ pub struct BscBlockExecutor { /// Parlia consensus instance pub(crate) parlia: Arc, /// Prefetch channel - prefetch_tx: Option>, + prefetch_tx: Option>, } impl BscBlockExecutor { @@ -318,7 +318,7 @@ impl BscBlockExecutor { parlia_config: ParliaConfig, state: State, provider: P, - tx: UnboundedSender, + tx: UnboundedSender, ) -> Self { let parlia = Arc::new(Parlia::new(Arc::clone(&chain_spec), parlia_config)); let shared_provider = Arc::new(provider); diff --git a/crates/bsc/evm/src/lib.rs b/crates/bsc/evm/src/lib.rs index 4344f5944..df075e5cb 100644 --- a/crates/bsc/evm/src/lib.rs +++ b/crates/bsc/evm/src/lib.rs @@ -2,6 +2,7 @@ // TODO: doc #![allow(missing_docs)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] // The `bsc` feature must be enabled to use this crate. 
#![cfg(feature = "bsc")] diff --git a/crates/ethereum/evm/Cargo.toml b/crates/ethereum/evm/Cargo.toml index 0beeca65a..218e18e64 100644 --- a/crates/ethereum/evm/Cargo.toml +++ b/crates/ethereum/evm/Cargo.toml @@ -20,7 +20,6 @@ reth-revm.workspace = true reth-ethereum-consensus.workspace = true reth-prune-types.workspace = true reth-execution-types.workspace = true -reth-trie.workspace = true # Ethereum revm-primitives.workspace = true diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index aae646798..2a22d128f 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -28,10 +28,9 @@ use reth_revm::{ state_change::{apply_blockhashes_update, post_block_balance_increments}, Evm, State, }; -use reth_trie::HashedPostState; use revm_primitives::{ db::{Database, DatabaseCommit}, - BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, ResultAndState, + BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, EvmState, ResultAndState, }; use tokio::sync::mpsc::UnboundedSender; use tracing::debug; @@ -71,7 +70,7 @@ where fn eth_executor( &self, db: DB, - prefetch_tx: Option>, + prefetch_tx: Option>, ) -> EthBlockExecutor where DB: Database>, @@ -114,7 +113,7 @@ where fn executor( &self, db: DB, - prefetch_tx: Option>, + prefetch_tx: Option>, ) -> Self::Executor where DB: Database + Display>, @@ -170,7 +169,7 @@ where &self, block: &BlockWithSenders, mut evm: Evm<'_, Ext, &mut State>, - tx: Option>, + tx: Option>, ) -> Result where DB: Database, @@ -227,8 +226,7 @@ where })?; if let Some(tx) = tx.as_ref() { - let post_state = HashedPostState::from_state(state.clone()); - tx.send(post_state).unwrap_or_else(|err| { + tx.send(state.clone()).unwrap_or_else(|err| { debug!(target: "evm_executor", ?err, "Failed to send post state to prefetch channel") }); } @@ -288,7 +286,7 @@ pub struct EthBlockExecutor { /// The state to use for execution state: State, /// Prefetch channel - prefetch_tx: Option>, + 
prefetch_tx: Option>, } impl EthBlockExecutor { @@ -302,7 +300,7 @@ impl EthBlockExecutor { chain_spec: Arc, evm_config: EvmConfig, state: State, - tx: UnboundedSender, + tx: UnboundedSender, ) -> Self { Self { executor: EthEvmExecutor { chain_spec, evm_config }, state, prefetch_tx: Some(tx) } } diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index 3bcf13ef2..f7ad688d8 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -19,7 +19,6 @@ revm-primitives.workspace = true reth-prune-types.workspace = true reth-storage-errors.workspace = true reth-execution-types.workspace = true -reth-trie.workspace = true revm.workspace = true alloy-eips.workspace = true diff --git a/crates/evm/src/either.rs b/crates/evm/src/either.rs index a2abf3c59..d1a46c5ca 100644 --- a/crates/evm/src/either.rs +++ b/crates/evm/src/either.rs @@ -10,12 +10,11 @@ use reth_execution_types::ExecutionOutcome; use reth_primitives::{BlockNumber, BlockWithSenders, Header, Receipt}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; -use revm_primitives::db::Database; +use revm_primitives::{db::Database, EvmState}; +use tokio::sync::mpsc::UnboundedSender; // re-export Either pub use futures_util::future::Either; -use reth_trie::HashedPostState; -use tokio::sync::mpsc::UnboundedSender; impl BlockExecutorProvider for Either where @@ -31,7 +30,7 @@ where fn executor( &self, db: DB, - prefetch_tx: Option>, + prefetch_tx: Option>, ) -> Self::Executor where DB: Database + Display>, diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index cfa87fbd0..80810f38d 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -5,7 +5,6 @@ use reth_primitives::{ parlia::Snapshot, BlockNumber, BlockWithSenders, Header, Receipt, Request, B256, U256, }; use reth_prune_types::PruneModes; -use reth_trie::HashedPostState; use revm::db::BundleState; use revm_primitives::db::Database; use std::fmt::Display; @@ -13,10 +12,10 @@ use 
tokio::sync::mpsc::UnboundedSender; #[cfg(not(feature = "std"))] use alloc::vec::Vec; -use std::collections::HashMap; - pub use reth_execution_errors::{BlockExecutionError, BlockValidationError}; pub use reth_storage_errors::provider::ProviderError; +use revm_primitives::EvmState; +use std::collections::HashMap; /// A general purpose executor trait that executes an input (e.g. block) and produces an output /// (e.g. state changes and receipts). @@ -192,7 +191,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { fn executor( &self, db: DB, - prefetch_rx: Option>, + prefetch_rx: Option>, ) -> Self::Executor where DB: Database + Display>; @@ -223,7 +222,7 @@ mod tests { fn executor( &self, _db: DB, - _prefetch_tx: Option>, + _prefetch_tx: Option>, ) -> Self::Executor where DB: Database + Display>, diff --git a/crates/evm/src/noop.rs b/crates/evm/src/noop.rs index f5d5d5e1b..dbf051af9 100644 --- a/crates/evm/src/noop.rs +++ b/crates/evm/src/noop.rs @@ -7,8 +7,7 @@ use reth_execution_types::ExecutionOutcome; use reth_primitives::{BlockNumber, BlockWithSenders, Header, Receipt}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; -use reth_trie::HashedPostState; -use revm_primitives::db::Database; +use revm_primitives::{db::Database, EvmState}; use tokio::sync::mpsc::UnboundedSender; use crate::execute::{ @@ -27,7 +26,7 @@ impl BlockExecutorProvider for NoopBlockExecutorProvider { type BatchExecutor + Display>> = Self; - fn executor(&self, _: DB, _: Option>) -> Self::Executor + fn executor(&self, _: DB, _: Option>) -> Self::Executor where DB: Database + Display>, { diff --git a/crates/evm/src/test_utils.rs b/crates/evm/src/test_utils.rs index 4772ced4c..f93846f2b 100644 --- a/crates/evm/src/test_utils.rs +++ b/crates/evm/src/test_utils.rs @@ -9,8 +9,7 @@ use reth_execution_types::ExecutionOutcome; use reth_primitives::{BlockNumber, BlockWithSenders, Header, Receipt}; use reth_prune_types::PruneModes; use 
reth_storage_errors::provider::ProviderError; -use reth_trie::HashedPostState; -use revm_primitives::db::Database; +use revm_primitives::{db::Database, EvmState}; use std::{fmt::Display, sync::Arc}; use tokio::sync::mpsc::UnboundedSender; @@ -32,7 +31,7 @@ impl BlockExecutorProvider for MockExecutorProvider { type BatchExecutor + Display>> = Self; - fn executor(&self, _: DB, _: Option>) -> Self::Executor + fn executor(&self, _: DB, _: Option>) -> Self::Executor where DB: Database + Display>, { diff --git a/crates/optimism/evm/Cargo.toml b/crates/optimism/evm/Cargo.toml index a68e98209..4f9d8225f 100644 --- a/crates/optimism/evm/Cargo.toml +++ b/crates/optimism/evm/Cargo.toml @@ -21,7 +21,6 @@ reth-execution-errors.workspace = true reth-execution-types.workspace = true reth-prune-types.workspace = true reth-consensus-common.workspace = true -reth-trie.workspace = true # Optimism reth-optimism-consensus.workspace = true diff --git a/crates/optimism/evm/src/execute.rs b/crates/optimism/evm/src/execute.rs index c4001022c..33e063d4a 100644 --- a/crates/optimism/evm/src/execute.rs +++ b/crates/optimism/evm/src/execute.rs @@ -22,11 +22,10 @@ use reth_revm::{ state_change::post_block_balance_increments, Evm, State, }; -use reth_trie::HashedPostState; use revm::db::states::StorageSlot; use revm_primitives::{ db::{Database, DatabaseCommit}, - BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, ResultAndState, + BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, EvmState, ResultAndState, }; use std::{collections::HashMap, str::FromStr, sync::Arc}; use tokio::sync::mpsc::UnboundedSender; @@ -60,7 +59,7 @@ where fn op_executor( &self, db: DB, - prefetch_tx: Option>, + prefetch_tx: Option>, ) -> OpBlockExecutor where DB: Database + std::fmt::Display>, @@ -102,7 +101,7 @@ where fn executor( &self, db: DB, - prefetch_tx: Option>, + prefetch_tx: Option>, ) -> Self::Executor where DB: Database + std::fmt::Display>, @@ -147,7 +146,7 @@ where &self, block: 
&BlockWithSenders, mut evm: Evm<'_, Ext, &mut State>, - tx: Option>, + tx: Option>, ) -> Result<(Vec, u64), BlockExecutionError> where DB: Database + std::fmt::Display>, @@ -233,8 +232,7 @@ where ); if let Some(tx) = tx.as_ref() { - let post_state = HashedPostState::from_state(state.clone()); - tx.send(post_state).unwrap_or_else(|err| { + tx.send(state.clone()).unwrap_or_else(|err| { debug!(target: "evm_executor", ?err, "Failed to send post state to prefetch channel") }); } @@ -280,7 +278,7 @@ pub struct OpBlockExecutor { /// The state to use for execution state: State, /// Prefetch channel - prefetch_tx: Option>, + prefetch_tx: Option>, } impl OpBlockExecutor { @@ -294,7 +292,7 @@ impl OpBlockExecutor { chain_spec: Arc, evm_config: EvmConfig, state: State, - tx: UnboundedSender, + tx: UnboundedSender, ) -> Self { Self { executor: OpEvmExecutor { chain_spec, evm_config }, state, prefetch_tx: Some(tx) } } diff --git a/crates/trie/prefetch/src/prefetch.rs b/crates/trie/prefetch/src/prefetch.rs index 888d0573e..2d9071b93 100644 --- a/crates/trie/prefetch/src/prefetch.rs +++ b/crates/trie/prefetch/src/prefetch.rs @@ -1,7 +1,7 @@ use rayon::prelude::*; use reth_db::database::Database; use reth_execution_errors::StorageRootError; -use reth_primitives::B256; +use reth_primitives::{revm_primitives::EvmState, B256}; use reth_provider::{providers::ConsistentDbView, ProviderError, ProviderFactory}; use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, @@ -15,7 +15,10 @@ use reth_trie::{ use reth_trie_parallel::{parallel_root::ParallelStateRootError, StorageRootTargets}; use std::{collections::HashMap, sync::Arc}; use thiserror::Error; -use tokio::sync::{mpsc::UnboundedReceiver, oneshot::Receiver, watch}; +use tokio::{ + sync::{mpsc::UnboundedReceiver, oneshot::Receiver}, + task::JoinSet, +}; use tracing::{debug, trace}; /// Prefetch trie storage when executing transactions. 
@@ -51,34 +54,31 @@ impl TriePrefetch { pub async fn run( &mut self, consistent_view: Arc>>, - mut prefetch_rx: UnboundedReceiver, + mut prefetch_rx: UnboundedReceiver, mut interrupt_rx: Receiver<()>, ) where DB: Database + 'static, { - let (shutdown_tx, shutdown_rx) = watch::channel(false); - let mut count = 0u64; + let mut join_set = JoinSet::new(); + loop { tokio::select! { - hashed_state = prefetch_rx.recv() => { - if let Some(hashed_state) = hashed_state { - count += 1; - + state = prefetch_rx.recv() => { + if let Some(state) = state { let consistent_view = Arc::clone(&consistent_view); - let hashed_state = self.deduplicate_and_update_cached(&hashed_state); + let hashed_state = self.deduplicate_and_update_cached(state); let self_clone = Arc::new(self.clone()); - let mut shutdown_rx = shutdown_rx.clone(); - tokio::spawn(async move { - if let Err(e) = self_clone.prefetch_once::(consistent_view, hashed_state, &mut shutdown_rx).await { + join_set.spawn(async move { + if let Err(e) = self_clone.prefetch_once::(consistent_view, hashed_state).await { debug!(target: "trie::trie_prefetch", ?e, "Error while prefetching trie storage"); }; }); } } _ = &mut interrupt_rx => { - debug!(target: "trie::trie_prefetch", "Interrupted trie prefetch task. Processed {:?}, left {:?}", count, prefetch_rx.len()); - let _ = shutdown_tx.send(true); + debug!(target: "trie::trie_prefetch", "Interrupted trie prefetch task. Unprocessed tx {:?}", prefetch_rx.len()); + join_set.abort_all(); return } } @@ -86,7 +86,8 @@ impl TriePrefetch { } /// Deduplicate `hashed_state` based on `cached` and update `cached`. 
- fn deduplicate_and_update_cached(&mut self, hashed_state: &HashedPostState) -> HashedPostState { + fn deduplicate_and_update_cached(&mut self, state: EvmState) -> HashedPostState { + let hashed_state = HashedPostState::from_state(state); let mut new_hashed_state = HashedPostState::default(); // deduplicate accounts if their keys are not present in storages @@ -139,7 +140,6 @@ impl TriePrefetch { self: Arc, consistent_view: Arc>>, hashed_state: HashedPostState, - shutdown_rx: &mut watch::Receiver, ) -> Result<(), TriePrefetchError> where DB: Database, @@ -157,10 +157,6 @@ impl TriePrefetch { let mut storage_roots = storage_root_targets .into_par_iter() .map(|(hashed_address, prefix_set)| { - if *shutdown_rx.borrow() { - return Ok((B256::ZERO, 0)); // return early if shutdown - } - let provider_ro = consistent_view.provider_ro()?; let storage_root_result = StorageRoot::new_hashed( @@ -177,10 +173,6 @@ impl TriePrefetch { }) .collect::, ParallelStateRootError>>()?; - if *shutdown_rx.borrow() { - return Ok(()); // return early if shutdown - } - trace!(target: "trie::trie_prefetch", "prefetching account tries"); let provider_ro = consistent_view.provider_ro()?; let hashed_cursor_factory = @@ -198,10 +190,6 @@ impl TriePrefetch { ); while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? 
{ - if *shutdown_rx.borrow() { - return Ok(()); // return early if shutdown - } - match node { TrieElement::Branch(_) => { tracker.inc_branch(); From 9a1d03a4e37af427e576dac73b60f80b48ca853d Mon Sep 17 00:00:00 2001 From: Keefe Liu Date: Thu, 29 Aug 2024 16:10:38 +0800 Subject: [PATCH 3/5] remove prefetch feature --- Makefile | 8 +- bin/reth/Cargo.toml | 2 - crates/blockchain-tree/Cargo.toml | 3 +- crates/blockchain-tree/src/blockchain_tree.rs | 12 +++ crates/blockchain-tree/src/chain.rs | 73 +++++++++++-------- crates/cli/commands/src/node.rs | 6 ++ crates/node/builder/src/launch/common.rs | 18 +++-- crates/node/core/src/node_config.rs | 4 + 8 files changed, 83 insertions(+), 43 deletions(-) diff --git a/Makefile b/Makefile index 98aed53b4..6488b62c9 100644 --- a/Makefile +++ b/Makefile @@ -60,7 +60,7 @@ install-op: ## Build and install the op-reth binary under `~/.cargo/bin`. .PHONY: install-bsc install-bsc: ## Build and install the bsc-reth binary under `~/.cargo/bin`. cargo install --path bin/reth --bin bsc-reth --force --locked \ - --features "bsc,prefetch,$(FEATURES)" \ + --features "bsc,$(FEATURES)" \ --profile "$(PROFILE)" \ $(CARGO_INSTALL_EXTRA_FLAGS) @@ -74,7 +74,7 @@ build-op: ## Build the op-reth binary into `target` directory. .PHONY: build-bsc build-bsc: ## Build the bsc-reth binary into `target` directory. - cargo build --bin bsc-reth --features "bsc,prefetch,$(FEATURES)" --profile "$(PROFILE)" + cargo build --bin bsc-reth --features "bsc,$(FEATURES)" --profile "$(PROFILE)" # Builds the reth binary natively. build-native-%: @@ -84,7 +84,7 @@ op-build-native-%: cargo build --bin op-reth --target $* --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)" bsc-build-native-%: - cargo build --bin bsc-reth --target $* --features "bsc,prefetch,$(FEATURES)" --profile "$(PROFILE)" + cargo build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)" # The following commands use `cross` to build a cross-compile. 
# @@ -127,7 +127,7 @@ op-build-%: bsc-build-%: RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \ - cross build --bin bsc-reth --target $* --features "bsc,prefetch,$(FEATURES)" --profile "$(PROFILE)" + cross build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)" # Unfortunately we can't easily use cross to build for Darwin because of licensing issues. # If we wanted to, we would need to build a custom Docker image with the SDK available. diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 7fac4b833..300e74b4f 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -164,8 +164,6 @@ bsc = [ "reth-beacon-consensus/bsc", ] -prefetch = ["reth-blockchain-tree/prefetch"] - # no-op feature flag for switching between the `optimism` and default functionality in CI matrices ethereum = [] diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index 6b2336485..cd6c597c8 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -28,7 +28,7 @@ reth-trie = { workspace = true, features = ["metrics"] } reth-trie-parallel = { workspace = true, features = ["parallel"] } reth-network.workspace = true reth-consensus.workspace = true -reth-trie-prefetch = { workspace = true, optional = true } +reth-trie-prefetch.workspace = true # common parking_lot.workspace = true @@ -60,4 +60,3 @@ alloy-genesis.workspace = true [features] test-utils = [] optimism = ["reth-primitives/optimism", "reth-provider/optimism"] -prefetch = ["dep:reth-trie-prefetch"] diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 49bc97ebe..5228ae984 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -72,6 +72,8 @@ pub struct BlockchainTree { sync_metrics_tx: Option, /// Metrics for the blockchain tree. 
metrics: TreeMetrics, + /// Whether to enable prefetch when execute block + enable_prefetch: bool, } impl BlockchainTree { @@ -143,6 +145,7 @@ where canon_state_notification_sender, sync_metrics_tx: None, metrics: Default::default(), + enable_prefetch: false, }) } @@ -167,6 +170,12 @@ where self } + /// Enable prefetch. + pub fn enable_prefetch(mut self) -> Self { + self.enable_prefetch = true; + self + } + /// Check if the block is known to blockchain tree or database and return its status. /// /// Function will check: @@ -434,6 +443,7 @@ where &self.externals, block_attachment, block_validation_kind, + self.enable_prefetch, )?; self.insert_chain(chain); @@ -491,6 +501,7 @@ where canonical_fork, block_attachment, block_validation_kind, + self.enable_prefetch, )?; self.state.block_indices.insert_non_fork_block(block_number, block_hash, chain_id); @@ -505,6 +516,7 @@ where canonical_fork, &self.externals, block_validation_kind, + self.enable_prefetch, )?; self.insert_chain(chain); BlockAttachment::HistoricalFork diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index a769d176a..8a51fdbc9 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -22,16 +22,14 @@ use reth_provider::{ providers::{BundleStateProvider, ConsistentDbView}, FullExecutionDataProvider, ProviderError, StateRootProvider, }; -use reth_revm::database::StateProviderDatabase; +use reth_revm::{database::StateProviderDatabase, primitives::EvmState}; use reth_trie::updates::TrieUpdates; use reth_trie_parallel::parallel_root::ParallelStateRoot; -#[cfg(feature = "prefetch")] use reth_trie_prefetch::TriePrefetch; -#[cfg(feature = "prefetch")] -use std::sync::Arc; use std::{ collections::{BTreeMap, HashMap}, ops::{Deref, DerefMut}, + sync::Arc, time::Instant, }; @@ -79,6 +77,7 @@ impl AppendableChain { externals: &TreeExternals, block_attachment: BlockAttachment, block_validation_kind: BlockValidationKind, + enable_prefetch: bool, ) 
-> Result where DB: Database + Clone + 'static, @@ -102,6 +101,7 @@ impl AppendableChain { externals, block_attachment, block_validation_kind, + enable_prefetch, )?; Ok(Self { chain: Chain::new(vec![block], bundle_state, trie_updates) }) @@ -118,6 +118,7 @@ impl AppendableChain { canonical_fork: ForkBlock, externals: &TreeExternals, block_validation_kind: BlockValidationKind, + enable_prefetch: bool, ) -> Result where DB: Database + Clone + 'static, @@ -149,6 +150,7 @@ impl AppendableChain { externals, BlockAttachment::HistoricalFork, block_validation_kind, + enable_prefetch, )?; // extending will also optimize few things, mostly related to selfdestruct and wiping of // storage. @@ -182,6 +184,7 @@ impl AppendableChain { externals: &TreeExternals, block_attachment: BlockAttachment, block_validation_kind: BlockValidationKind, + enable_prefetch: bool, ) -> Result<(ExecutionOutcome, Option), BlockExecutionError> where EDP: FullExecutionDataProvider, @@ -212,35 +215,15 @@ impl AppendableChain { let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider); - #[cfg(feature = "prefetch")] - let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel(); + let (prefetch_tx, interrupt_tx) = + if enable_prefetch { Self::setup_prefetch(externals)? 
} else { (None, None) }; let db = StateProviderDatabase::new(&provider); - #[cfg(feature = "prefetch")] - let executor = externals.executor_factory.executor(db, Some(prefetch_tx)); - #[cfg(not(feature = "prefetch"))] - let executor = externals.executor_factory.executor(db, None); + let executor = externals.executor_factory.executor(db, prefetch_tx); let block_hash = block.hash(); let block = block.unseal(); - #[cfg(feature = "prefetch")] - let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel(); - - #[cfg(feature = "prefetch")] - { - let mut trie_prefetch = TriePrefetch::new(); - let consistent_view = Arc::new(ConsistentDbView::new_with_latest_tip( - externals.provider_factory.clone(), - )?); - - tokio::spawn({ - async move { - trie_prefetch.run::(consistent_view, prefetch_rx, interrupt_rx).await; - } - }); - } - let state = executor.execute((&block, U256::MAX, ancestor_blocks).into())?; let BlockExecutionOutput { state, receipts, requests, .. } = state; externals @@ -251,8 +234,9 @@ impl AppendableChain { ExecutionOutcome::new(state, receipts.into(), block.number, vec![requests.into()]); // stop the prefetch task. - #[cfg(feature = "prefetch")] - let _ = interrupt_tx.send(()); + if let Some(interrupt_tx) = interrupt_tx { + let _ = interrupt_tx.send(()); + } // check state root if the block extends the canonical chain __and__ if state root // validation was requested. @@ -314,6 +298,7 @@ impl AppendableChain { canonical_fork: ForkBlock, block_attachment: BlockAttachment, block_validation_kind: BlockValidationKind, + enable_prefetch: bool, ) -> Result<(), InsertBlockErrorKind> where DB: Database + Clone + 'static, @@ -339,10 +324,40 @@ impl AppendableChain { externals, block_attachment, block_validation_kind, + enable_prefetch, )?; // extend the state. 
self.chain.append_block(block, block_state); Ok(()) } + + fn setup_prefetch( + externals: &TreeExternals, + ) -> Result< + ( + Option>, + Option>, + ), + BlockExecutionError, + > + where + DB: Database + Clone + 'static, + E: BlockExecutorProvider, + { + let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel(); + let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel(); + + let mut trie_prefetch = TriePrefetch::new(); + let consistent_view = + Arc::new(ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())?); + + tokio::spawn({ + async move { + trie_prefetch.run::(consistent_view, prefetch_rx, interrupt_rx).await; + } + }); + + Ok((Some(prefetch_tx), Some(interrupt_tx))) + } } diff --git a/crates/cli/commands/src/node.rs b/crates/cli/commands/src/node.rs index 233a7d5b3..46bd42729 100644 --- a/crates/cli/commands/src/node.rs +++ b/crates/cli/commands/src/node.rs @@ -106,6 +106,10 @@ pub struct NodeCommand { /// Additional cli arguments #[command(flatten, next_help_heading = "Extension")] pub ext: Ext, + + /// Enable prefetch when executing block + #[arg(long, default_value_t = false)] + pub enable_prefetch: bool, } impl NodeCommand { @@ -152,6 +156,7 @@ impl NodeCommand { dev, pruning, ext, + enable_prefetch, } = self; // set up node config @@ -169,6 +174,7 @@ impl NodeCommand { db, dev, pruning, + enable_prefetch, }; // Register the prometheus recorder before creating the database, diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index a2b281861..fc6e76102 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -640,12 +640,18 @@ where consensus.clone(), components.block_executor().clone(), ); - let tree = BlockchainTree::new(tree_externals, *self.tree_config(), self.prune_modes())? 
- .with_sync_metrics_tx(self.sync_metrics_tx()) - // Note: This is required because we need to ensure that both the components and the - // tree are using the same channel for canon state notifications. This will be removed - // once the Blockchain provider no longer depends on an instance of the tree - .with_canon_state_notification_sender(self.canon_state_notification_sender()); + let mut tree = + BlockchainTree::new(tree_externals, *self.tree_config(), self.prune_modes())? + .with_sync_metrics_tx(self.sync_metrics_tx()) + // Note: This is required because we need to ensure that both the components and the + // tree are using the same channel for canon state notifications. This will be + // removed once the Blockchain provider no longer depends on an + // instance of the tree + .with_canon_state_notification_sender(self.canon_state_notification_sender()); + + if self.node_config().enable_prefetch { + tree = tree.enable_prefetch(); + } let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree)); diff --git a/crates/node/core/src/node_config.rs b/crates/node/core/src/node_config.rs index 1f5bea21b..b0d895bec 100644 --- a/crates/node/core/src/node_config.rs +++ b/crates/node/core/src/node_config.rs @@ -145,6 +145,9 @@ pub struct NodeConfig { /// All pruning related arguments pub pruning: PruningArgs, + + /// Enable prefetch when executing blocks. 
+ pub enable_prefetch: bool, } impl NodeConfig { @@ -436,6 +439,7 @@ impl Default for NodeConfig { dev: DevArgs::default(), pruning: PruningArgs::default(), datadir: DatadirArgs::default(), + enable_prefetch: false, } } } From adc415c20d76942949201eba26254f734ba44164 Mon Sep 17 00:00:00 2001 From: Keefe Liu Date: Thu, 29 Aug 2024 17:19:07 +0800 Subject: [PATCH 4/5] fix lint --- crates/blockchain-tree/src/blockchain_tree.rs | 2 +- crates/blockchain-tree/src/chain.rs | 30 +++++++++++-------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 5228ae984..082856f35 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -171,7 +171,7 @@ where } /// Enable prefetch. - pub fn enable_prefetch(mut self) -> Self { + pub const fn enable_prefetch(mut self) -> Self { self.enable_prefetch = true; self } diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index 8a51fdbc9..3440b2cb1 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -69,6 +69,7 @@ impl AppendableChain { /// /// if [`BlockValidationKind::Exhaustive`] is specified, the method will verify the state root /// of the block. + #[allow(clippy::too_many_arguments)] pub fn new_canonical_fork( block: SealedBlockWithSenders, parent_header: &SealedHeader, @@ -110,6 +111,7 @@ impl AppendableChain { /// Create a new chain that forks off of an existing sidechain. /// /// This differs from [`AppendableChain::new_canonical_fork`] in that this starts a new fork. + #[allow(clippy::too_many_arguments)] pub(crate) fn new_chain_fork( &self, block: SealedBlockWithSenders, @@ -176,6 +178,7 @@ impl AppendableChain { /// - [`BlockAttachment`] represents if the block extends the canonical chain, and thus we can /// cache the trie state updates. 
/// - [`BlockValidationKind`] determines if the state root __should__ be validated. + #[allow(clippy::too_many_arguments)] fn validate_and_execute( block: SealedBlockWithSenders, parent_block: &SealedHeader, @@ -216,7 +219,7 @@ impl AppendableChain { let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider); let (prefetch_tx, interrupt_tx) = - if enable_prefetch { Self::setup_prefetch(externals)? } else { (None, None) }; + if enable_prefetch { Self::setup_prefetch(externals) } else { (None, None) }; let db = StateProviderDatabase::new(&provider); let executor = externals.executor_factory.executor(db, prefetch_tx); @@ -334,13 +337,10 @@ impl AppendableChain { fn setup_prefetch( externals: &TreeExternals, - ) -> Result< - ( - Option>, - Option>, - ), - BlockExecutionError, - > + ) -> ( + Option>, + Option>, + ) where DB: Database + Clone + 'static, E: BlockExecutorProvider, @@ -349,15 +349,21 @@ impl AppendableChain { let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel(); let mut trie_prefetch = TriePrefetch::new(); - let consistent_view = - Arc::new(ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())?); + let consistent_view = if let Ok(view) = + ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone()) + { + view + } else { + tracing::debug!("Failed to create consistent view for trie prefetch"); + return (None, None) + }; tokio::spawn({ async move { - trie_prefetch.run::(consistent_view, prefetch_rx, interrupt_rx).await; + trie_prefetch.run::(Arc::new(consistent_view), prefetch_rx, interrupt_rx).await; } }); - Ok((Some(prefetch_tx), Some(interrupt_tx))) + (Some(prefetch_tx), Some(interrupt_tx)) } } From 2c2cb097834f062258cf999fd8964ad04eccb294 Mon Sep 17 00:00:00 2001 From: Keefe Liu Date: Mon, 2 Sep 2024 17:54:16 +0800 Subject: [PATCH 5/5] fix build commands --- Makefile | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 
6488b62c9..0cf02349f 100644 --- a/Makefile +++ b/Makefile @@ -53,14 +53,14 @@ install: ## Build and install the reth binary under `~/.cargo/bin`. .PHONY: install-op install-op: ## Build and install the op-reth binary under `~/.cargo/bin`. cargo install --path bin/reth --bin op-reth --force --locked \ - --features "optimism,opbnb,$(FEATURES)" \ + --features "optimism opbnb $(FEATURES)" \ --profile "$(PROFILE)" \ $(CARGO_INSTALL_EXTRA_FLAGS) .PHONY: install-bsc install-bsc: ## Build and install the bsc-reth binary under `~/.cargo/bin`. cargo install --path bin/reth --bin bsc-reth --force --locked \ - --features "bsc,$(FEATURES)" \ + --features "bsc $(FEATURES)" \ --profile "$(PROFILE)" \ $(CARGO_INSTALL_EXTRA_FLAGS) @@ -70,21 +70,21 @@ build: ## Build the reth binary into `target` directory. .PHONY: build-op build-op: ## Build the op-reth binary into `target` directory. - cargo build --bin op-reth --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)" + cargo build --bin op-reth --features "optimism opbnb $(FEATURES)" --profile "$(PROFILE)" .PHONY: build-bsc build-bsc: ## Build the bsc-reth binary into `target` directory. - cargo build --bin bsc-reth --features "bsc,$(FEATURES)" --profile "$(PROFILE)" + cargo build --bin bsc-reth --features "bsc $(FEATURES)" --profile "$(PROFILE)" # Builds the reth binary natively. build-native-%: cargo build --bin reth --target $* --features "$(FEATURES)" --profile "$(PROFILE)" op-build-native-%: - cargo build --bin op-reth --target $* --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)" + cargo build --bin op-reth --target $* --features "optimism opbnb $(FEATURES)" --profile "$(PROFILE)" bsc-build-native-%: - cargo build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)" + cargo build --bin bsc-reth --target $* --features "bsc $(FEATURES)" --profile "$(PROFILE)" # The following commands use `cross` to build a cross-compile. 
# @@ -123,11 +123,11 @@ build-%: op-build-%: RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \ - cross build --bin op-reth --target $* --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)" + cross build --bin op-reth --target $* --features "optimism opbnb $(FEATURES)" --profile "$(PROFILE)" bsc-build-%: RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \ - cross build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)" + cross build --bin bsc-reth --target $* --features "bsc $(FEATURES)" --profile "$(PROFILE)" # Unfortunately we can't easily use cross to build for Darwin because of licensing issues. # If we wanted to, we would need to build a custom Docker image with the SDK available.