Skip to content

Commit

Permalink
Merge pull request #5 from worldcoin/0xkitsune/fetch-missing-blocks
Browse files Browse the repository at this point in the history
Feat: Fetch missing blocks
  • Loading branch information
0xKitsune authored Dec 13, 2023
2 parents 41fd8af + 299a856 commit b612dc8
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
54 changes: 47 additions & 7 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,26 @@ impl Database {
Ok(block_number.map(|(n,)| n as u64))
}

pub async fn get_latest_block_number(
&self,
chain_id: u64,
) -> eyre::Result<Option<u64>> {
let block_number: Option<(i64,)> = sqlx::query_as(
r#"
SELECT block_number
FROM blocks
WHERE chain_id = $1
ORDER BY block_number DESC
LIMIT 1
"#,
)
.bind(chain_id as i64)
.fetch_optional(&self.pool)
.await?;

Ok(block_number.map(|(n,)| n as u64))
}

pub async fn get_latest_block_fees_by_chain_id(
&self,
chain_id: u64,
Expand Down Expand Up @@ -1069,9 +1089,17 @@ mod tests {
let url =
format!("postgres://postgres:postgres@{db_socket_addr}/database");

let db = Database::new(&DatabaseConfig::connection_string(url)).await?;
for _ in 0..5 {
match Database::new(&DatabaseConfig::connection_string(&url)).await
{
Ok(db) => return Ok((db, db_container)),
Err(_) => {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

Ok((db, db_container))
Err(eyre::eyre!("Failed to connect to the database"))
}

async fn full_update(
Expand Down Expand Up @@ -1386,7 +1414,7 @@ mod tests {
async fn blocks() -> eyre::Result<()> {
let (db, _db_container) = setup_db().await?;

let block_number = 1;
let block_numbers = [0, 1];
let chain_id = 1;
let timestamp = ymd_hms(2023, 11, 23, 12, 32, 2);
let txs = &[
Expand All @@ -1395,7 +1423,10 @@ mod tests {
H256::from_low_u64_be(3),
];

db.save_block(block_number, chain_id, timestamp, txs)
db.save_block(block_numbers[0], chain_id, timestamp, txs)
.await?;

db.save_block(block_numbers[1], chain_id, timestamp, txs)
.await?;

let fee_estimates = FeesEstimate {
Expand All @@ -1405,13 +1436,22 @@ mod tests {

let gas_price = U256::from(1_000_000_007);

db.save_block_fees(block_number, chain_id, &fee_estimates, gas_price)
.await?;
db.save_block_fees(
block_numbers[1],
chain_id,
&fee_estimates,
gas_price,
)
.await?;

let latest_block_number =
db.get_latest_block_number(chain_id)
.await?
.context("Could not get latest block number")?;
let block_fees = db.get_latest_block_fees_by_chain_id(chain_id).await?;

let block_fees = block_fees.context("Missing fees")?;

assert_eq!(latest_block_number, block_numbers[1]);
assert_eq!(
block_fees.fee_estimates.base_fee_per_gas,
fee_estimates.base_fee_per_gas
Expand Down
125 changes: 89 additions & 36 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use chrono::{DateTime, Utc};
use ethers::providers::{Http, Middleware, Provider};
use ethers::types::BlockNumber;
use ethers::types::{Block, BlockNumber, H256};
use eyre::{Context, ContextCompat};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand All @@ -24,55 +24,108 @@ pub async fn index_chain(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
let ws_rpc = app.ws_provider(chain_id).await?;
let rpc = app.http_provider(chain_id).await?;

// Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream
let mut blocks_stream = ws_rpc.subscribe_blocks().await?;

// Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head
if let Some(latest_block) = blocks_stream.next().await {
backfill_to_block(app.clone(), chain_id, &rpc, latest_block)
.await?;
}

// Index incoming blocks from the stream
while let Some(block) = blocks_stream.next().await {
let block_number =
block.number.context("Missing block number")?.as_u64();
index_block(app.clone(), chain_id, &rpc, block).await?;
}
}
}

pub async fn index_block(
app: Arc<App>,
chain_id: u64,
rpc: &Provider<Http>,
block: Block<H256>,
) -> eyre::Result<()> {
let block_number = block.number.context("Missing block number")?.as_u64();

tracing::info!(block_number, "Indexing block");
tracing::info!(block_number, "Indexing block");

let block_timestamp_seconds = block.timestamp.as_u64();
let block_timestamp = DateTime::<Utc>::from_timestamp(
block_timestamp_seconds as i64,
0,
)
let block_timestamp_seconds = block.timestamp.as_u64();
let block_timestamp =
DateTime::<Utc>::from_timestamp(block_timestamp_seconds as i64, 0)
.context("Invalid timestamp")?;

let block = rpc
.get_block(block_number)
.await?
.context("Missing block")?;
let block = rpc
.get_block(block_number)
.await?
.context("Missing block")?;

app.db
.save_block(
block.number.unwrap().as_u64(),
chain_id,
block_timestamp,
&block.transactions,
)
.await?;

app.db
.save_block(
block.number.unwrap().as_u64(),
chain_id,
block_timestamp,
&block.transactions,
)
.await?;
let mined_txs = app.db.mine_txs(chain_id).await?;

let mined_txs = app.db.mine_txs(chain_id).await?;
let metric_labels: [(&str, String); 1] =
[("chain_id", chain_id.to_string())];
for tx in mined_txs {
tracing::info!(
id = tx.0,
hash = ?tx.1,
"Tx mined"
);

let metric_labels = [("chain_id", chain_id.to_string())];
for tx in mined_txs {
tracing::info!(
id = tx.0,
hash = ?tx.1,
"Tx mined"
);
metrics::increment_counter!("tx_mined", &metric_labels);
}

metrics::increment_counter!("tx_mined", &metric_labels);
}
let relayer_addresses = app.db.get_relayer_addresses(chain_id).await?;

let relayer_addresses =
app.db.get_relayer_addresses(chain_id).await?;
update_relayer_nonces(relayer_addresses, &app, rpc, chain_id).await?;
Ok(())
}

update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id)
.await?;
pub async fn backfill_to_block(
app: Arc<App>,
chain_id: u64,
rpc: &Provider<Http>,
latest_block: Block<H256>,
) -> eyre::Result<()> {
// Get the latest block from the db
if let Some(latest_db_block_number) =
app.db.get_latest_block_number(chain_id).await?
{
let next_block_number: u64 = latest_db_block_number + 1;

// Get the first block from the stream and backfill any missing blocks
let latest_block_number = latest_block
.number
.context("Missing block number")?
.as_u64();

if latest_block_number > next_block_number {
// Backfill blocks between the last synced block and the chain head, non inclusive
for block_number in next_block_number..latest_block_number {
let block = rpc
.get_block::<BlockNumber>(block_number.into())
.await?
.context(format!(
"Could not get block at height {}",
block_number
))?;

index_block(app.clone(), chain_id, rpc, block).await?;
}
}
}

// Index the latest block after backfilling
index_block(app.clone(), chain_id, rpc, latest_block).await?;
};
Ok(())
}

pub async fn estimate_gas(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
Expand Down

0 comments on commit b612dc8

Please sign in to comment.