Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace.package]
version = "0.0.1"
edition = "2021"
rust-version = "1.82"
rust-version = "1.83"
license = "MIT OR Apache-2.0"
exclude = [".github/"]

Expand Down Expand Up @@ -220,6 +220,7 @@ clap = { version = "4", features = ["derive", "env"] }
derive_more = { version = "2.0", default-features = false }
eyre = "0.6"
futures = { version = "0.3", default-features = false }
lru = "0.13.0"
metrics = "0.24.0"
metrics-derive = "0.1"
parking_lot = "0.12"
Expand Down
8 changes: 4 additions & 4 deletions crates/chain-orchestrator/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ impl SyncState {
}

/// Returns a mutable reference to the sync mode of L1.
pub fn l1_mut(&mut self) -> &mut SyncMode {
pub const fn l1_mut(&mut self) -> &mut SyncMode {
&mut self.l1
}

/// Returns a mutable reference to the sync mode of L2.
pub fn l2_mut(&mut self) -> &mut SyncMode {
pub const fn l2_mut(&mut self) -> &mut SyncMode {
&mut self.l2
}

Expand Down Expand Up @@ -64,12 +64,12 @@ impl SyncMode {
}

/// Sets the sync mode to [`SyncMode::Synced`].
pub fn set_synced(&mut self) {
pub const fn set_synced(&mut self) {
*self = Self::Synced;
}

/// Sets the sync mode to [`SyncMode::Syncing`].
pub fn set_syncing(&mut self) {
pub const fn set_syncing(&mut self) {
*self = Self::Syncing;
}
}
2 changes: 1 addition & 1 deletion crates/node/src/test_utils/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl L1MessagesAssertion {

impl<'a> BlockBuilder<'a> {
/// Create a new block builder.
pub(crate) fn new(fixture: &'a mut TestFixture) -> Self {
pub(crate) const fn new(fixture: &'a mut TestFixture) -> Self {
Self {
fixture,
expected_tx_hashes: Vec::new(),
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/test_utils/event_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct EventWaiter<'a> {

impl<'a> EventWaiter<'a> {
/// Create a new multi-node event waiter.
pub fn new(fixture: &'a mut TestFixture, node_indices: Vec<usize>) -> Self {
pub const fn new(fixture: &'a mut TestFixture, node_indices: Vec<usize>) -> Self {
Self { fixture, node_indices, timeout_duration: Duration::from_secs(30) }
}

Expand Down
8 changes: 4 additions & 4 deletions crates/node/src/test_utils/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,17 @@ impl TestFixture {
}

/// Start building a block using the sequencer.
pub fn build_block(&mut self) -> BlockBuilder<'_> {
pub const fn build_block(&mut self) -> BlockBuilder<'_> {
BlockBuilder::new(self)
}

/// Get L1 helper for managing L1 interactions.
pub fn l1(&mut self) -> L1Helper<'_> {
pub const fn l1(&mut self) -> L1Helper<'_> {
L1Helper::new(self)
}

/// Get transaction helper for creating and injecting transactions.
pub fn tx(&mut self) -> TxHelper<'_> {
pub const fn tx(&mut self) -> TxHelper<'_> {
TxHelper::new(self)
}

Expand Down Expand Up @@ -417,7 +417,7 @@ impl TestFixtureBuilder {
}

/// Get a mutable reference to the underlying config for advanced customization.
pub fn config_mut(&mut self) -> &mut ScrollRollupNodeConfig {
pub const fn config_mut(&mut self) -> &mut ScrollRollupNodeConfig {
&mut self.config
}

Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/test_utils/l1_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct L1Helper<'a> {

impl<'a> L1Helper<'a> {
/// Create a new L1 helper.
pub(crate) fn new(fixture: &'a mut TestFixture) -> Self {
pub(crate) const fn new(fixture: &'a mut TestFixture) -> Self {
Self { fixture, target_node_index: None }
}

Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/test_utils/network_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub struct ReputationChecker<'a> {

impl<'a> ReputationChecker<'a> {
/// Create a new reputation checker.
pub fn new(fixture: &'a mut TestFixture, observer_node: usize) -> Self {
pub const fn new(fixture: &'a mut TestFixture, observer_node: usize) -> Self {
Self {
fixture,
observer_node,
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/test_utils/tx_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct TxHelper<'a> {

impl<'a> TxHelper<'a> {
/// Create a new transaction helper.
pub(crate) fn new(fixture: &'a mut TestFixture) -> Self {
pub(crate) const fn new(fixture: &'a mut TestFixture) -> Self {
Self { fixture, target_node_index: 0 }
}

Expand Down
2 changes: 1 addition & 1 deletion crates/providers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async-trait.workspace = true
auto_impl.workspace = true
eyre.workspace = true
futures.workspace = true
lru = "0.13.0"
lru.workspace = true
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/scroll-wire/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl ScrollWireManager {
}

/// Returns a mutable reference to the state of the `ScrollWire` protocol.
pub fn state_mut(&mut self) -> &mut HashMap<PeerId, LruCache<B256>> {
pub const fn state_mut(&mut self) -> &mut HashMap<PeerId, LruCache<B256>> {
&mut self.state
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ scroll-alloy-consensus.workspace = true
arbitrary = { workspace = true, optional = true }
async-trait.workspace = true
itertools = "0.14"
lru.workspace = true
metrics.workspace = true
metrics-derive.workspace = true
rand = { workspace = true, optional = true }
Expand Down
103 changes: 103 additions & 0 deletions crates/watcher/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use crate::error::CacheError;

use super::{EthRequestError, L1WatcherResult};

use std::num::NonZeroUsize;

use alloy_primitives::{TxHash, B256};
use alloy_provider::Provider;
use alloy_rpc_types_eth::{Transaction, TransactionTrait};
use lru::LruCache;

/// The L1 watcher cache.
#[derive(Debug)]
pub(crate) struct Cache {
    /// LRU-backed cache of transactions fetched from the L1 provider, keyed by hash.
    transaction_cache: TransactionCache,
    // TODO: introduce block cache.
}

impl Cache {
    /// Creates a new [`Cache`] instance with the given capacity for the transaction cache.
    pub(crate) fn new(transaction_cache_capacity: NonZeroUsize) -> Self {
        Self { transaction_cache: TransactionCache::new(transaction_cache_capacity) }
    }

    /// Gets the transaction for the given hash, fetching it from the provider if not cached.
    ///
    /// On a cache miss the transaction is fetched from `provider` and inserted into the
    /// LRU cache before being returned.
    pub(crate) async fn get_transaction_by_hash<P: Provider>(
        &mut self,
        tx_hash: TxHash,
        provider: &P,
    ) -> L1WatcherResult<Transaction> {
        self.transaction_cache.get_transaction_by_hash(tx_hash, provider).await
    }

    /// Gets the next blob versioned hash for the given transaction hash.
    ///
    /// Errors if the transaction is not in the cache. This method must be called only after
    /// fetching the transaction via [`Self::get_transaction_by_hash`].
    ///
    /// Each call advances an internal per-transaction cursor, so successive calls yield
    /// successive blob versioned hashes, and `None` once they are exhausted.
    pub(crate) async fn get_transaction_next_blob_versioned_hash(
        &mut self,
        tx_hash: TxHash,
    ) -> L1WatcherResult<Option<B256>> {
        self.transaction_cache.get_transaction_next_blob_versioned_hash(tx_hash).await
    }
}

/// A cache for transactions fetched from the provider.
#[derive(Debug)]
struct TransactionCache {
    /// Bounded LRU map from transaction hash to the cached transaction and its
    /// blob-versioned-hash cursor state.
    cache: LruCache<TxHash, TransactionEntry>,
}

/// A cached transaction together with iteration state over its blob versioned hashes.
#[derive(Debug)]
struct TransactionEntry {
    /// The transaction as returned by the provider.
    transaction: Transaction,
    /// The blob versioned hashes carried by the transaction (empty if it has none).
    blob_versioned_hashes: Vec<B256>,
    /// Index of the next blob versioned hash to hand out.
    blob_versioned_hashes_cursor: usize,
}

impl TransactionCache {
    /// Creates a transaction cache bounded to `capacity` entries.
    fn new(capacity: NonZeroUsize) -> Self {
        Self { cache: LruCache::new(capacity) }
    }

    /// Returns the transaction for `tx_hash`, consulting the cache first and falling
    /// back to `provider` on a miss. A fetched transaction is inserted into the cache
    /// before being returned.
    async fn get_transaction_by_hash<P: Provider>(
        &mut self,
        tx_hash: TxHash,
        provider: &P,
    ) -> L1WatcherResult<Transaction> {
        // Fast path: serve the cached copy (this also refreshes its LRU recency).
        if let Some(entry) = self.cache.get(&tx_hash) {
            return Ok(entry.transaction.clone());
        }

        // Miss: fetch from the provider, then populate the cache.
        let transaction = provider
            .get_transaction_by_hash(tx_hash)
            .await?
            .ok_or(EthRequestError::MissingTransactionHash(tx_hash))?;
        self.cache.put(tx_hash, transaction.clone().into());
        Ok(transaction)
    }

    /// Yields the blob versioned hash at the entry's cursor and advances the cursor.
    ///
    /// Errors if `tx_hash` was never cached, i.e. if it was not previously fetched via
    /// [`Self::get_transaction_by_hash`].
    async fn get_transaction_next_blob_versioned_hash(
        &mut self,
        tx_hash: TxHash,
    ) -> L1WatcherResult<Option<B256>> {
        let entry = self
            .cache
            .get_mut(&tx_hash)
            .ok_or(CacheError::MissingTransactionInCacheForBlobVersionedHash(tx_hash))?;

        // Read the hash at the current cursor, then advance so the next call yields the
        // following hash (or `None` once exhausted).
        let next = entry.blob_versioned_hashes.get(entry.blob_versioned_hashes_cursor).copied();
        entry.blob_versioned_hashes_cursor += 1;
        Ok(next)
    }
}

impl From<Transaction> for TransactionEntry {
    /// Builds a cache entry from a fetched transaction, snapshotting its blob versioned
    /// hashes (empty for non-blob transactions) with the cursor positioned at the start.
    fn from(transaction: Transaction) -> Self {
        let blob_versioned_hashes = match transaction.blob_versioned_hashes() {
            Some(hashes) => hashes.to_vec(),
            None => Vec::new(),
        };

        Self { transaction, blob_versioned_hashes, blob_versioned_hashes_cursor: 0 }
    }
}
11 changes: 11 additions & 0 deletions crates/watcher/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum L1WatcherError {
/// The L1 notification channel was closed.
#[error("l1 notification channel closed")]
SendError(#[from] SendError<Arc<L1Notification>>),
/// An error that occurred when accessing data from the cache.
#[error(transparent)]
Cache(#[from] CacheError),
}

/// An error occurred during a request to the Ethereum JSON RPC provider.
Expand Down Expand Up @@ -59,3 +62,11 @@ pub enum FilterLogError {
#[error("expected {0} notifications, got {1}")]
InvalidNotificationCount(usize, usize),
}

/// An error that occurred when accessing data from the cache.
#[derive(Debug, thiserror::Error)]
pub enum CacheError {
    /// The transaction for which the next blob versioned hash was requested is not in the cache.
    ///
    /// The transaction must first be fetched (and thereby cached) before its blob versioned
    /// hashes can be iterated.
    #[error("transaction {0} not found in cache when requesting next blob versioned hash")]
    MissingTransactionInCacheForBlobVersionedHash(B256),
}
31 changes: 20 additions & 11 deletions crates/watcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! L1 watcher for the Scroll Rollup Node.

mod cache;
use cache::Cache;

mod error;
pub use error::{EthRequestError, FilterLogError, L1WatcherError};

Expand Down Expand Up @@ -29,6 +32,7 @@ use scroll_l1::abi::logs::{
use std::{
cmp::Ordering,
fmt::{Debug, Display, Formatter},
num::NonZeroUsize,
sync::Arc,
time::Duration,
};
Expand All @@ -51,6 +55,10 @@ pub const HEADER_CAPACITY: usize = 100 * MAX_UNFINALIZED_BLOCK_COUNT;
#[cfg(not(any(test, feature = "test-utils")))]
pub const HEADER_CAPACITY: usize = 2 * MAX_UNFINALIZED_BLOCK_COUNT;

/// The default capacity for the transaction cache.
pub const TRANSACTION_CACHE_CAPACITY: NonZeroUsize =
NonZeroUsize::new(100).expect("non zero capacity");

/// The Ethereum L1 block response.
pub type Block = <Ethereum as Network>::BlockResponse;

Expand All @@ -77,6 +85,8 @@ pub struct L1Watcher<EP> {
unfinalized_blocks: BoundedVec<Header>,
/// The L1 state info relevant to the rollup node.
l1_state: L1State,
/// The cache for the L1 watcher.
cache: Cache,
/// The latest indexed block.
current_block_number: BlockNumber,
/// The sender part of the channel for [`L1Notification`].
Expand Down Expand Up @@ -255,6 +265,7 @@ where
unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY),
current_block_number: start_block.saturating_sub(1),
l1_state,
cache: Cache::new(TRANSACTION_CACHE_CAPACITY),
sender: tx,
config,
metrics: WatcherMetrics::default(),
Expand Down Expand Up @@ -525,7 +536,7 @@ where

/// Handles the batch commits events.
#[tracing::instrument(skip_all)]
async fn handle_batch_commits(&self, logs: &[Log]) -> L1WatcherResult<Vec<L1Notification>> {
async fn handle_batch_commits(&mut self, logs: &[Log]) -> L1WatcherResult<Vec<L1Notification>> {
// filter commit logs and skip genesis batch (batch_index == 0).
let mut commit_logs_with_tx = logs
.iter()
Expand Down Expand Up @@ -556,22 +567,19 @@ where
// iterate each group of commits
for (tx_hash, group) in groups {
// fetch the commit transaction.
let transaction = self
.execution_provider
.get_transaction_by_hash(tx_hash)
.await?
.ok_or(EthRequestError::MissingTransactionHash(tx_hash))?;
let transaction =
self.cache.get_transaction_by_hash(tx_hash, &self.execution_provider).await?;

// get the optional blobs and calldata.
let mut blob_versioned_hashes =
transaction.blob_versioned_hashes().unwrap_or(&[]).iter().copied();
// get the calldata.
let input = Arc::new(transaction.input().clone());

// iterate the logs emitted in the group
for (raw_log, decoded_log, _) in group {
let block_number =
raw_log.block_number.ok_or(FilterLogError::MissingBlockNumber)?;
let block_hash = raw_log.block_hash.ok_or(FilterLogError::MissingBlockHash)?;
let blob_versioned_hash =
self.cache.get_transaction_next_blob_versioned_hash(tx_hash).await?;
// if the log is missing the block timestamp, we need to fetch it.
// the block timestamp is necessary in order to derive the beacon
// slot and query the blobs.
Expand All @@ -596,7 +604,7 @@ where
block_number,
block_timestamp,
calldata: input.clone(),
blob_versioned_hash: blob_versioned_hashes.next(),
blob_versioned_hash,
finalized_block_number: None,
reverted_block_number: None,
},
Expand Down Expand Up @@ -875,6 +883,7 @@ mod tests {
execution_provider: provider,
unfinalized_blocks: unfinalized_blocks.into(),
l1_state: L1State { head: Default::default(), finalized: Default::default() },
cache: Cache::new(TRANSACTION_CACHE_CAPACITY),
current_block_number: 0,
sender: tx,
config: Arc::new(NodeConfig::mainnet()),
Expand Down Expand Up @@ -1107,7 +1116,7 @@ mod tests {
effective_gas_price: None,
};

let (watcher, _) =
let (mut watcher, _) =
l1_watcher(chain, vec![], vec![tx.clone()], finalized.clone(), latest.clone());

// build test logs.
Expand Down
Loading