diff --git a/Cargo.lock b/Cargo.lock index 262d38af..784d9105 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11142,6 +11142,7 @@ dependencies = [ "async-trait", "eyre", "itertools 0.14.0", + "lru 0.13.0", "metrics", "metrics-derive", "rand 0.9.2", diff --git a/Cargo.toml b/Cargo.toml index 5f4d3d72..b1dcfa3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace.package] version = "0.0.1" edition = "2021" -rust-version = "1.82" +rust-version = "1.83" license = "MIT OR Apache-2.0" exclude = [".github/"] @@ -220,6 +220,7 @@ clap = { version = "4", features = ["derive", "env"] } derive_more = { version = "2.0", default-features = false } eyre = "0.6" futures = { version = "0.3", default-features = false } +lru = "0.13.0" metrics = "0.24.0" metrics-derive = "0.1" parking_lot = "0.12" diff --git a/crates/chain-orchestrator/src/sync.rs b/crates/chain-orchestrator/src/sync.rs index a94ee46f..be73a69c 100644 --- a/crates/chain-orchestrator/src/sync.rs +++ b/crates/chain-orchestrator/src/sync.rs @@ -26,12 +26,12 @@ impl SyncState { } /// Returns a mutable reference to the sync mode of L1. - pub fn l1_mut(&mut self) -> &mut SyncMode { + pub const fn l1_mut(&mut self) -> &mut SyncMode { &mut self.l1 } /// Returns a mutable reference to the sync mode of L2. - pub fn l2_mut(&mut self) -> &mut SyncMode { + pub const fn l2_mut(&mut self) -> &mut SyncMode { &mut self.l2 } @@ -64,12 +64,12 @@ impl SyncMode { } /// Sets the sync mode to [`SyncMode::Synced`]. - pub fn set_synced(&mut self) { + pub const fn set_synced(&mut self) { *self = Self::Synced; } /// Sets the sync mode to [`SyncMode::Syncing`]. - pub fn set_syncing(&mut self) { + pub const fn set_syncing(&mut self) { *self = Self::Syncing; } } diff --git a/crates/node/src/test_utils/block_builder.rs b/crates/node/src/test_utils/block_builder.rs index a7400eca..38a42097 100644 --- a/crates/node/src/test_utils/block_builder.rs +++ b/crates/node/src/test_utils/block_builder.rs @@ -49,7 +49,7 @@ impl L1MessagesAssertion { impl<'a> BlockBuilder<'a> { /// Create a new block builder. - pub(crate) fn new(fixture: &'a mut TestFixture) -> Self { + pub(crate) const fn new(fixture: &'a mut TestFixture) -> Self { Self { fixture, expected_tx_hashes: Vec::new(), diff --git a/crates/node/src/test_utils/event_utils.rs b/crates/node/src/test_utils/event_utils.rs index 80d38b7d..9d7b7da1 100644 --- a/crates/node/src/test_utils/event_utils.rs +++ b/crates/node/src/test_utils/event_utils.rs @@ -22,7 +22,7 @@ pub struct EventWaiter<'a> { impl<'a> EventWaiter<'a> { /// Create a new multi-node event waiter. - pub fn new(fixture: &'a mut TestFixture, node_indices: Vec) -> Self { + pub const fn new(fixture: &'a mut TestFixture, node_indices: Vec) -> Self { Self { fixture, node_indices, timeout_duration: Duration::from_secs(30) } } diff --git a/crates/node/src/test_utils/fixture.rs b/crates/node/src/test_utils/fixture.rs index ab1766ae..2ae50eb9 100644 --- a/crates/node/src/test_utils/fixture.rs +++ b/crates/node/src/test_utils/fixture.rs @@ -135,17 +135,17 @@ impl TestFixture { } /// Start building a block using the sequencer. - pub fn build_block(&mut self) -> BlockBuilder<'_> { + pub const fn build_block(&mut self) -> BlockBuilder<'_> { BlockBuilder::new(self) } /// Get L1 helper for managing L1 interactions. - pub fn l1(&mut self) -> L1Helper<'_> { + pub const fn l1(&mut self) -> L1Helper<'_> { L1Helper::new(self) } /// Get transaction helper for creating and injecting transactions. - pub fn tx(&mut self) -> TxHelper<'_> { + pub const fn tx(&mut self) -> TxHelper<'_> { TxHelper::new(self) } @@ -417,7 +417,7 @@ impl TestFixtureBuilder { } /// Get a mutable reference to the underlying config for advanced customization. - pub fn config_mut(&mut self) -> &mut ScrollRollupNodeConfig { + pub const fn config_mut(&mut self) -> &mut ScrollRollupNodeConfig { &mut self.config } diff --git a/crates/node/src/test_utils/l1_helpers.rs b/crates/node/src/test_utils/l1_helpers.rs index 9a54d5cf..d6c858cf 100644 --- a/crates/node/src/test_utils/l1_helpers.rs +++ b/crates/node/src/test_utils/l1_helpers.rs @@ -17,7 +17,7 @@ pub struct L1Helper<'a> { impl<'a> L1Helper<'a> { /// Create a new L1 helper. - pub(crate) fn new(fixture: &'a mut TestFixture) -> Self { + pub(crate) const fn new(fixture: &'a mut TestFixture) -> Self { Self { fixture, target_node_index: None } } diff --git a/crates/node/src/test_utils/network_helpers.rs b/crates/node/src/test_utils/network_helpers.rs index 5537b75b..6f9d3876 100644 --- a/crates/node/src/test_utils/network_helpers.rs +++ b/crates/node/src/test_utils/network_helpers.rs @@ -161,7 +161,7 @@ pub struct ReputationChecker<'a> { impl<'a> ReputationChecker<'a> { /// Create a new reputation checker. - pub fn new(fixture: &'a mut TestFixture, observer_node: usize) -> Self { + pub const fn new(fixture: &'a mut TestFixture, observer_node: usize) -> Self { Self { fixture, observer_node, diff --git a/crates/node/src/test_utils/tx_helpers.rs b/crates/node/src/test_utils/tx_helpers.rs index ded7123b..e94bf955 100644 --- a/crates/node/src/test_utils/tx_helpers.rs +++ b/crates/node/src/test_utils/tx_helpers.rs @@ -13,7 +13,7 @@ pub struct TxHelper<'a> { impl<'a> TxHelper<'a> { /// Create a new transaction helper. - pub(crate) fn new(fixture: &'a mut TestFixture) -> Self { + pub(crate) const fn new(fixture: &'a mut TestFixture) -> Self { Self { fixture, target_node_index: 0 } } diff --git a/crates/providers/Cargo.toml b/crates/providers/Cargo.toml index 95b42a42..6e349bab 100644 --- a/crates/providers/Cargo.toml +++ b/crates/providers/Cargo.toml @@ -32,7 +32,7 @@ async-trait.workspace = true auto_impl.workspace = true eyre.workspace = true futures.workspace = true -lru = "0.13.0" +lru.workspace = true reqwest = { workspace = true, features = ["json"] } serde = { workspace = true, features = ["derive"] } thiserror.workspace = true diff --git a/crates/scroll-wire/src/manager.rs b/crates/scroll-wire/src/manager.rs index 5b3c65c9..10a6b1dd 100644 --- a/crates/scroll-wire/src/manager.rs +++ b/crates/scroll-wire/src/manager.rs @@ -61,7 +61,7 @@ impl ScrollWireManager { } /// Returns a mutable reference to the state of the `ScrollWire` protocol. - pub fn state_mut(&mut self) -> &mut HashMap> { + pub const fn state_mut(&mut self) -> &mut HashMap> { &mut self.state } } diff --git a/crates/watcher/Cargo.toml b/crates/watcher/Cargo.toml index 2df1470e..4c442cd9 100644 --- a/crates/watcher/Cargo.toml +++ b/crates/watcher/Cargo.toml @@ -34,6 +34,7 @@ scroll-alloy-consensus.workspace = true arbitrary = { workspace = true, optional = true } async-trait.workspace = true itertools = "0.14" +lru.workspace = true metrics.workspace = true metrics-derive.workspace = true rand = { workspace = true, optional = true } diff --git a/crates/watcher/src/cache.rs b/crates/watcher/src/cache.rs new file mode 100644 index 00000000..d1567c3d --- /dev/null +++ b/crates/watcher/src/cache.rs @@ -0,0 +1,103 @@ +use crate::error::CacheError; + +use super::{EthRequestError, L1WatcherResult}; + +use std::num::NonZeroUsize; + +use alloy_primitives::{TxHash, B256}; +use alloy_provider::Provider; +use alloy_rpc_types_eth::{Transaction, TransactionTrait}; +use lru::LruCache; + +/// The L1 watcher cache. +#[derive(Debug)] +pub(crate) struct Cache { + transaction_cache: TransactionCache, + // TODO: introduce block cache. +} + +impl Cache { + /// Creates a new [`Cache`] instance with the given capacity for the transaction cache. + pub(crate) fn new(transaction_cache_capacity: NonZeroUsize) -> Self { + Self { transaction_cache: TransactionCache::new(transaction_cache_capacity) } + } + + /// Gets the transaction for the given hash, fetching it from the provider if not cached. + pub(crate) async fn get_transaction_by_hash( + &mut self, + tx_hash: TxHash, + provider: &P, + ) -> L1WatcherResult { + self.transaction_cache.get_transaction_by_hash(tx_hash, provider).await + } + + /// Gets the next blob versioned hash for the given transaction hash. + /// + /// Errors if the transaction is not in the cache. This method must be called only after + /// fetching the transaction via [`Self::get_transaction_by_hash`]. + pub(crate) async fn get_transaction_next_blob_versioned_hash( + &mut self, + tx_hash: TxHash, + ) -> L1WatcherResult> { + self.transaction_cache.get_transaction_next_blob_versioned_hash(tx_hash).await + } +} + +/// A cache for transactions fetched from the provider. +#[derive(Debug)] +struct TransactionCache { + cache: LruCache, +} + +#[derive(Debug)] +struct TransactionEntry { + transaction: Transaction, + blob_versioned_hashes: Vec, + blob_versioned_hashes_cursor: usize, +} + +impl TransactionCache { + fn new(capacity: NonZeroUsize) -> Self { + Self { cache: LruCache::new(capacity) } + } + + async fn get_transaction_by_hash( + &mut self, + tx_hash: TxHash, + provider: &P, + ) -> L1WatcherResult { + if let Some(entry) = self.cache.get(&tx_hash) { + return Ok(entry.transaction.clone()); + } + + let transaction = provider + .get_transaction_by_hash(tx_hash) + .await? + .ok_or(EthRequestError::MissingTransactionHash(tx_hash))?; + self.cache.put(tx_hash, transaction.clone().into()); + Ok(transaction) + } + + async fn get_transaction_next_blob_versioned_hash( + &mut self, + tx_hash: TxHash, + ) -> L1WatcherResult> { + if let Some(entry) = self.cache.get_mut(&tx_hash) { + let blob_versioned_hash = + entry.blob_versioned_hashes.get(entry.blob_versioned_hashes_cursor).copied(); + entry.blob_versioned_hashes_cursor += 1; + Ok(blob_versioned_hash) + } else { + Err(CacheError::MissingTransactionInCacheForBlobVersionedHash(tx_hash).into()) + } + } +} + +impl From for TransactionEntry { + fn from(transaction: Transaction) -> Self { + let blob_versioned_hashes = + transaction.blob_versioned_hashes().map(|hashes| hashes.to_vec()).unwrap_or_default(); + + Self { transaction, blob_versioned_hashes, blob_versioned_hashes_cursor: 0 } + } +} diff --git a/crates/watcher/src/error.rs b/crates/watcher/src/error.rs index b9c87cb4..64ac3101 100644 --- a/crates/watcher/src/error.rs +++ b/crates/watcher/src/error.rs @@ -27,6 +27,9 @@ pub enum L1WatcherError { /// The L1 nofication channel was closed. #[error("l1 notification channel closed")] SendError(#[from] SendError>), + /// An error that occurred when accessing data from the cache. + #[error(transparent)] + Cache(#[from] CacheError), } /// An error occurred during a request to the Ethereum JSON RPC provider. @@ -59,3 +62,11 @@ pub enum FilterLogError { #[error("expected {0} notifications, got {1}")] InvalidNotificationCount(usize, usize), } + +/// An error that occurred when accessing data from the cache. +#[derive(Debug, thiserror::Error)] +pub enum CacheError { + /// The transaction for which the next blob versioned hash was requested is not in the cache. + #[error("transaction {0} not found in cache when requesting next blob versioned hash")] + MissingTransactionInCacheForBlobVersionedHash(B256), +} diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index c14cdcd7..8a09c3b1 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -1,5 +1,8 @@ //! L1 watcher for the Scroll Rollup Node. +mod cache; +use cache::Cache; + mod error; pub use error::{EthRequestError, FilterLogError, L1WatcherError}; @@ -29,6 +32,7 @@ use scroll_l1::abi::logs::{ use std::{ cmp::Ordering, fmt::{Debug, Display, Formatter}, + num::NonZeroUsize, sync::Arc, time::Duration, }; @@ -51,6 +55,10 @@ pub const HEADER_CAPACITY: usize = 100 * MAX_UNFINALIZED_BLOCK_COUNT; #[cfg(not(any(test, feature = "test-utils")))] pub const HEADER_CAPACITY: usize = 2 * MAX_UNFINALIZED_BLOCK_COUNT; +/// The default capacity for the transaction cache. +pub const TRANSACTION_CACHE_CAPACITY: NonZeroUsize = + NonZeroUsize::new(100).expect("non zero capacity"); + /// The Ethereum L1 block response. pub type Block = ::BlockResponse; @@ -77,6 +85,8 @@ pub struct L1Watcher { unfinalized_blocks: BoundedVec
, /// The L1 state info relevant to the rollup node. l1_state: L1State, + /// The cache for the L1 watcher. + cache: Cache, /// The latest indexed block. current_block_number: BlockNumber, /// The sender part of the channel for [`L1Notification`]. @@ -255,6 +265,7 @@ where unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY), current_block_number: start_block.saturating_sub(1), l1_state, + cache: Cache::new(TRANSACTION_CACHE_CAPACITY), sender: tx, config, metrics: WatcherMetrics::default(), @@ -525,7 +536,7 @@ where /// Handles the batch commits events. #[tracing::instrument(skip_all)] - async fn handle_batch_commits(&self, logs: &[Log]) -> L1WatcherResult> { + async fn handle_batch_commits(&mut self, logs: &[Log]) -> L1WatcherResult> { // filter commit logs and skip genesis batch (batch_index == 0). let mut commit_logs_with_tx = logs .iter() @@ -556,15 +567,10 @@ where // iterate each group of commits for (tx_hash, group) in groups { // fetch the commit transaction. - let transaction = self - .execution_provider - .get_transaction_by_hash(tx_hash) - .await? - .ok_or(EthRequestError::MissingTransactionHash(tx_hash))?; + let transaction = + self.cache.get_transaction_by_hash(tx_hash, &self.execution_provider).await?; - // get the optional blobs and calldata. - let mut blob_versioned_hashes = - transaction.blob_versioned_hashes().unwrap_or(&[]).iter().copied(); + // get the calldata. let input = Arc::new(transaction.input().clone()); // iterate the logs emitted in the group @@ -572,6 +578,8 @@ where let block_number = raw_log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; let block_hash = raw_log.block_hash.ok_or(FilterLogError::MissingBlockHash)?; + let blob_versioned_hash = + self.cache.get_transaction_next_blob_versioned_hash(tx_hash).await?; // if the log is missing the block timestamp, we need to fetch it. // the block timestamp is necessary in order to derive the beacon // slot and query the blobs. @@ -596,7 +604,7 @@ where block_number, block_timestamp, calldata: input.clone(), - blob_versioned_hash: blob_versioned_hashes.next(), + blob_versioned_hash, finalized_block_number: None, reverted_block_number: None, }, @@ -875,6 +883,7 @@ mod tests { execution_provider: provider, unfinalized_blocks: unfinalized_blocks.into(), l1_state: L1State { head: Default::default(), finalized: Default::default() }, + cache: Cache::new(TRANSACTION_CACHE_CAPACITY), current_block_number: 0, sender: tx, config: Arc::new(NodeConfig::mainnet()), @@ -1107,7 +1116,7 @@ mod tests { effective_gas_price: None, }; - let (watcher, _) = + let (mut watcher, _) = l1_watcher(chain, vec![], vec![tx.clone()], finalized.clone(), latest.clone()); // build test logs.