Skip to content

Commit

Permalink
Move sync_l1 to common
Browse files Browse the repository at this point in the history
  • Loading branch information
rakanalh committed Jan 15, 2025
1 parent 0f6f838 commit 59d3d3d
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 174 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 3 additions & 51 deletions crates/batch-prover/src/da_block_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context as _};
use borsh::{BorshDeserialize, BorshSerialize};
use citrea_common::cache::L1BlockCache;
use citrea_common::da::get_da_block_at_height;
use citrea_common::da::{get_da_block_at_height, sync_l1};
use citrea_common::utils::merge_state_diffs;
use citrea_common::BatchProverConfig;
use citrea_primitives::compression::compress_blob;
Expand All @@ -27,7 +27,7 @@ use sov_rollup_interface::zk::ZkvmHost;
use sov_stf_runner::{ProverGuestRunConfig, ProverService};
use tokio::select;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{sleep, Duration};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

Expand Down Expand Up @@ -137,6 +137,7 @@ where
self.da_service.clone(),
l1_tx,
self.l1_block_cache.clone(),
BATCH_PROVER_METRICS.scan_l1_block.clone(),
);
tokio::pin!(l1_sync_worker);

Expand Down Expand Up @@ -316,55 +317,6 @@ where
}
}

async fn sync_l1<Da>(
start_l1_height: u64,
da_service: Arc<Da>,
sender: mpsc::Sender<Da::FilteredBlock>,
l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
) where
Da: DaService,
{
let mut l1_height = start_l1_height;
info!("Starting to sync from L1 height {}", l1_height);

'block_sync: loop {
let last_finalized_l1_block_header =
match da_service.get_last_finalized_block_header().await {
Ok(header) => header,
Err(e) => {
error!("Could not fetch last finalized L1 block header: {}", e);
sleep(Duration::from_secs(2)).await;
continue;
}
};

let new_l1_height = last_finalized_l1_block_header.height();

for block_number in l1_height + 1..=new_l1_height {
let l1_block =
match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
.await
{
Ok(block) => block,
Err(e) => {
error!("Could not fetch last finalized L1 block: {}", e);
sleep(Duration::from_secs(2)).await;
continue 'block_sync;
}
};
if block_number > l1_height {
l1_height = block_number;
if let Err(e) = sender.send(l1_block).await {
error!("Could not notify about L1 block: {}", e);
continue 'block_sync;
}
}
}

sleep(Duration::from_secs(2)).await;
}
}

pub(crate) async fn get_batch_proof_circuit_input_from_commitments<
'txs,
Da: DaService,
Expand Down
2 changes: 2 additions & 0 deletions crates/batch-prover/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub struct BatchProverMetrics {
pub current_l2_block: Gauge,
#[metric(describe = "The duration of processing a single soft confirmation")]
pub process_soft_confirmation: Histogram,
#[metric(describe = "The duration of scanning and processing a single L1 block")]
pub scan_l1_block: Histogram,
}

/// Batch prover metrics
Expand Down
1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ hex = { workspace = true }
hyper = { workspace = true }
jsonrpsee = { workspace = true, features = ["http-client", "server"] }
lru = { workspace = true }
metrics = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
Expand Down
66 changes: 63 additions & 3 deletions crates/common/src/da.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,80 @@
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use alloy_primitives::U64;
use anyhow::anyhow;
use backoff::future::retry as retry_backoff;
use backoff::ExponentialBackoffBuilder;
use jsonrpsee::http_client::HttpClient;
use metrics::Histogram;
use sov_ledger_rpc::LedgerRpcClient;
use sov_rollup_interface::da::{BlockHeaderTrait, SequencerCommitment};
use sov_rollup_interface::services::da::{DaService, SlotData};
use sov_rollup_interface::zk::Proof;
use tokio::sync::Mutex;
use tracing::warn;
use tokio::sync::{mpsc, Mutex};
use tokio::time::sleep;
use tracing::{error, info, warn};

use crate::cache::L1BlockCache;

pub async fn sync_l1<Da>(
start_l1_height: u64,
da_service: Arc<Da>,
sender: mpsc::Sender<Da::FilteredBlock>,
l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
l1_block_scan_histogram: Histogram,
) where
Da: DaService,
{
let mut l1_height = start_l1_height;
info!("Starting to sync from L1 height {}", l1_height);

let start = Instant::now();

'block_sync: loop {
let last_finalized_l1_block_header =
match da_service.get_last_finalized_block_header().await {
Ok(header) => header,
Err(e) => {
error!("Could not fetch last finalized L1 block header: {}", e);
sleep(Duration::from_secs(2)).await;
continue;
}
};

let new_l1_height = last_finalized_l1_block_header.height();

for block_number in l1_height + 1..=new_l1_height {
let l1_block =
match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
.await
{
Ok(block) => block,
Err(e) => {
error!("Could not fetch last finalized L1 block: {}", e);
sleep(Duration::from_secs(2)).await;
continue 'block_sync;
}
};

if block_number > l1_height {
l1_height = block_number;
l1_block_scan_histogram.record(
Instant::now()
.saturating_duration_since(start)
.as_secs_f64(),
);
if let Err(e) = sender.send(l1_block).await {
error!("Could not notify about L1 block: {}", e);
continue 'block_sync;
}
}
}

sleep(Duration::from_secs(2)).await;
}
}

