Skip to content

Commit 8883a95

Browse files
committed
node, store: Add 'graphman chain ingest' command
1 parent 17360f5 commit 8883a95

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

node/src/bin/manager.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use graph::components::network_provider::ChainName;
88
use graph::endpoint::EndpointMetrics;
99
use graph::env::ENV_VARS;
1010
use graph::log::logger_with_levels;
11-
use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX};
11+
use graph::prelude::{BlockNumber, MetricsRegistry, BLOCK_NUMBER_MAX};
1212
use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry};
1313
use graph::{
1414
prelude::{
@@ -587,6 +587,19 @@ pub enum ChainCommand {
587587
#[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())]
588588
chain_name: String,
589589
},
590+
591+
/// Ingest a block into the block cache.
592+
///
593+
/// This will overwrite any blocks we may already have in the block
594+
/// cache, and can therefore be used to get rid of duplicate blocks in
595+
/// the block cache as well as making sure that a certain block is in
596+
/// the cache
597+
Ingest {
598+
/// The name of the chain
599+
name: String,
600+
/// The block number to ingest
601+
number: BlockNumber,
602+
},
590603
}
591604

592605
#[derive(Clone, Debug, Subcommand)]
@@ -1452,6 +1465,12 @@ async fn main() -> anyhow::Result<()> {
14521465
}
14531466
}
14541467
}
1468+
Ingest { name, number } => {
1469+
let logger = ctx.logger.cheap_clone();
1470+
let (chain_store, ethereum_adapter) =
1471+
ctx.chain_store_and_adapter(&name).await?;
1472+
commands::chain::ingest(&logger, chain_store, ethereum_adapter, number).await
1473+
}
14551474
}
14561475
}
14571476
Stats(cmd) => {

node/src/manager/commands/chain.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,16 @@ use graph::cheap_clone::CheapClone;
1010
use graph::components::network_provider::ChainIdentifierStore;
1111
use graph::components::network_provider::ChainName;
1212
use graph::components::store::StoreError;
13+
use graph::futures03::compat::Future01CompatExt as _;
1314
use graph::prelude::BlockNumber;
1415
use graph::prelude::ChainStore as _;
16+
use graph::prelude::LightEthereumBlockExt;
1517
use graph::prelude::{anyhow, anyhow::bail};
1618
use graph::slog::Logger;
1719
use graph::{components::store::BlockStore as _, prelude::anyhow::Error};
20+
use graph_chain_ethereum::chain::BlockFinality;
21+
use graph_chain_ethereum::EthereumAdapter;
22+
use graph_chain_ethereum::EthereumAdapterTrait as _;
1823
use graph_store_postgres::add_chain;
1924
use graph_store_postgres::connection_pool::PoolCoordinator;
2025
use graph_store_postgres::find_chain;
@@ -261,3 +266,31 @@ pub fn change_block_cache_shard(
261266

262267
Ok(())
263268
}
269+
270+
pub async fn ingest(
271+
logger: &Logger,
272+
chain_store: Arc<ChainStore>,
273+
ethereum_adapter: Arc<EthereumAdapter>,
274+
number: BlockNumber,
275+
) -> Result<(), Error> {
276+
let Some(block) = ethereum_adapter
277+
.block_by_number(logger, number)
278+
.compat()
279+
.await
280+
.map_err(|e| anyhow!("error getting block number {number}: {}", e))?
281+
else {
282+
bail!("block number {number} not found");
283+
};
284+
let ptr = block.block_ptr();
285+
// For inserting the block, it doesn't matter whether the block is final or not.
286+
let block = Arc::new(BlockFinality::Final(Arc::new(block)));
287+
chain_store.upsert_block(block).await?;
288+
289+
let rows = chain_store.confirm_block_hash(ptr.number, &ptr.hash)?;
290+
291+
println!("Inserted block {}", ptr);
292+
if rows > 0 {
293+
println!(" (also deleted {rows} duplicate row(s) with that number)");
294+
}
295+
Ok(())
296+
}

0 commit comments

Comments
 (0)