Skip to content

Commit

Permalink
feat: add canonical cache for live sync
Browse files Browse the repository at this point in the history
  • Loading branch information
forcodedancing committed Aug 19, 2024
1 parent 380dbb2 commit a9106ae
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 5 deletions.
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ tokio = { workspace = true, features = ["macros", "sync"] }
reth-metrics = { workspace = true, features = ["common"] }
metrics.workspace = true

# cache
quick_cache = "0.6.2"
lazy_static = "1.4.0"

# misc
aquamarine.workspace = true
linked_hash_set.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Implementation of [`BlockchainTree`]

use crate::{
canonical_cache::apply_bundle_state,
metrics::{MakeCanonicalAction, MakeCanonicalDurationsRecorder, TreeMetrics},
state::{BlockchainId, TreeState},
AppendableChain, BlockIndices, BlockchainTreeConfig, ExecutionData, TreeExternals,
Expand Down Expand Up @@ -1261,6 +1262,7 @@ where
};
recorder.record_relative(MakeCanonicalAction::RetrieveStateTrieUpdates);

let cloned_bundle = state.clone().bundle;
let provider_rw = self.externals.provider_factory.provider_rw()?;
provider_rw
.append_blocks_with_state(
Expand All @@ -1274,6 +1276,9 @@ where
provider_rw.commit()?;
recorder.record_relative(MakeCanonicalAction::CommitCanonicalChainToDatabase);

// update global canonical cache
apply_bundle_state(cloned_bundle);

Ok(())
}

Expand Down
231 changes: 231 additions & 0 deletions crates/blockchain-tree/src/canonical_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use lazy_static::lazy_static;
use quick_cache::sync::Cache;
use reth_primitives::{Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256};
use reth_provider::{
AccountReader, BlockHashReader, ExecutionDataProvider, StateProofProvider, StateProvider,
StateRootProvider,
};
use reth_revm::db::BundleState;
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState};

/// The size of cache, counted by the number of accounts.
const CACHE_SIZE: usize = 1000000;

type AddressStorageKey = (Address, StorageKey);

lazy_static! {
/// Account cache
pub static ref ACCOUNT_CACHE: Cache<Address, Account> = Cache::new(CACHE_SIZE);

/// Contract cache
static ref CONTRACT_CACHE: Cache<B256, Bytecode> = Cache::new(CACHE_SIZE);

/// Storage cache
static ref STORAGE_CACHE: Cache<AddressStorageKey, StorageValue> = Cache::new(CACHE_SIZE*10);

/// Block hash cache
static ref BLOCK_HASH_CACHE: Cache<u64, B256> = Cache::new(CACHE_SIZE);
}

/// Apply committed state to canonical cache.
pub(crate) fn apply_bundle_state(bundle: BundleState) {
let change_set = bundle.into_plain_state(reth_provider::OriginalValuesKnown::Yes);

for (address, account_info) in &change_set.accounts {
match account_info {
None => {
ACCOUNT_CACHE.remove(address);
}
Some(acc) => {
ACCOUNT_CACHE.insert(
*address,
Account {
nonce: acc.nonce,
balance: acc.balance,
bytecode_hash: Some(acc.code_hash),
},
);
}
}
}

let mut to_wipe = false;
for storage in &change_set.storage {
if storage.wipe_storage {
to_wipe = true;
break;
} else {
for (k, v) in storage.storage.clone() {
STORAGE_CACHE.insert((storage.address, StorageKey::from(k)), v);
}
}
}
if to_wipe {
STORAGE_CACHE.clear();
}
}

/// Clear cached accounts and storages.
pub fn clear_accounts_and_storages() {
ACCOUNT_CACHE.clear();
STORAGE_CACHE.clear();
}

#[derive(Debug)]
pub(crate) struct CachedBundleStateProvider<SP: StateProvider, EDP: ExecutionDataProvider> {
/// The inner state provider.
pub(crate) state_provider: SP,
/// Block execution data.
pub(crate) block_execution_data_provider: EDP,
}

impl<SP: StateProvider, EDP: ExecutionDataProvider> CachedBundleStateProvider<SP, EDP> {
/// Create new cached bundle state provider
pub(crate) const fn new(state_provider: SP, block_execution_data_provider: EDP) -> Self {
Self { state_provider, block_execution_data_provider }
}
}

impl<SP: StateProvider, EDP: ExecutionDataProvider> BlockHashReader
for CachedBundleStateProvider<SP, EDP>
{
fn block_hash(&self, block_number: BlockNumber) -> ProviderResult<Option<B256>> {
let block_hash = self.block_execution_data_provider.block_hash(block_number);
if block_hash.is_some() {
return Ok(block_hash)
}
if let Some(v) = BLOCK_HASH_CACHE.get(&block_number) {
return Ok(Some(v))
}
if let Some(value) = self.state_provider.block_hash(block_number)? {
BLOCK_HASH_CACHE.insert(block_number, value);
return Ok(Some(value))
}
Ok(None)
}

fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
unimplemented!()
}
}

impl<SP: StateProvider, EDP: ExecutionDataProvider> AccountReader
for CachedBundleStateProvider<SP, EDP>
{
fn basic_account(&self, address: Address) -> ProviderResult<Option<Account>> {
if let Some(account) =
self.block_execution_data_provider.execution_outcome().account(&address)
{
return Ok(account)
}
if let Some(v) = ACCOUNT_CACHE.get(&address) {
return Ok(Some(v))
}
if let Some(value) = self.state_provider.basic_account(address)? {
ACCOUNT_CACHE.insert(address, value);
return Ok(Some(value))
}
Ok(None)
}
}

