diff --git a/Cargo.lock b/Cargo.lock
index 4fca96ee9..eb0a3148a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1890,6 +1890,7 @@ dependencies = [
  "hyper",
  "jsonrpsee",
  "lru",
+ "metrics",
  "serde",
  "serde_json",
  "sov-db",
diff --git a/crates/batch-prover/src/da_block_handler.rs b/crates/batch-prover/src/da_block_handler.rs
index 44e301558..21359c158 100644
--- a/crates/batch-prover/src/da_block_handler.rs
+++ b/crates/batch-prover/src/da_block_handler.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use anyhow::{anyhow, Context as _};
 use borsh::{BorshDeserialize, BorshSerialize};
 use citrea_common::cache::L1BlockCache;
-use citrea_common::da::get_da_block_at_height;
+use citrea_common::da::{get_da_block_at_height, sync_l1};
 use citrea_common::utils::merge_state_diffs;
 use citrea_common::BatchProverConfig;
 use citrea_primitives::compression::compress_blob;
@@ -27,7 +27,7 @@ use sov_rollup_interface::zk::ZkvmHost;
 use sov_stf_runner::{ProverGuestRunConfig, ProverService};
 use tokio::select;
 use tokio::sync::{mpsc, Mutex};
-use tokio::time::{sleep, Duration};
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 
@@ -137,6 +137,7 @@ where
             self.da_service.clone(),
             l1_tx,
             self.l1_block_cache.clone(),
+            BATCH_PROVER_METRICS.scan_l1_block.clone(),
         );
         tokio::pin!(l1_sync_worker);
 
@@ -316,55 +317,6 @@ where
     }
 }
 
-async fn sync_l1<Da>(
-    start_l1_height: u64,
-    da_service: Arc<Da>,
-    sender: mpsc::Sender<Da::FilteredBlock>,
-    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
-) where
-    Da: DaService,
-{
-    let mut l1_height = start_l1_height;
-    info!("Starting to sync from L1 height {}", l1_height);
-
-    'block_sync: loop {
-        let last_finalized_l1_block_header =
-            match da_service.get_last_finalized_block_header().await {
-                Ok(header) => header,
-                Err(e) => {
-                    error!("Could not fetch last finalized L1 block header: {}", e);
-                    sleep(Duration::from_secs(2)).await;
-                    continue;
-                }
-            };
-
-        let new_l1_height = last_finalized_l1_block_header.height();
-
-        for block_number in l1_height + 1..=new_l1_height {
-            let l1_block =
-                match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
-                    .await
-                {
-                    Ok(block) => block,
-                    Err(e) => {
-                        error!("Could not fetch last finalized L1 block: {}", e);
-                        sleep(Duration::from_secs(2)).await;
-                        continue 'block_sync;
-                    }
-                };
-            if block_number > l1_height {
-                l1_height = block_number;
-                if let Err(e) = sender.send(l1_block).await {
-                    error!("Could not notify about L1 block: {}", e);
-                    continue 'block_sync;
-                }
-            }
-        }
-
-        sleep(Duration::from_secs(2)).await;
-    }
-}
-
 pub(crate) async fn get_batch_proof_circuit_input_from_commitments<
     'txs,
     Da: DaService,
diff --git a/crates/batch-prover/src/metrics.rs b/crates/batch-prover/src/metrics.rs
index 9d48e279d..707096f91 100644
--- a/crates/batch-prover/src/metrics.rs
+++ b/crates/batch-prover/src/metrics.rs
@@ -11,6 +11,8 @@ pub struct BatchProverMetrics {
     pub current_l2_block: Gauge,
     #[metric(describe = "The duration of processing a single soft confirmation")]
     pub process_soft_confirmation: Histogram,
+    #[metric(describe = "The duration of scanning and processing a single L1 block")]
+    pub scan_l1_block: Histogram,
 }
 
 /// Batch prover metrics
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index de7281fcd..5dd207ccc 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -21,6 +21,7 @@ hex = { workspace = true }
 hyper = { workspace = true }
 jsonrpsee = { workspace = true, features = ["http-client", "server"] }
 lru = { workspace = true }
+metrics = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 tokio = { workspace = true }
diff --git a/crates/common/src/da.rs b/crates/common/src/da.rs
index 11a8a3224..fc2af556f 100644
--- a/crates/common/src/da.rs
+++ b/crates/common/src/da.rs
@@ -1,20 +1,80 @@
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use alloy_primitives::U64;
 use anyhow::anyhow;
 use backoff::future::retry as retry_backoff;
 use backoff::ExponentialBackoffBuilder;
 use jsonrpsee::http_client::HttpClient;
+use metrics::Histogram;
 use sov_ledger_rpc::LedgerRpcClient;
 use sov_rollup_interface::da::{BlockHeaderTrait, SequencerCommitment};
 use sov_rollup_interface::services::da::{DaService, SlotData};
 use sov_rollup_interface::zk::Proof;
-use tokio::sync::Mutex;
-use tracing::warn;
+use tokio::sync::{mpsc, Mutex};
+use tokio::time::sleep;
+use tracing::{error, info, warn};
 
 use crate::cache::L1BlockCache;
 
+pub async fn sync_l1<Da>(
+    start_l1_height: u64,
+    da_service: Arc<Da>,
+    sender: mpsc::Sender<Da::FilteredBlock>,
+    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
+    l1_block_scan_histogram: Histogram,
+) where
+    Da: DaService,
+{
+    let mut l1_height = start_l1_height;
+    info!("Starting to sync from L1 height {}", l1_height);
+
+    let start = Instant::now();
+
+    'block_sync: loop {
+        let last_finalized_l1_block_header =
+            match da_service.get_last_finalized_block_header().await {
+                Ok(header) => header,
+                Err(e) => {
+                    error!("Could not fetch last finalized L1 block header: {}", e);
+                    sleep(Duration::from_secs(2)).await;
+                    continue;
+                }
+            };
+
+        let new_l1_height = last_finalized_l1_block_header.height();
+
+        for block_number in l1_height + 1..=new_l1_height {
+            let l1_block =
+                match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
+                    .await
+                {
+                    Ok(block) => block,
+                    Err(e) => {
+                        error!("Could not fetch last finalized L1 block: {}", e);
+                        sleep(Duration::from_secs(2)).await;
+                        continue 'block_sync;
+                    }
+                };
+
+            if block_number > l1_height {
+                l1_height = block_number;
+                l1_block_scan_histogram.record(
+                    Instant::now()
+                        .saturating_duration_since(start)
+                        .as_secs_f64(),
+                );
+                if let Err(e) = sender.send(l1_block).await {
+                    error!("Could not notify about L1 block: {}", e);
+                    continue 'block_sync;
+                }
+            }
+        }
+
+        sleep(Duration::from_secs(2)).await;
+    }
+}
+
 pub async fn get_da_block_at_height<Da: DaService>(
     da_service: &Arc<Da>,
     height: u64,
diff --git a/crates/fullnode/src/da_block_handler.rs b/crates/fullnode/src/da_block_handler.rs
index 6979100c9..6d80da8b7 100644
--- a/crates/fullnode/src/da_block_handler.rs
+++ b/crates/fullnode/src/da_block_handler.rs
@@ -2,12 +2,11 @@ use std::collections::{HashMap, VecDeque};
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::sync::Arc;
-use std::time::Instant;
 
 use anyhow::anyhow;
 use borsh::{BorshDeserialize, BorshSerialize};
 use citrea_common::cache::L1BlockCache;
-use citrea_common::da::{extract_sequencer_commitments, extract_zk_proofs, get_da_block_at_height};
+use citrea_common::da::{extract_sequencer_commitments, extract_zk_proofs, sync_l1};
 use citrea_common::error::SyncError;
 use citrea_common::utils::check_l2_range_exists;
 use citrea_primitives::forks::fork_from_block_number;
@@ -29,7 +28,7 @@ use sov_rollup_interface::zk::{
 };
 use tokio::select;
 use tokio::sync::{mpsc, Mutex};
-use tokio::time::{sleep, Duration};
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 
@@ -109,6 +108,7 @@ where
             self.da_service.clone(),
             l1_tx,
             self.l1_block_cache.clone(),
+            FULLNODE_METRICS.scan_l1_block.clone(),
         );
         tokio::pin!(l1_sync_worker);
 
@@ -454,60 +454,3 @@ where
         Ok(())
     }
 }
-
-async fn sync_l1<Da>(
-    start_l1_height: u64,
-    da_service: Arc<Da>,
-    sender: mpsc::Sender<Da::FilteredBlock>,
-    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
-) where
-    Da: DaService,
-{
-    let mut l1_height = start_l1_height;
-    info!("Starting to sync from L1 height {}", l1_height);
-
-    let start = Instant::now();
-
-    'block_sync: loop {
-        let last_finalized_l1_block_header =
-            match da_service.get_last_finalized_block_header().await {
-                Ok(header) => header,
-                Err(e) => {
-                    error!("Could not fetch last finalized L1 block header: {}", e);
-                    sleep(Duration::from_secs(2)).await;
-                    continue;
-                }
-            };
-
-        let new_l1_height = last_finalized_l1_block_header.height();
-
-        for block_number in l1_height + 1..=new_l1_height {
-            let l1_block =
-                match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
-                    .await
-                {
-                    Ok(block) => block,
-                    Err(e) => {
-                        error!("Could not fetch last finalized L1 block: {}", e);
-                        sleep(Duration::from_secs(2)).await;
-                        continue 'block_sync;
-                    }
-                };
-
-            if block_number > l1_height {
-                l1_height = block_number;
-                FULLNODE_METRICS.scan_l1_block.record(
-                    Instant::now()
-                        .saturating_duration_since(start)
-                        .as_secs_f64(),
-                );
-                if let Err(e) = sender.send(l1_block).await {
-                    error!("Could not notify about L1 block: {}", e);
-                    continue 'block_sync;
-                }
-            }
-        }
-
-        sleep(Duration::from_secs(2)).await;
-    }
-}
diff --git a/crates/light-client-prover/src/da_block_handler.rs b/crates/light-client-prover/src/da_block_handler.rs
index 7504f78cc..f1ace0e0f 100644
--- a/crates/light-client-prover/src/da_block_handler.rs
+++ b/crates/light-client-prover/src/da_block_handler.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 
 use borsh::BorshDeserialize;
 use citrea_common::cache::L1BlockCache;
-use citrea_common::da::get_da_block_at_height;
+use citrea_common::da::sync_l1;
 use citrea_common::LightClientProverConfig;
 use citrea_primitives::forks::fork_from_block_number;
 use sov_db::ledger_db::{LightClientProverLedgerOps, SharedLedgerOps};
@@ -20,7 +20,7 @@ use sov_rollup_interface::zk::{
 use sov_stf_runner::{ProofData, ProverService};
 use tokio::select;
 use tokio::sync::{mpsc, Mutex};
-use tokio::time::{sleep, Duration};
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 
@@ -97,13 +97,17 @@ where
         //         .clear_pending_proving_sessions()
         //         .expect("Failed to clear pending proving sessions");
        // }
-
+        let start_l1_height = match last_l1_height_scanned {
+            StartVariant::LastScanned(height) => height + 1, // last scanned block + 1
+            StartVariant::FromBlock(height) => height,       // first block to scan
+        };
         let (l1_tx, mut l1_rx) = mpsc::channel(1);
         let l1_sync_worker = sync_l1(
-            last_l1_height_scanned,
+            start_l1_height,
             self.da_service.clone(),
             l1_tx,
             self.l1_block_cache.clone(),
+            LIGHT_CLIENT_METRICS.scan_l1_block.clone(),
         );
         tokio::pin!(l1_sync_worker);
 
@@ -475,58 +479,3 @@ where
         Ok(proofs[0].clone())
     }
 }
-
-async fn sync_l1<Da>(
-    start_l1_height: StartVariant,
-    da_service: Arc<Da>,
-    sender: mpsc::Sender<Da::FilteredBlock>,
-    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
-) where
-    Da: DaService,
-{
-    let mut l1_height = match start_l1_height {
-        StartVariant::LastScanned(height) => height + 1, // last scanned block + 1
-        StartVariant::FromBlock(height) => height,       // first block to scan
-    };
-    info!("Starting to sync from L1 height {}", l1_height);
-
-    'block_sync: loop {
-        let last_finalized_l1_block_header =
-            match da_service.get_last_finalized_block_header().await {
-                Ok(header) => header,
-                Err(e) => {
-                    error!("Could not fetch last finalized L1 block header: {}", e);
-                    sleep(Duration::from_secs(2)).await;
-                    continue;
-                }
-            };
-
-        let new_l1_height = last_finalized_l1_block_header.height();
-
-        for block_number in l1_height..=new_l1_height {
-            let l1_block =
-                match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
-                    .await
-                {
-                    Ok(block) => block,
-                    Err(e) => {
-                        error!("Could not fetch last finalized L1 block: {}", e);
-                        sleep(Duration::from_secs(2)).await;
-                        continue 'block_sync;
-                    }
-                };
-
-            if let Err(e) = sender.send(l1_block).await {
-                error!("Could not notify about L1 block: {}", e);
-                continue 'block_sync;
-            }
-        }
-
-        // next iteration of he loop will start from the next block
-        if new_l1_height >= l1_height {
-            l1_height = new_l1_height + 1;
-        }
-
-        sleep(Duration::from_secs(2)).await;
-    }
-}
diff --git a/crates/light-client-prover/src/metrics.rs b/crates/light-client-prover/src/metrics.rs
index d07673242..fe2775cbd 100644
--- a/crates/light-client-prover/src/metrics.rs
+++ b/crates/light-client-prover/src/metrics.rs
@@ -1,4 +1,4 @@
-use metrics::Gauge;
+use metrics::{Gauge, Histogram};
 use metrics_derive::Metrics;
 
 use once_cell::sync::Lazy;
@@ -7,6 +7,8 @@ use once_cell::sync::Lazy;
 pub struct LightClientProverMetrics {
     #[metric(describe = "The current L1 block number which is used to produce L2 blocks")]
     pub current_l1_block: Gauge,
+    #[metric(describe = "The duration of scanning and processing a single L1 block")]
+    pub scan_l1_block: Histogram,
 }
 
 /// Light client metrics
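
Note (illustration, not part of the diff): the pattern the shared `sync_l1` relies on is cloning a `metrics::Histogram` handle, handing it to a worker, and recording elapsed seconds on it. A minimal standalone sketch of that pattern follows; the metric name mirrors the `scan_l1_block` fields added above, while the `record_scan` helper and the use of the `metrics::histogram!` macro (instead of the derive-generated structs) are assumptions made for the sake of a self-contained example.

    use std::time::Instant;
    use metrics::Histogram;

    // Record the time elapsed since `start` into the given histogram, in seconds,
    // matching how the handlers above feed their `scan_l1_block` metric.
    fn record_scan(histogram: &Histogram, start: Instant) {
        histogram.record(Instant::now().saturating_duration_since(start).as_secs_f64());
    }

    fn main() {
        // Hypothetical handle; with no recorder installed, recording is a no-op.
        let histogram: Histogram = metrics::histogram!("scan_l1_block");
        let start = Instant::now();
        // ... scan and process an L1 block here ...
        record_scan(&histogram, start);
    }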