pub async fn get_da_block_at_height<Da: DaService>(
da_service: &Arc<Da>,
height: u64,
Expand Down
63 changes: 3 additions & 60 deletions crates/fullnode/src/da_block_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use borsh::{BorshDeserialize, BorshSerialize};
use citrea_common::cache::L1BlockCache;
use citrea_common::da::{extract_sequencer_commitments, extract_zk_proofs, get_da_block_at_height};
use citrea_common::da::{extract_sequencer_commitments, extract_zk_proofs, sync_l1};
use citrea_common::error::SyncError;
use citrea_common::utils::check_l2_range_exists;
use citrea_primitives::forks::fork_from_block_number;
Expand All @@ -29,7 +28,7 @@ use sov_rollup_interface::zk::{
};
use tokio::select;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{sleep, Duration};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

Expand Down Expand Up @@ -109,6 +108,7 @@ where
self.da_service.clone(),
l1_tx,
self.l1_block_cache.clone(),
FULLNODE_METRICS.scan_l1_block.clone(),
);
tokio::pin!(l1_sync_worker);

Expand Down Expand Up @@ -454,60 +454,3 @@ where
Ok(())
}
}

async fn sync_l1<Da>(
start_l1_height: u64,
da_service: Arc<Da>,
sender: mpsc::Sender<Da::FilteredBlock>,
l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
) where
Da: DaService,
{
let mut l1_height = start_l1_height;
info!("Starting to sync from L1 height {}", l1_height);

let start = Instant::now();

'block_sync: loop {
let last_finalized_l1_block_header =
match da_service.get_last_finalized_block_header().await {
Ok(header) => header,
Err(e) => {
error!("Could not fetch last finalized L1 block header: {}", e);
sleep(Duration::from_secs(2)).await;
continue;
}
};

let new_l1_height = last_finalized_l1_block_header.height();

for block_number in l1_height + 1..=new_l1_height {
let l1_block =
match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
.await
{
Ok(block) => block,
Err(e) => {
error!("Could not fetch last finalized L1 block: {}", e);
sleep(Duration::from_secs(2)).await;
continue 'block_sync;
}
};

if block_number > l1_height {
l1_height = block_number;
FULLNODE_METRICS.scan_l1_block.record(
Instant::now()
.saturating_duration_since(start)
.as_secs_f64(),
);
if let Err(e) = sender.send(l1_block).await {
error!("Could not notify about L1 block: {}", e);
continue 'block_sync;
}
}
}

sleep(Duration::from_secs(2)).await;
}
}
67 changes: 8 additions & 59 deletions crates/light-client-prover/src/da_block_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use borsh::BorshDeserialize;
use citrea_common::cache::L1BlockCache;
use citrea_common::da::get_da_block_at_height;
use citrea_common::da::sync_l1;
use citrea_common::LightClientProverConfig;
use citrea_primitives::forks::fork_from_block_number;
use sov_db::ledger_db::{LightClientProverLedgerOps, SharedLedgerOps};
Expand All @@ -20,7 +20,7 @@ use sov_rollup_interface::zk::{
use sov_stf_runner::{ProofData, ProverService};
use tokio::select;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{sleep, Duration};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

Expand Down Expand Up @@ -97,13 +97,17 @@ where
// .clear_pending_proving_sessions()
// .expect("Failed to clear pending proving sessions");
// }

let start_l1_height = match last_l1_height_scanned {
StartVariant::LastScanned(height) => height + 1, // last scanned block + 1
StartVariant::FromBlock(height) => height, // first block to scan
};
let (l1_tx, mut l1_rx) = mpsc::channel(1);
let l1_sync_worker = sync_l1(
last_l1_height_scanned,
start_l1_height,
self.da_service.clone(),
l1_tx,
self.l1_block_cache.clone(),
LIGHT_CLIENT_METRICS.scan_l1_block.clone(),
);
tokio::pin!(l1_sync_worker);

Expand Down Expand Up @@ -475,58 +479,3 @@ where
Ok(proofs[0].clone())
}
}

async fn sync_l1<Da>(
start_l1_height: StartVariant,
da_service: Arc<Da>,
sender: mpsc::Sender<Da::FilteredBlock>,
l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
) where
Da: DaService,
{
let mut l1_height = match start_l1_height {
StartVariant::LastScanned(height) => height + 1, // last scanned block + 1
StartVariant::FromBlock(height) => height, // first block to scan
};
info!("Starting to sync from L1 height {}", l1_height);

'block_sync: loop {
let last_finalized_l1_block_header =
match da_service.get_last_finalized_block_header().await {
Ok(header) => header,
Err(e) => {
error!("Could not fetch last finalized L1 block header: {}", e);
sleep(Duration::from_secs(2)).await;
continue;
}
};

let new_l1_height = last_finalized_l1_block_header.height();

for block_number in l1_height..=new_l1_height {
let l1_block =
match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
.await
{
Ok(block) => block,
Err(e) => {
error!("Could not fetch last finalized L1 block: {}", e);
sleep(Duration::from_secs(2)).await;
continue 'block_sync;
}
};

if let Err(e) = sender.send(l1_block).await {
error!("Could not notify about L1 block: {}", e);
continue 'block_sync;
}
}

// next iteration of he loop will start from the next block
if new_l1_height >= l1_height {
l1_height = new_l1_height + 1;
}

sleep(Duration::from_secs(2)).await;
}
}
4 changes: 3 additions & 1 deletion crates/light-client-prover/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use metrics::Gauge;
use metrics::{Gauge, Histogram};
use metrics_derive::Metrics;
use once_cell::sync::Lazy;

Expand All @@ -7,6 +7,8 @@ use once_cell::sync::Lazy;
pub struct LightClientProverMetrics {
#[metric(describe = "The current L1 block number which is used to produce L2 blocks")]
pub current_l1_block: Gauge,
#[metric(describe = "The duration of scanning and processing a single L1 block")]
pub scan_l1_block: Histogram,
}

/// Light client metrics
Expand Down

0 comments on commit 59d3d3d

Please sign in to comment.