Skip to content

ref: split fetcher and parser functionality #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion state-reconstruct-fetcher/src/blob_http_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use serde::Deserialize;
use tokio::time::{sleep, Duration};

use crate::types::ParseError;
use crate::ParseError;

/// `MAX_RETRIES` is the maximum number of retries on failed blob retrieval.
const MAX_RETRIES: u8 = 5;
Expand Down
195 changes: 15 additions & 180 deletions state-reconstruct-fetcher/src/l1_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{cmp, fs::File, future::Future, sync::Arc};

use ethers::{
abi::{Contract, Function},
prelude::*,
};
use ethers::{abi::Contract, prelude::*};
use eyre::Result;
use rand::random;
use thiserror::Error;
Expand All @@ -14,17 +11,17 @@ use tokio::{
use tokio_util::sync::CancellationToken;

use crate::{
blob_http_client::BlobHttpClient,
constants::ethereum::{BLOB_BLOCK, BLOCK_STEP, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR},
constants::ethereum::{BLOCK_STEP, GENESIS_BLOCK, ZK_SYNC_ADDR},
database::InnerDB,
metrics::L1Metrics,
types::{v1::V1, v2::V2, CommitBlock, ParseError},
parser::Parser,
types::CommitBlock,
};

/// `MAX_RETRIES` is the maximum number of retries on failed L1 call.
const MAX_RETRIES: u8 = 5;
/// The interval in seconds in which to poll for new blocks.
const LONG_POLLING_INTERVAL_S: u64 = 120;
pub const LONG_POLLING_INTERVAL_S: u64 = 120;
/// The interval in seconds to wait before retrying to fetch a previously failed transaction.
const FAILED_FETCH_RETRY_INTERVAL_S: u64 = 10;
/// The interval in seconds in which to print metrics.
Expand Down Expand Up @@ -59,9 +56,9 @@ pub struct L1FetcherOptions {
}

#[derive(Clone)]
struct Contracts {
v1: Contract,
v2: Contract,
pub struct Contracts {
pub v1: Contract,
pub v2: Contract,
}

pub struct L1Fetcher {
Expand Down Expand Up @@ -174,7 +171,13 @@ impl L1Fetcher {
token.clone(),
current_l1_block_number.as_u64(),
);
let parse_handle = self.spawn_parsing_handler(calldata_rx, sink, token.clone())?;
let parser = Parser::new(
self.metrics.clone(),
self.contracts.clone(),
self.config.blobs_url.clone(),
current_l1_block_number.as_u64(),
)?;
let parse_handle = parser.spawn_parsing_handler(calldata_rx, sink, token.clone())?;
let main_handle = self.spawn_main_handler(
hash_tx,
token,
Expand Down Expand Up @@ -442,93 +445,6 @@ impl L1Fetcher {
})
}

/// Spawns the background task that decodes raw L1 commit transactions into
/// [`CommitBlock`]s and forwards them to `sink`.
///
/// Returns a [`tokio::task::JoinHandle`] resolving to the last L1 block
/// number that was fully parsed (`None` if no transaction was processed),
/// so a later run can resume from that point.
fn spawn_parsing_handler(
    &self,
    mut l1_tx_rx: mpsc::Receiver<Transaction>,
    sink: mpsc::Sender<CommitBlock>,
    cancellation_token: CancellationToken,
) -> Result<tokio::task::JoinHandle<Option<u64>>> {
    let metrics = self.metrics.clone();
    let contracts = self.contracts.clone();
    let client = BlobHttpClient::new(self.config.blobs_url.clone())?;
    Ok(tokio::spawn({
        async move {
            // Transactions before `BOOJUM_BLOCK` are decoded with the v1
            // `commitBlocks` ABI; once that block is reached we switch to the
            // v2 `commitBatches` ABI and never switch back.
            let mut boojum_mode = false;
            let mut function =
                contracts.v1.functions_by_name("commitBlocks").unwrap()[0].clone();
            let mut last_block_number_processed = None;

            while let Some(tx) = l1_tx_rx.recv().await {
                if cancellation_token.is_cancelled() {
                    tracing::debug!("Shutting down parsing handler...");
                    return last_block_number_processed;
                }

                let before = Instant::now();
                let Some(block_number) = tx.block_number else {
                    tracing::error!("transaction has no block number");
                    break;
                };
                let block_number = block_number.as_u64();

                if !boojum_mode && block_number >= BOOJUM_BLOCK {
                    tracing::debug!("Reached `BOOJUM_BLOCK`, changing commit block format");
                    boojum_mode = true;
                    function =
                        contracts.v2.functions_by_name("commitBatches").unwrap()[0].clone();
                }

                // Blob-storage failures are retried indefinitely (the blob
                // server may lag behind L1); every other parse error is
                // treated as fatal and cancels the whole pipeline.
                let blocks = loop {
                    match parse_calldata(block_number, &function, &tx.input, &client).await {
                        Ok(blks) => break blks,
                        Err(e) => match e {
                            ParseError::BlobStorageError(_) => {
                                if cancellation_token.is_cancelled() {
                                    tracing::debug!("Shutting down parsing...");
                                    return last_block_number_processed;
                                }
                                sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await;
                            }
                            ParseError::BlobFormatError(data, inner) => {
                                tracing::error!("Cannot parse {}: {}", data, inner);
                                cancellation_token.cancel();
                                return last_block_number_processed;
                            }
                            _ => {
                                tracing::error!("Failed to parse calldata: {e}");
                                cancellation_token.cancel();
                                return last_block_number_processed;
                            }
                        },
                    }
                };

                let mut metrics = metrics.lock().await;
                for blk in blocks {
                    metrics.latest_l2_block_num = blk.l2_block_number;
                    if let Err(e) = sink.send(blk).await {
                        // A send failure during shutdown is expected (the
                        // receiver was dropped); any other send failure
                        // cancels the pipeline.
                        if cancellation_token.is_cancelled() {
                            tracing::debug!("Shutting down parsing task...");
                        } else {
                            tracing::error!("Cannot send block: {e}");
                            cancellation_token.cancel();
                        }

                        return last_block_number_processed;
                    }
                }

                last_block_number_processed = Some(block_number);
                let duration = before.elapsed();
                metrics.parsing.add(duration);
            }

            // Return the last processed l1 block number, so we can resume from the same point later on.
            last_block_number_processed
        }
    }))
}

async fn retry_call<T, Fut>(callback: impl Fn() -> Fut, err: L1FetchError) -> Result<T>
where
Fut: Future<Output = Result<T, ProviderError>>,
Expand All @@ -545,84 +461,3 @@ impl L1Fetcher {
Err(err.into())
}
}

/// Decodes the calldata of a `commitBlocks`/`commitBatches` transaction into
/// a list of [`CommitBlock`]s, stamping each with `l1_block_number`.
///
/// # Errors
/// Returns a [`ParseError`] when the calldata is truncated, does not match
/// the expected ABI shape, or when resolving blob data fails.
pub async fn parse_calldata(
    l1_block_number: u64,
    commit_blocks_fn: &Function,
    calldata: &[u8],
    client: &BlobHttpClient,
) -> Result<Vec<CommitBlock>, ParseError> {
    // Skip the 4-byte function selector. Checked slicing turns truncated
    // calldata into a parse error instead of a slice-index panic.
    let input = calldata.get(4..).ok_or_else(|| {
        ParseError::InvalidCalldata(
            "calldata is shorter than the 4-byte function selector".to_string(),
        )
    })?;
    let mut parsed_input = commit_blocks_fn
        .decode_input(input)
        .map_err(|e| ParseError::InvalidCalldata(e.to_string()))?;

    if parsed_input.len() != 2 {
        return Err(ParseError::InvalidCalldata(format!(
            "invalid number of parameters (got {}, expected 2) for commitBlocks function",
            parsed_input.len()
        )));
    }

    // Pop in reverse declaration order: new blocks data is the last argument.
    let new_blocks_data = parsed_input
        .pop()
        .ok_or_else(|| ParseError::InvalidCalldata("new blocks data".to_string()))?;
    let stored_block_info = parsed_input
        .pop()
        .ok_or_else(|| ParseError::InvalidCalldata("stored block info".to_string()))?;

    let abi::Token::Tuple(stored_block_info) = stored_block_info else {
        return Err(ParseError::InvalidCalldata(
            "invalid StoredBlockInfo".to_string(),
        ));
    };

    // Checked indexing: a malformed tuple with fewer than 3 fields must
    // surface as a ParseError, not an index panic. The values themselves are
    // only validated here, not used.
    let Some(abi::Token::Uint(_previous_l2_block_number)) = stored_block_info.first().cloned()
    else {
        return Err(ParseError::InvalidStoredBlockInfo(
            "cannot parse previous L2 block number".to_string(),
        ));
    };

    let Some(abi::Token::Uint(_previous_enumeration_index)) = stored_block_info.get(2).cloned()
    else {
        return Err(ParseError::InvalidStoredBlockInfo(
            "cannot parse previous enumeration index".to_string(),
        ));
    };

    // Parse blocks using [`CommitBlockInfoV1`] or [`CommitBlockInfoV2`]
    let mut block_infos =
        parse_commit_block_info(&new_blocks_data, l1_block_number, client).await?;
    // Supplement every `CommitBlock` element with L1 block number information.
    block_infos
        .iter_mut()
        .for_each(|e| e.l1_block_number = Some(l1_block_number));
    Ok(block_infos)
}

/// Decodes every entry of the `newBlocksData` array token into a
/// [`CommitBlock`], selecting the decoder by L1 block height: blob-based
/// resolution from `BLOB_BLOCK` on, the `V2` layout from `BOOJUM_BLOCK`
/// on, and the original `V1` layout before that.
async fn parse_commit_block_info(
    data: &abi::Token,
    l1_block_number: u64,
    client: &BlobHttpClient,
) -> Result<Vec<CommitBlock>, ParseError> {
    let abi::Token::Array(entries) = data else {
        return Err(ParseError::InvalidCommitBlockInfo(
            "cannot convert newBlocksData to array".to_string(),
        ));
    };

    let mut blocks = Vec::with_capacity(entries.len());
    for entry in entries {
        let block = match l1_block_number {
            n if n >= BLOB_BLOCK => CommitBlock::try_from_token_resolve(entry, client).await?,
            n if n >= BOOJUM_BLOCK => CommitBlock::try_from_token::<V2>(entry)?,
            _ => CommitBlock::try_from_token::<V1>(entry)?,
        };
        blocks.push(block);
    }

    Ok(blocks)
}
32 changes: 32 additions & 0 deletions state-reconstruct-fetcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,40 @@
#![feature(array_chunks)]
#![feature(iter_next_chunk)]
use thiserror::Error;

pub mod blob_http_client;
pub mod constants;
pub mod database;
pub mod l1_fetcher;
pub mod metrics;
pub mod parser;
pub mod types;

/// Errors produced while decoding L1 commit calldata and blob data.
#[allow(clippy::enum_variant_names)]
#[derive(Error, Debug)]
pub enum ParseError {
    /// The top-level calldata could not be decoded against the commit ABI.
    #[error("invalid Calldata: {0}")]
    InvalidCalldata(String),

    /// The `StoredBlockInfo` tuple was missing expected fields.
    #[error("invalid StoredBlockInfo: {0}")]
    InvalidStoredBlockInfo(String),

    /// A `CommitBlockInfo` entry could not be decoded.
    #[error("invalid CommitBlockInfo: {0}")]
    InvalidCommitBlockInfo(String),

    // NOTE(review): variant appears unused (hence the `dead_code` allow) —
    // confirm whether it is still needed before removing.
    #[allow(dead_code)]
    #[error("invalid compressed bytecode: {0}")]
    InvalidCompressedByteCode(String),

    /// A compressed value could not be decoded.
    #[error("invalid compressed value: {0}")]
    InvalidCompressedValue(String),

    /// The pubdata source identifier was not recognized.
    #[error("invalid pubdata source: {0}")]
    InvalidPubdataSource(String),

    /// Blob retrieval failed; callers treat this as retryable
    /// (see the parsing loop, which sleeps and retries on this variant).
    #[error("blob storage error: {0}")]
    BlobStorageError(String),

    /// Retrieved blob data did not match the expected format; callers treat
    /// this as fatal. Fields are the offending data and the inner error.
    #[error("blob format error: {0}")]
    BlobFormatError(String, String),
}
88 changes: 88 additions & 0 deletions state-reconstruct-fetcher/src/parser/calldata_tokens.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use ethers::{
abi::{self, Token},
types::U256,
};

use crate::ParseError;

/// Decoded top-level arguments of a commit call: the previously stored
/// block info plus the list of new blocks to commit.
pub struct CalldataToken {
    // The `newBlocksData` argument (second parameter of the commit call).
    pub new_blocks_data: NewBlocksDataToken,
    // The `StoredBlockInfo` argument (first parameter of the commit call).
    pub stored_block_info: StoredBlockInfoToken,
}

impl TryFrom<Vec<Token>> for CalldataToken {
    type Error = ParseError;

    /// Splits the two decoded commit-call parameters into their typed
    /// wrappers, rejecting any other parameter count.
    fn try_from(mut value: Vec<Token>) -> Result<Self, Self::Error> {
        let token_count = value.len();
        if token_count != 2 {
            return Err(ParseError::InvalidCalldata(format!(
                "invalid number of parameters (got {token_count}, expected 2) for commitBlocks function"
            )));
        }

        // Popping yields the arguments in reverse declaration order.
        let new_blocks_data = match value.pop() {
            Some(token) => token,
            None => return Err(ParseError::InvalidCalldata("new blocks data".to_string())),
        };
        let stored_block_info = match value.pop() {
            Some(token) => token,
            None => return Err(ParseError::InvalidCalldata("stored block info".to_string())),
        };

        Ok(Self {
            new_blocks_data: new_blocks_data.try_into()?,
            stored_block_info: stored_block_info.try_into()?,
        })
    }
}

/// The `newBlocksData` argument of a commit call: one ABI token per
/// committed block.
pub struct NewBlocksDataToken {
    pub data: Vec<Token>,
}

impl TryFrom<Token> for NewBlocksDataToken {
type Error = ParseError;

fn try_from(value: Token) -> Result<Self, Self::Error> {
let abi::Token::Array(data) = value else {
return Err(ParseError::InvalidCommitBlockInfo(
"cannot convert newBlocksData to array".to_string(),
));
};

Ok(Self { data })
}
}

/// Fields extracted from the `StoredBlockInfo` tuple argument of a commit
/// call.
pub struct StoredBlockInfoToken {
    // Tuple field 0 of `StoredBlockInfo`.
    pub previous_l2_block_number: U256,
    // Tuple field 2 of `StoredBlockInfo`.
    pub previous_enumeration_index: U256,
}

impl TryFrom<Token> for StoredBlockInfoToken {
    type Error = ParseError;

    /// Extracts the previous L2 block number (field 0) and the previous
    /// enumeration index (field 2) from a `StoredBlockInfo` tuple token.
    ///
    /// # Errors
    /// Returns [`ParseError::InvalidCalldata`] when the token is not a tuple,
    /// and [`ParseError::InvalidStoredBlockInfo`] when the expected fields
    /// are missing or have the wrong type.
    fn try_from(value: Token) -> Result<Self, Self::Error> {
        let abi::Token::Tuple(stored_block_info) = value else {
            return Err(ParseError::InvalidCalldata(
                "invalid StoredBlockInfo".to_string(),
            ));
        };

        // Checked indexing: a malformed tuple with fewer than 3 fields must
        // surface as a ParseError rather than an index-out-of-bounds panic.
        let Some(abi::Token::Uint(previous_l2_block_number)) =
            stored_block_info.first().cloned()
        else {
            return Err(ParseError::InvalidStoredBlockInfo(
                "cannot parse previous L2 block number".to_string(),
            ));
        };

        let Some(abi::Token::Uint(previous_enumeration_index)) =
            stored_block_info.get(2).cloned()
        else {
            return Err(ParseError::InvalidStoredBlockInfo(
                "cannot parse previous enumeration index".to_string(),
            ));
        };

        Ok(StoredBlockInfoToken {
            previous_l2_block_number,
            previous_enumeration_index,
        })
    }
}
Loading