diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 9676582..965117a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -303,6 +303,26 @@ impl Database { Ok(block_number.map(|(n,)| n as u64)) } + pub async fn get_latest_block_number( + &self, + chain_id: u64, + ) -> eyre::Result> { + let block_number: Option<(i64,)> = sqlx::query_as( + r#" + SELECT block_number + FROM blocks + WHERE chain_id = $1 + ORDER BY block_number DESC + LIMIT 1 + "#, + ) + .bind(chain_id as i64) + .fetch_optional(&self.pool) + .await?; + + Ok(block_number.map(|(n,)| n as u64)) + } + pub async fn get_latest_block_fees_by_chain_id( &self, chain_id: u64, @@ -1069,9 +1089,17 @@ mod tests { let url = format!("postgres://postgres:postgres@{db_socket_addr}/database"); - let db = Database::new(&DatabaseConfig::connection_string(url)).await?; + for _ in 0..5 { + match Database::new(&DatabaseConfig::connection_string(&url)).await + { + Ok(db) => return Ok((db, db_container)), + Err(_) => { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } - Ok((db, db_container)) + Err(eyre::eyre!("Failed to connect to the database")) } async fn full_update( @@ -1386,7 +1414,7 @@ mod tests { async fn blocks() -> eyre::Result<()> { let (db, _db_container) = setup_db().await?; - let block_number = 1; + let block_numbers = [0, 1]; let chain_id = 1; let timestamp = ymd_hms(2023, 11, 23, 12, 32, 2); let txs = &[ @@ -1395,7 +1423,10 @@ mod tests { H256::from_low_u64_be(3), ]; - db.save_block(block_number, chain_id, timestamp, txs) + db.save_block(block_numbers[0], chain_id, timestamp, txs) + .await?; + + db.save_block(block_numbers[1], chain_id, timestamp, txs) .await?; let fee_estimates = FeesEstimate { @@ -1405,13 +1436,22 @@ mod tests { let gas_price = U256::from(1_000_000_007); - db.save_block_fees(block_number, chain_id, &fee_estimates, gas_price) - .await?; + db.save_block_fees( + block_numbers[1], + chain_id, + &fee_estimates, + gas_price, + ) + .await?; + let latest_block_number = + db.get_latest_block_number(chain_id) + .await? + .context("Could not get latest block number")?; let block_fees = db.get_latest_block_fees_by_chain_id(chain_id).await?; - let block_fees = block_fees.context("Missing fees")?; + assert_eq!(latest_block_number, block_numbers[1]); assert_eq!( block_fees.fee_estimates.base_fee_per_gas, fee_estimates.base_fee_per_gas diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 0483938..eca129f 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -3,7 +3,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use ethers::providers::{Http, Middleware, Provider}; -use ethers::types::BlockNumber; +use ethers::types::{Block, BlockNumber, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -24,55 +24,108 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { let ws_rpc = app.ws_provider(chain_id).await?; let rpc = app.http_provider(chain_id).await?; + // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream let mut blocks_stream = ws_rpc.subscribe_blocks().await?; + // Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head + if let Some(latest_block) = blocks_stream.next().await { + backfill_to_block(app.clone(), chain_id, &rpc, latest_block) + .await?; + } + + // Index incoming blocks from the stream while let Some(block) = blocks_stream.next().await { - let block_number = - block.number.context("Missing block number")?.as_u64(); + index_block(app.clone(), chain_id, &rpc, block).await?; + } + } +} + +pub async fn index_block( + app: Arc, + chain_id: u64, + rpc: &Provider, + block: Block, +) -> eyre::Result<()> { + let block_number = block.number.context("Missing block number")?.as_u64(); - tracing::info!(block_number, "Indexing block"); + tracing::info!(block_number, "Indexing block"); - let block_timestamp_seconds = block.timestamp.as_u64(); - let block_timestamp = DateTime::::from_timestamp( - block_timestamp_seconds as i64, - 0, - ) + let block_timestamp_seconds = block.timestamp.as_u64(); + let block_timestamp = + DateTime::::from_timestamp(block_timestamp_seconds as i64, 0) .context("Invalid timestamp")?; - let block = rpc - .get_block(block_number) - .await? - .context("Missing block")?; + let block = rpc + .get_block(block_number) + .await? + .context("Missing block")?; + + app.db + .save_block( + block.number.unwrap().as_u64(), + chain_id, + block_timestamp, + &block.transactions, + ) + .await?; - app.db - .save_block( - block.number.unwrap().as_u64(), - chain_id, - block_timestamp, - &block.transactions, - ) - .await?; + let mined_txs = app.db.mine_txs(chain_id).await?; - let mined_txs = app.db.mine_txs(chain_id).await?; + let metric_labels: [(&str, String); 1] = + [("chain_id", chain_id.to_string())]; + for tx in mined_txs { + tracing::info!( + id = tx.0, + hash = ?tx.1, + "Tx mined" + ); - let metric_labels = [("chain_id", chain_id.to_string())]; - for tx in mined_txs { - tracing::info!( - id = tx.0, - hash = ?tx.1, - "Tx mined" - ); + metrics::increment_counter!("tx_mined", &metric_labels); + } - metrics::increment_counter!("tx_mined", &metric_labels); - } + let relayer_addresses = app.db.get_relayer_addresses(chain_id).await?; - let relayer_addresses = - app.db.get_relayer_addresses(chain_id).await?; + update_relayer_nonces(relayer_addresses, &app, rpc, chain_id).await?; + Ok(()) +} - update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id) - .await?; +pub async fn backfill_to_block( + app: Arc, + chain_id: u64, + rpc: &Provider, + latest_block: Block, +) -> eyre::Result<()> { + // Get the latest block from the db + if let Some(latest_db_block_number) = + app.db.get_latest_block_number(chain_id).await? + { + let next_block_number: u64 = latest_db_block_number + 1; + + // Get the first block from the stream and backfill any missing blocks + let latest_block_number = latest_block + .number + .context("Missing block number")? + .as_u64(); + + if latest_block_number > next_block_number { + // Backfill blocks between the last synced block and the chain head, non inclusive + for block_number in next_block_number..latest_block_number { + let block = rpc + .get_block::(block_number.into()) + .await? + .context(format!( + "Could not get block at height {}", + block_number + ))?; + + index_block(app.clone(), chain_id, rpc, block).await?; + } } - } + + // Index the latest block after backfilling + index_block(app.clone(), chain_id, rpc, latest_block).await?; + }; + Ok(()) } pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> {