impl<SP: StateProvider, EDP: ExecutionDataProvider> StateRootProvider
for CachedBundleStateProvider<SP, EDP>
{
fn state_root(&self, bundle_state: &BundleState) -> ProviderResult<B256> {
let mut state = self.block_execution_data_provider.execution_outcome().state().clone();
state.extend(bundle_state.clone());
self.state_provider.state_root(&state)
}

fn hashed_state_root(&self, hashed_state: &reth_trie::HashedPostState) -> ProviderResult<B256> {
let bundle_state = self.block_execution_data_provider.execution_outcome().state();
let mut state = HashedPostState::from_bundle_state(&bundle_state.state);
state.extend(hashed_state.clone());
self.state_provider.hashed_state_root(&state)
}

fn state_root_with_updates(
&self,
bundle_state: &BundleState,
) -> ProviderResult<(B256, TrieUpdates)> {
let mut state = self.block_execution_data_provider.execution_outcome().state().clone();
state.extend(bundle_state.clone());
self.state_provider.state_root_with_updates(&state)
}

fn hashed_state_root_with_updates(
&self,
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let bundle_state = self.block_execution_data_provider.execution_outcome().state();
let mut state = HashedPostState::from_bundle_state(&bundle_state.state);
state.extend(hashed_state.clone());
self.state_provider.hashed_state_root_with_updates(&state)
}
}

impl<SP: StateProvider, EDP: ExecutionDataProvider> StateProofProvider
for CachedBundleStateProvider<SP, EDP>
{
fn hashed_proof(
&self,
hashed_state: &HashedPostState,
address: Address,
slots: &[B256],
) -> ProviderResult<AccountProof> {
let bundle_state = self.block_execution_data_provider.execution_outcome().state();
let mut state = HashedPostState::from_bundle_state(&bundle_state.state);
state.extend(hashed_state.clone());
self.state_provider.hashed_proof(&state, address, slots)
}
}

impl<SP: StateProvider, EDP: ExecutionDataProvider> StateProvider
for CachedBundleStateProvider<SP, EDP>
{
fn storage(
&self,
account: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let u256_storage_key = storage_key.into();
if let Some(value) = self
.block_execution_data_provider
.execution_outcome()
.storage(&account, u256_storage_key)
{
return Ok(Some(value))
}
let cache_key = (account, storage_key);
if let Some(v) = STORAGE_CACHE.get(&cache_key) {
return Ok(Some(v))
}
if let Some(value) = self.state_provider.storage(account, storage_key)? {
STORAGE_CACHE.insert(cache_key, value);
return Ok(Some(value))
}
Ok(None)
}

fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult<Option<Bytecode>> {
if let Some(bytecode) =
self.block_execution_data_provider.execution_outcome().bytecode(&code_hash)
{
return Ok(Some(bytecode))
}
if let Some(v) = CONTRACT_CACHE.get(&code_hash) {
return Ok(Some(v))
}
if let Some(value) = self.state_provider.bytecode_by_hash(code_hash)? {
CONTRACT_CACHE.insert(code_hash, value.clone());
return Ok(Some(value))
}
Ok(None)
}
}
7 changes: 3 additions & 4 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! blocks, as well as a list of the blocks the chain is composed of.

use super::externals::TreeExternals;
use crate::BundleStateDataRef;
use crate::{canonical_cache::CachedBundleStateProvider, BundleStateDataRef};
use reth_blockchain_tree_api::{
error::{BlockchainTreeError, InsertBlockErrorKind},
BlockAttachment, BlockValidationKind,
Expand All @@ -18,8 +18,7 @@ use reth_primitives::{
BlockHash, BlockNumber, ForkBlock, GotExpected, SealedBlockWithSenders, SealedHeader, U256,
};
use reth_provider::{
providers::{BundleStateProvider, ConsistentDbView},
FullExecutionDataProvider, ProviderError, StateRootProvider,
providers::ConsistentDbView, FullExecutionDataProvider, ProviderError, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_trie::updates::TrieUpdates;
Expand Down Expand Up @@ -202,7 +201,7 @@ impl AppendableChain {
.disable_long_read_transaction_safety()
.state_provider_by_block_number(canonical_fork.number)?;

let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);
let provider = CachedBundleStateProvider::new(state_provider, bundle_state_data_provider);

let db = StateProviderDatabase::new(&provider);
let executor = externals.executor_factory.executor(db);
Expand Down
3 changes: 3 additions & 0 deletions crates/blockchain-tree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ pub mod noop;

mod state;

/// The global canonical cache for live sync.
pub mod canonical_cache;

use aquamarine as _;
1 change: 1 addition & 0 deletions crates/consensus/beacon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ workspace = true
reth-chainspec.workspace = true
reth-ethereum-consensus.workspace = true
reth-blockchain-tree-api.workspace = true
reth-blockchain-tree.workspace = true
reth-primitives.workspace = true
reth-stages-api.workspace = true
reth-errors.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/consensus/beacon/src/engine/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
engine::metrics::EngineSyncMetrics, BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress,
};
use futures::FutureExt;
use reth_blockchain_tree::canonical_cache;
#[cfg(feature = "bsc")]
use reth_bsc_consensus::Parlia;
use reth_chainspec::ChainSpec;
Expand Down Expand Up @@ -288,6 +289,8 @@ where
// outdated (included in the range the pipeline is syncing anyway)
self.clear_block_download_requests();

canonical_cache::clear_accounts_and_storages();

Some(EngineSyncEvent::PipelineStarted(Some(target)))
}
PipelineState::Running(_) => None,
Expand Down

0 comments on commit a9106ae

Please sign in to comment.