diff --git a/Cargo.lock b/Cargo.lock index 91fd00c99..19e1816ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6647,6 +6647,18 @@ dependencies = [ "memchr", ] +[[package]] +name = "quick_cache" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec932c60e6faf77dc6601ea149a23d821598b019b450bb1d98fe89c0301c0b61" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.14.5", + "parking_lot 0.12.3", +] + [[package]] name = "quinn" version = "0.11.2" @@ -7269,9 +7281,11 @@ dependencies = [ "alloy-genesis", "aquamarine", "assert_matches", + "lazy_static", "linked_hash_set", "metrics", "parking_lot 0.12.3", + "quick_cache", "reth-blockchain-tree-api", "reth-chainspec", "reth-consensus", @@ -8718,6 +8732,7 @@ dependencies = [ "reth-auto-seal-consensus", "reth-basic-payload-builder", "reth-beacon-consensus", + "reth-blockchain-tree", "reth-chainspec", "reth-consensus", "reth-db", diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index b3679677a..99c0e8fe7 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -38,6 +38,10 @@ tokio = { workspace = true, features = ["macros", "sync"] } reth-metrics = { workspace = true, features = ["common"] } metrics.workspace = true +# cache +quick_cache = "0.6.2" +lazy_static = "1.4.0" + # misc aquamarine.workspace = true linked_hash_set.workspace = true diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 501ce4381..d265fdc8d 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -1,6 +1,7 @@ //! Implementation of [`BlockchainTree`] use crate::{ + canonical_cache::apply_bundle_state, metrics::{MakeCanonicalAction, MakeCanonicalDurationsRecorder, TreeMetrics}, state::{BlockchainId, TreeState}, AppendableChain, BlockIndices, BlockchainTreeConfig, ExecutionData, TreeExternals, @@ -1261,6 +1262,7 @@ where }; recorder.record_relative(MakeCanonicalAction::RetrieveStateTrieUpdates); + let cloned_bundle = state.bundle.clone(); let provider_rw = self.externals.provider_factory.provider_rw()?; provider_rw .append_blocks_with_state( @@ -1274,6 +1276,9 @@ where provider_rw.commit()?; recorder.record_relative(MakeCanonicalAction::CommitCanonicalChainToDatabase); + // update global canonical cache + apply_bundle_state(cloned_bundle); + Ok(()) } diff --git a/crates/blockchain-tree/src/canonical_cache.rs b/crates/blockchain-tree/src/canonical_cache.rs new file mode 100644 index 000000000..33c297efe --- /dev/null +++ b/crates/blockchain-tree/src/canonical_cache.rs @@ -0,0 +1,397 @@ +use lazy_static::lazy_static; +use quick_cache::sync::Cache; +use reth_primitives::{Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256}; +use reth_provider::{ + AccountReader, BlockHashReader, ExecutionDataProvider, StateProofProvider, StateProvider, + StateRootProvider, +}; +use reth_revm::db::BundleState; +use reth_storage_errors::provider::ProviderResult; +use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState}; + +/// The size of cache, counted by the number of accounts. +const CACHE_SIZE: usize = 1000000; + +type AddressStorageKey = (Address, StorageKey); + +lazy_static! { + /// Account cache + pub static ref ACCOUNT_CACHE: Cache = Cache::new(CACHE_SIZE); + + /// Contract cache + /// The size of contract is large and the hot contracts should be limited. + static ref CONTRACT_CACHE: Cache = Cache::new(CACHE_SIZE/10); + + /// Storage cache + static ref STORAGE_CACHE: Cache = Cache::new(CACHE_SIZE*10); + + /// Block hash cache + static ref BLOCK_HASH_CACHE: Cache = Cache::new(CACHE_SIZE/10); +} + +/// Apply committed state to canonical cache. +pub(crate) fn apply_bundle_state(bundle: BundleState) { + let change_set = bundle.into_plain_state(reth_provider::OriginalValuesKnown::Yes); + + for (address, account_info) in &change_set.accounts { + match account_info { + None => { + ACCOUNT_CACHE.remove(address); + } + Some(acc) => { + ACCOUNT_CACHE.insert( + *address, + Account { + nonce: acc.nonce, + balance: acc.balance, + bytecode_hash: Some(acc.code_hash), + }, + ); + } + } + } + + let mut to_wipe = false; + for storage in &change_set.storage { + if storage.wipe_storage { + to_wipe = true; + break; + } else { + for (k, v) in storage.storage.clone() { + STORAGE_CACHE.insert((storage.address, StorageKey::from(k)), v); + } + } + } + if to_wipe { + STORAGE_CACHE.clear(); + } +} + +/// Clear cached accounts and storages. +pub fn clear_accounts_and_storages() { + ACCOUNT_CACHE.clear(); + STORAGE_CACHE.clear(); +} + +#[derive(Debug)] +pub(crate) struct CachedBundleStateProvider { + /// The inner state provider. + pub(crate) state_provider: SP, + /// Block execution data. + pub(crate) block_execution_data_provider: EDP, +} + +impl CachedBundleStateProvider { + /// Create new cached bundle state provider + pub(crate) const fn new(state_provider: SP, block_execution_data_provider: EDP) -> Self { + Self { state_provider, block_execution_data_provider } + } +} + +impl BlockHashReader + for CachedBundleStateProvider +{ + fn block_hash(&self, block_number: BlockNumber) -> ProviderResult> { + let block_hash = self.block_execution_data_provider.block_hash(block_number); + if block_hash.is_some() { + return Ok(block_hash) + } + if let Some(v) = BLOCK_HASH_CACHE.get(&block_number) { + return Ok(Some(v)) + } + if let Some(value) = self.state_provider.block_hash(block_number)? { + BLOCK_HASH_CACHE.insert(block_number, value); + return Ok(Some(value)) + } + Ok(None) + } + + fn canonical_hashes_range( + &self, + _start: BlockNumber, + _end: BlockNumber, + ) -> ProviderResult> { + unimplemented!() + } +} + +impl AccountReader + for CachedBundleStateProvider +{ + fn basic_account(&self, address: Address) -> ProviderResult> { + if let Some(account) = + self.block_execution_data_provider.execution_outcome().account(&address) + { + return Ok(account) + } + if let Some(v) = ACCOUNT_CACHE.get(&address) { + return Ok(Some(v)) + } + if let Some(value) = self.state_provider.basic_account(address)? { + ACCOUNT_CACHE.insert(address, value); + return Ok(Some(value)) + } + Ok(None) + } +} + +impl StateRootProvider + for CachedBundleStateProvider +{ + fn state_root(&self, bundle_state: &BundleState) -> ProviderResult { + let mut state = self.block_execution_data_provider.execution_outcome().state().clone(); + state.extend(bundle_state.clone()); + self.state_provider.state_root(&state) + } + + fn hashed_state_root(&self, hashed_state: &reth_trie::HashedPostState) -> ProviderResult { + let bundle_state = self.block_execution_data_provider.execution_outcome().state(); + let mut state = HashedPostState::from_bundle_state(&bundle_state.state); + state.extend(hashed_state.clone()); + self.state_provider.hashed_state_root(&state) + } + + fn state_root_with_updates( + &self, + bundle_state: &BundleState, + ) -> ProviderResult<(B256, TrieUpdates)> { + let mut state = self.block_execution_data_provider.execution_outcome().state().clone(); + state.extend(bundle_state.clone()); + self.state_provider.state_root_with_updates(&state) + } + + fn hashed_state_root_with_updates( + &self, + hashed_state: &HashedPostState, + ) -> ProviderResult<(B256, TrieUpdates)> { + let bundle_state = self.block_execution_data_provider.execution_outcome().state(); + let mut state = HashedPostState::from_bundle_state(&bundle_state.state); + state.extend(hashed_state.clone()); + self.state_provider.hashed_state_root_with_updates(&state) + } +} + +impl StateProofProvider + for CachedBundleStateProvider +{ + fn hashed_proof( + &self, + hashed_state: &HashedPostState, + address: Address, + slots: &[B256], + ) -> ProviderResult { + let bundle_state = self.block_execution_data_provider.execution_outcome().state(); + let mut state = HashedPostState::from_bundle_state(&bundle_state.state); + state.extend(hashed_state.clone()); + self.state_provider.hashed_proof(&state, address, slots) + } +} + +impl StateProvider + for CachedBundleStateProvider +{ + fn storage( + &self, + account: Address, + storage_key: StorageKey, + ) -> ProviderResult> { + let u256_storage_key = storage_key.into(); + if let Some(value) = self + .block_execution_data_provider + .execution_outcome() + .storage(&account, u256_storage_key) + { + return Ok(Some(value)) + } + let cache_key = (account, storage_key); + if let Some(v) = STORAGE_CACHE.get(&cache_key) { + return Ok(Some(v)) + } + if let Some(value) = self.state_provider.storage(account, storage_key)? { + STORAGE_CACHE.insert(cache_key, value); + return Ok(Some(value)) + } + Ok(None) + } + + fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { + if let Some(bytecode) = + self.block_execution_data_provider.execution_outcome().bytecode(&code_hash) + { + return Ok(Some(bytecode)) + } + if let Some(v) = CONTRACT_CACHE.get(&code_hash) { + return Ok(Some(v)) + } + if let Some(value) = self.state_provider.bytecode_by_hash(code_hash)? { + CONTRACT_CACHE.insert(code_hash, value.clone()); + return Ok(Some(value)) + } + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{canonical_cache::CachedBundleStateProvider, BundleStateDataRef}; + use reth_execution_types::ExecutionOutcome; + use reth_primitives::{ + revm_primitives::{AccountInfo, KECCAK_EMPTY}, + ForkBlock, + }; + use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory}; + use reth_revm::{db::AccountStatus, primitives::U256}; + use std::collections::{BTreeMap, HashMap}; + + #[test] + fn test_basic() { + let execution_outcome = ExecutionOutcome::default(); + let empty = BTreeMap::new(); + + let factory = create_test_provider_factory(); + let consistent_view = ConsistentDbView::new_with_latest_tip(factory.clone()).unwrap(); + let state_provider = consistent_view + .provider_ro() + .unwrap() + .disable_long_read_transaction_safety() + .state_provider_by_block_number(1) + .unwrap(); + let bdp = BundleStateDataRef { + execution_outcome: &execution_outcome, + sidechain_block_hashes: &empty, + canonical_block_hashes: &empty, + canonical_fork: ForkBlock::new(1, B256::random()), + }; + let cached_bundle_provider = CachedBundleStateProvider::new(state_provider, bdp); + + let account = Address::random(); + let result = cached_bundle_provider.basic_account(account).unwrap(); + assert_eq!(result.is_none(), true); + + ACCOUNT_CACHE + .insert(account, Account { nonce: 100, balance: U256::ZERO, bytecode_hash: None }); + let result = cached_bundle_provider.basic_account(account).unwrap(); + assert_eq!(result.unwrap().nonce, 100); + + BLOCK_HASH_CACHE.insert(100, B256::with_last_byte(9)); + let result = cached_bundle_provider.block_hash(100).unwrap(); + assert_eq!(result.unwrap(), B256::with_last_byte(9)); + } + + #[test] + fn test_apply_bundle_state() { + let execution_outcome = ExecutionOutcome::default(); + let empty = BTreeMap::new(); + + let factory = create_test_provider_factory(); + let consistent_view = ConsistentDbView::new_with_latest_tip(factory.clone()).unwrap(); + let state_provider = consistent_view + .provider_ro() + .unwrap() + .disable_long_read_transaction_safety() + .state_provider_by_block_number(1) + .unwrap(); + let bdp = BundleStateDataRef { + execution_outcome: &execution_outcome, + sidechain_block_hashes: &empty, + canonical_block_hashes: &empty, + canonical_fork: ForkBlock::new(1, B256::random()), + }; + let cached_bundle_provider = CachedBundleStateProvider::new(state_provider, bdp); + + // apply bundle state to set cache + let account1 = Address::random(); + let account2 = Address::random(); + let bundle_state = BundleState::new( + vec![ + ( + account1, + None, + Some(AccountInfo { + nonce: 1, + balance: U256::from(10), + code_hash: KECCAK_EMPTY, + code: None, + }), + HashMap::from([ + (U256::from(2), (U256::from(0), U256::from(10))), + (U256::from(5), (U256::from(0), U256::from(15))), + ]), + ), + ( + account2, + None, + Some(AccountInfo { + nonce: 1, + balance: U256::from(10), + code_hash: KECCAK_EMPTY, + code: None, + }), + HashMap::from([]), + ), + ], + vec![vec![ + ( + account1, + Some(None), + vec![(U256::from(2), U256::from(0)), (U256::from(5), U256::from(0))], + ), + (account2, Some(None), vec![]), + ]], + vec![], + ); + apply_bundle_state(bundle_state); + + let account1_result = cached_bundle_provider.basic_account(account1).unwrap(); + assert_eq!(account1_result.unwrap().nonce, 1); + let storage1_result = + cached_bundle_provider.storage(account1, B256::with_last_byte(2)).unwrap(); + assert_eq!(storage1_result.unwrap(), U256::from(10)); + let storage2_result = + cached_bundle_provider.storage(account1, B256::with_last_byte(5)).unwrap(); + assert_eq!(storage2_result.unwrap(), U256::from(15)); + + let account2_result = cached_bundle_provider.basic_account(account2).unwrap(); + assert_eq!(account2_result.unwrap().nonce, 1); + + // apply bundle state to set clear cache + let account3 = Address::random(); + let mut bundle_state = BundleState::new( + vec![( + account3, + Some(AccountInfo { + nonce: 3, + balance: U256::from(10), + code_hash: KECCAK_EMPTY, + code: None, + }), + None, + HashMap::from([ + (U256::from(2), (U256::from(0), U256::from(10))), + (U256::from(5), (U256::from(0), U256::from(15))), + ]), + )], + vec![vec![( + account3, + Some(None), + vec![(U256::from(2), U256::from(0)), (U256::from(5), U256::from(0))], + )]], + vec![], + ); + bundle_state.state.get_mut(&account3).unwrap().status = AccountStatus::Destroyed; + apply_bundle_state(bundle_state); + + let account1_result = cached_bundle_provider.basic_account(account1).unwrap(); + assert_eq!(account1_result.unwrap().nonce, 1); + let storage1_result = + cached_bundle_provider.storage(account1, B256::with_last_byte(2)).unwrap(); + assert_eq!(storage1_result.is_none(), true); + let storage2_result = + cached_bundle_provider.storage(account1, B256::with_last_byte(5)).unwrap(); + assert_eq!(storage2_result.is_none(), true); + + let account2_result = cached_bundle_provider.basic_account(account2).unwrap(); + assert_eq!(account2_result.unwrap().nonce, 1); + } +} diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index dbc0c1d04..b04a20278 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -4,7 +4,10 @@ //! blocks, as well as a list of the blocks the chain is composed of. use super::externals::TreeExternals; -use crate::BundleStateDataRef; +use crate::{ + canonical_cache::{clear_accounts_and_storages, CachedBundleStateProvider}, + BundleStateDataRef, +}; use reth_blockchain_tree_api::{ error::{BlockchainTreeError, InsertBlockErrorKind}, BlockAttachment, BlockValidationKind, @@ -18,8 +21,7 @@ use reth_primitives::{ BlockHash, BlockNumber, ForkBlock, GotExpected, SealedBlockWithSenders, SealedHeader, U256, }; use reth_provider::{ - providers::{BundleStateProvider, ConsistentDbView}, - FullExecutionDataProvider, ProviderError, StateRootProvider, + providers::ConsistentDbView, FullExecutionDataProvider, ProviderError, StateRootProvider, }; use reth_revm::database::StateProviderDatabase; use reth_trie::updates::TrieUpdates; @@ -82,6 +84,12 @@ impl AppendableChain { let execution_outcome = ExecutionOutcome::default(); let empty = BTreeMap::new(); + if block_attachment == BlockAttachment::HistoricalFork { + // The fork is a historical fork, the global canonical cache could be dirty. + // The case should be rare for bsc & op. + clear_accounts_and_storages(); + } + let state_provider = BundleStateDataRef { execution_outcome: &execution_outcome, sidechain_block_hashes: &empty, @@ -202,7 +210,7 @@ impl AppendableChain { .disable_long_read_transaction_safety() .state_provider_by_block_number(canonical_fork.number)?; - let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider); + let provider = CachedBundleStateProvider::new(state_provider, bundle_state_data_provider); let db = StateProviderDatabase::new(&provider); let executor = externals.executor_factory.executor(db); diff --git a/crates/blockchain-tree/src/lib.rs b/crates/blockchain-tree/src/lib.rs index 3f501bead..39254877b 100644 --- a/crates/blockchain-tree/src/lib.rs +++ b/crates/blockchain-tree/src/lib.rs @@ -56,4 +56,7 @@ pub mod noop; mod state; +/// The global canonical cache for live sync. +pub mod canonical_cache; + use aquamarine as _; diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 079963e78..1e0e4826a 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -15,6 +15,7 @@ workspace = true reth-chainspec.workspace = true reth-ethereum-consensus.workspace = true reth-blockchain-tree-api.workspace = true +reth-blockchain-tree.workspace = true reth-primitives.workspace = true reth-stages-api.workspace = true reth-errors.workspace = true diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index f224cc670..3e83b8542 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -6,6 +6,7 @@ use crate::{ engine::metrics::EngineSyncMetrics, BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress, }; use futures::FutureExt; +use reth_blockchain_tree::canonical_cache; #[cfg(feature = "bsc")] use reth_bsc_consensus::Parlia; use reth_chainspec::ChainSpec; @@ -27,7 +28,7 @@ use std::{ task::{ready, Context, Poll}, }; use tokio::sync::oneshot; -use tracing::trace; +use tracing::{debug, trace}; /// Manages syncing under the control of the engine. /// @@ -220,6 +221,11 @@ where // precaution to never sync to the zero hash return } + debug!( + target: "consensus::engine::sync", + "Clear global canonical cache." + ); + canonical_cache::clear_accounts_and_storages(); self.pending_pipeline_target = Some(target); } diff --git a/crates/ethereum/node/Cargo.toml b/crates/ethereum/node/Cargo.toml index 2cce8650d..6afb08e37 100644 --- a/crates/ethereum/node/Cargo.toml +++ b/crates/ethereum/node/Cargo.toml @@ -38,7 +38,7 @@ reth-exex.workspace = true # misc eyre.workspace = true -tokio = { workspace = true , features = ["sync"]} +tokio = { workspace = true, features = ["sync"] } tokio-stream.workspace = true futures.workspace = true @@ -49,6 +49,7 @@ reth-db.workspace = true reth-exex.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true +reth-blockchain-tree.workspace = true reth-e2e-test-utils.workspace = true alloy-primitives.workspace = true alloy-genesis.workspace = true diff --git a/crates/ethereum/node/tests/e2e/p2p.rs b/crates/ethereum/node/tests/e2e/p2p.rs index a40c1b3f4..d54a3d5bc 100644 --- a/crates/ethereum/node/tests/e2e/p2p.rs +++ b/crates/ethereum/node/tests/e2e/p2p.rs @@ -1,4 +1,5 @@ use crate::utils::eth_payload_attributes; +use reth::blockchain_tree::canonical_cache; use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_e2e_test_utils::{setup, transaction::TransactionTestContext}; use reth_node_ethereum::EthereumNode; @@ -40,6 +41,10 @@ async fn can_sync() -> eyre::Result<()> { // only send forkchoice update to second node second_node.engine_api.update_forkchoice(block_hash, block_hash).await?; + // The canonical cache is static, which means that the two nodes will share the same cache. + // Need to clear the cache to advance the second node. + canonical_cache::clear_accounts_and_storages(); + // expect second node advanced via p2p gossip second_node.assert_new_block(tx_hash, block_hash, 1).await?;