From c566133efbd6a14a9239bdc76b2de4fc85a4d499 Mon Sep 17 00:00:00 2001
From: Joel Nordell
Date: Tue, 10 Dec 2024 10:04:34 -0600
Subject: [PATCH] enhancement: Add --backfill-from option to force crawling
 from given height for chain & transactions

---
 chain/src/config.rs        |  6 ++++++
 chain/src/main.rs          | 36 ++++++++++++++++++++++++++----------
 transactions/src/config.rs |  6 ++++++
 transactions/src/main.rs   | 31 +++++++++++++++++++++----------
 4 files changed, 59 insertions(+), 20 deletions(-)

diff --git a/chain/src/config.rs b/chain/src/config.rs
index c6bcb878a..a5d8c072c 100644
--- a/chain/src/config.rs
+++ b/chain/src/config.rs
@@ -34,6 +34,12 @@ pub struct AppConfig {
     #[clap(long, env, default_value = "5")]
     pub initial_query_retry_attempts: usize,
 
+    #[clap(
+        long,
+        help = "Crawl from given height and do not update crawler_state"
+    )]
+    pub backfill_from: Option<u32>,
+
     #[clap(flatten)]
     pub log: LogConfig,
 }
diff --git a/chain/src/main.rs b/chain/src/main.rs
index 07d4617c1..d21a639c5 100644
--- a/chain/src/main.rs
+++ b/chain/src/main.rs
@@ -68,11 +68,22 @@ async fn main() -> Result<(), MainError> {
     rlimit::increase_nofile_limit(u64::MAX).unwrap();
 
     // See if we can start from existing crawler_state
-    let crawler_state = match db_service::try_get_chain_crawler_state(&conn)
-        .await
-        .into_db_error()?
-    {
-        Some(crawler_state) => {
+    let crawler_state = match (
+        config.backfill_from,
+        db_service::try_get_chain_crawler_state(&conn)
+            .await
+            .into_db_error()?,
+    ) {
+        (Some(height), _) => {
+            tracing::warn!("Backfilling from block height {}", height);
+            Some(ChainCrawlerState {
+                last_processed_block: height,
+                last_processed_epoch: 0,
+                first_block_in_epoch: 0,
+                timestamp: 0,
+            })
+        }
+        (None, Some(crawler_state)) => {
             tracing::info!(
                 "Found chain crawler state, attempting initial crawl at block \
                  {}...",
@@ -85,6 +96,7 @@ async fn main() -> Result<(), MainError> {
                 client.clone(),
                 conn.clone(),
                 checksums.clone(),
+                true,
             )
             .await;
 
@@ -127,7 +139,7 @@ async fn main() -> Result<(), MainError> {
                 }
             }
         }
-        None => {
+        (None, None) => {
             tracing::info!(
                 "No chain crawler state found, starting from initial_query..."
             );
@@ -161,6 +173,7 @@ async fn main() -> Result<(), MainError> {
                 client.clone(),
                 conn.clone(),
                 checksums.clone(),
+                config.backfill_from.is_none(),
             )
         },
         crawler_state.last_processed_block,
@@ -174,6 +187,7 @@ async fn crawling_fn(
     client: Arc<HttpClient>,
     conn: Arc<Object>,
     checksums: Checksums,
+    should_update_crawler_state: bool,
 ) -> Result<(), MainError> {
     let should_process = can_process(block_height, client.clone()).await?;
 
@@ -456,10 +470,12 @@ async fn crawling_fn(
                 revealed_pks,
             )?;
 
-            repository::crawler_state::upsert_crawler_state(
-                transaction_conn,
-                crawler_state,
-            )?;
+            if should_update_crawler_state {
+                repository::crawler_state::upsert_crawler_state(
+                    transaction_conn,
+                    crawler_state,
+                )?;
+            }
 
             anyhow::Ok(())
         })
diff --git a/transactions/src/config.rs b/transactions/src/config.rs
index 16b32ac28..05209b4a8 100644
--- a/transactions/src/config.rs
+++ b/transactions/src/config.rs
@@ -23,6 +23,12 @@ pub struct AppConfig {
     #[clap(long, env, default_value_t = 1)]
     pub from_block_height: u32,
 
+    #[clap(
+        long,
+        help = "Crawl from given height and do not update crawler_state"
+    )]
+    pub backfill_from: Option<u32>,
+
     #[clap(long, env)]
     pub database_url: String,
 
diff --git a/transactions/src/main.rs b/transactions/src/main.rs
index adb125816..15e55d906 100644
--- a/transactions/src/main.rs
+++ b/transactions/src/main.rs
@@ -54,12 +54,18 @@ async fn main() -> Result<(), MainError> {
 
     let crawler_state = db_service::get_crawler_state(&conn).await;
 
-    let next_block = std::cmp::max(
-        crawler_state
-            .map(|cs| cs.last_processed_block + 1)
-            .unwrap_or(1),
-        config.from_block_height,
-    );
+    let next_block = match config.backfill_from {
+        Some(height) => {
+            tracing::warn!("Backfilling from block height {}", height);
+            height
+        }
+        None => std::cmp::max(
+            crawler_state
+                .map(|cs| cs.last_processed_block + 1)
+                .unwrap_or(1),
+            config.from_block_height,
+        ),
+    };
 
     crawl(
         move |block_height| {
@@ -68,6 +74,7 @@ async fn main() -> Result<(), MainError> {
                 client.clone(),
                 conn.clone(),
                 checksums.clone(),
+                config.backfill_from.is_none(),
             )
         },
         next_block,
@@ -81,6 +88,7 @@ async fn crawling_fn(
     client: Arc<HttpClient>,
     conn: Arc<Object>,
     checksums: Checksums,
+    should_update_crawler_state: bool,
 ) -> Result<(), MainError> {
     let should_process = can_process(block_height, client.clone()).await?;
 
@@ -190,10 +198,13 @@ async fn crawling_fn(
                 transaction_conn,
                 inner_txs,
             )?;
-            transaction_repo::insert_crawler_state(
-                transaction_conn,
-                crawler_state,
-            )?;
+
+            if should_update_crawler_state {
+                transaction_repo::insert_crawler_state(
+                    transaction_conn,
+                    crawler_state,
+                )?;
+            }
 
             transaction_repo::insert_ibc_sequence(
                 transaction_conn,
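
Note for reviewers (not part of the patch): the new flag is declared as an
Option<u32> with no `env` attribute, so backfill mode engages only when
`--backfill-from` is passed explicitly on the command line, and the crawler
then skips all crawler_state writes. A minimal standalone sketch of that
clap pattern, assuming clap with the `derive` feature; the struct name and
printed messages below are illustrative, not the indexer's real code:

    // Sketch of the Option<u32> flag pattern used by the patch.
    // Illustrative only: this is not the indexer's actual AppConfig.
    use clap::Parser;

    #[derive(Parser)]
    struct AppConfig {
        /// Crawl from given height and do not update crawler_state
        #[clap(long)]
        backfill_from: Option<u32>,
    }

    fn main() {
        let config = AppConfig::parse();
        match config.backfill_from {
            // `--backfill-from 123456` lands here: start at the given
            // height and skip crawler_state writes, mirroring the
            // should_update_crawler_state = false path in the patch.
            Some(height) => println!("backfilling from block {height}"),
            // Flag omitted: resume from crawler_state as before.
            None => println!("normal crawl"),
        }
    }

Because backfill runs never touch crawler_state, a backfilling instance can
run alongside a normally started one without moving the canonical resume
point.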