From 3d369f329191e912629abb037785dfe166e180af Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 20:56:22 -0500 Subject: [PATCH] updated logic to back fill blocks that have been missed --- .gitignore | 1 + src/db.rs | 20 +++++++++ src/tasks/index.rs | 106 +++++++++++++++++++++++++++++---------------- 3 files changed, 89 insertions(+), 38 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 9676582..b724e24 100644 --- a/src/db.rs +++ b/src/db.rs @@ -303,6 +303,26 @@ impl Database { Ok(block_number.map(|(n,)| n as u64)) } + pub async fn get_latest_block_number( + &self, + chain_id: u64, + ) -> eyre::Result { + let (block_number,): (i64,) = sqlx::query_as( + r#" + SELECT block_number + FROM blocks + WHERE chain_id = $1 + ORDER BY block_number DESC + LIMIT 1 + "#, + ) + .bind(chain_id as i64) + .fetch_one(&self.pool) + .await?; + + Ok(block_number as u64) + } + pub async fn get_latest_block_fees_by_chain_id( &self, chain_id: u64, diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 0483938..554bdba 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -3,7 +3,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use ethers::providers::{Http, Middleware, Provider}; -use ethers::types::BlockNumber; +use ethers::types::{Block, BlockNumber, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -26,53 +26,83 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { let mut blocks_stream = ws_rpc.subscribe_blocks().await?; + let next_block_number = + app.db.get_latest_block_number(chain_id).await? + 1; + + if let Some(latest_block) = blocks_stream.next().await { + let latest_block_number = latest_block + .number + .context("Missing block number")? + .as_u64(); + + if latest_block_number > next_block_number { + for block_number in next_block_number..=latest_block_number { + let block = rpc + .get_block::(block_number.into()) + .await? + .context(format!( + "Could not get block at height {}", + block_number + ))?; + + index_block(app.clone(), chain_id, &rpc, block).await?; + } + } + } + while let Some(block) = blocks_stream.next().await { - let block_number = - block.number.context("Missing block number")?.as_u64(); + index_block(app.clone(), chain_id, &rpc, block).await?; + } + } +} - tracing::info!(block_number, "Indexing block"); +pub async fn index_block( + app: Arc, + chain_id: u64, + rpc: &Provider, + block: Block, +) -> eyre::Result<()> { + let block_number = block.number.context("Missing block number")?.as_u64(); - let block_timestamp_seconds = block.timestamp.as_u64(); - let block_timestamp = DateTime::::from_timestamp( - block_timestamp_seconds as i64, - 0, - ) - .context("Invalid timestamp")?; + tracing::info!(block_number, "Indexing block"); - let block = rpc - .get_block(block_number) - .await? - .context("Missing block")?; + let block_timestamp_seconds = block.timestamp.as_u64(); + let block_timestamp = + DateTime::::from_timestamp(block_timestamp_seconds as i64, 0) + .context("Invalid timestamp")?; - app.db - .save_block( - block.number.unwrap().as_u64(), - chain_id, - block_timestamp, - &block.transactions, - ) - .await?; + let block = rpc + .get_block(block_number) + .await? + .context("Missing block")?; + + app.db + .save_block( + block.number.unwrap().as_u64(), + chain_id, + block_timestamp, + &block.transactions, + ) + .await?; - let mined_txs = app.db.mine_txs(chain_id).await?; + let mined_txs = app.db.mine_txs(chain_id).await?; - let metric_labels = [("chain_id", chain_id.to_string())]; - for tx in mined_txs { - tracing::info!( - id = tx.0, - hash = ?tx.1, - "Tx mined" - ); + let metric_labels: [(&str, String); 1] = + [("chain_id", chain_id.to_string())]; + for tx in mined_txs { + tracing::info!( + id = tx.0, + hash = ?tx.1, + "Tx mined" + ); - metrics::increment_counter!("tx_mined", &metric_labels); - } + metrics::increment_counter!("tx_mined", &metric_labels); + } - let relayer_addresses = - app.db.get_relayer_addresses(chain_id).await?; + let relayer_addresses = app.db.get_relayer_addresses(chain_id).await?; - update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id) - .await?; - } - } + update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id).await?; + Ok(()) } pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> {