Skip to content

Commit 481fbb5

Browse files
MoonBoi9001claude
and committed
feat(index-node): add blockForPoi query for dispute investigation
Adds a new index-node GraphQL query that searches a block range to find which block produced a given POI. This supports dispute investigation where an indexer may have submitted a POI for block N that actually corresponds to block X < N. The resolver fetches all poi2$ digest entries in a single DB call, batch fetches block hashes in 50k chunks, and runs ProofOfIndexingFinisher for each block until a match is found. Also adds network_for_deployment to the StatusStore trait and its implementation chain (SubgraphStore, Store) as a supporting method for resolving the chain store from a deployment hash. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3056290 commit 481fbb5

File tree

6 files changed

+345
-3
lines changed

6 files changed

+345
-3
lines changed

graph/src/components/store/traits.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
1010
use crate::components::metrics::stopwatch::StopwatchMetrics;
1111
use crate::components::network_provider::ChainName;
1212
use crate::components::server::index_node::VersionInfo;
13+
use crate::components::subgraph::ProofOfIndexingVersion;
1314
use crate::components::subgraph::SubgraphVersionSwitchingMode;
1415
use crate::components::transaction_receipt;
1516
use crate::components::versions::ApiVersion;
@@ -741,6 +742,28 @@ pub trait QueryStore: Send + Sync {
741742
fn deployment_id(&self) -> DeploymentId;
742743
}
743744

745+
/// A single POI digest entry from the `poi2$` table, representing the
746+
/// accumulated digest for a causality region over a block range.
747+
#[derive(Clone, Debug)]
748+
pub struct PoiDigestEntry {
749+
/// The causality region identifier (the entity id in poi2$)
750+
pub id: Id,
751+
/// The accumulated digest bytes
752+
pub digest: Vec<u8>,
753+
/// Start of the block range (inclusive)
754+
pub start_block: BlockNumber,
755+
/// End of the block range (exclusive, i32::MAX if open-ended)
756+
pub end_block: BlockNumber,
757+
}
758+
759+
/// The full POI digest history for a deployment, containing all digest
760+
/// entries and the POI version needed to compute proofs.
761+
#[derive(Clone, Debug)]
762+
pub struct PoiDigestHistory {
763+
pub entries: Vec<PoiDigestEntry>,
764+
pub poi_version: ProofOfIndexingVersion,
765+
}
766+
744767
/// A view of the store that can provide information about the indexing status
745768
/// of any subgraph and any deployment
746769
#[async_trait]
@@ -790,6 +813,19 @@ pub trait StatusStore: Send + Sync + 'static {
790813
block_number: BlockNumber,
791814
fetch_block_ptr: &dyn BlockPtrForNumber,
792815
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
816+
817+
/// Retrieve the full POI digest history for a deployment within a block
818+
/// range. Returns all `poi2$` entries whose block ranges overlap the
819+
/// given range, along with the deployment's `ProofOfIndexingVersion`.
820+
/// Returns `None` if the deployment doesn't exist or has no POI data.
821+
async fn get_poi_digest_history(
822+
&self,
823+
subgraph_id: &DeploymentHash,
824+
block_range: std::ops::Range<BlockNumber>,
825+
) -> Result<Option<PoiDigestHistory>, StoreError>;
826+
827+
/// Get the network for a deployment
828+
async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError>;
793829
}
794830

795831
#[async_trait]

server/index-node/src/resolver.rs

Lines changed: 180 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use std::collections::BTreeMap;
1+
use std::collections::{BTreeMap, HashMap};
22

33
use async_trait::async_trait;
4+
use graph::components::subgraph::ProofOfIndexingFinisher;
45
use graph::data::query::Trace;
56
use graph::data::store::Id;
67
use graph::prelude::alloy::primitives::Address;
@@ -454,6 +455,183 @@ where
454455
Ok(r::Value::List(public_poi_results))
455456
}
456457

458+
async fn resolve_block_for_poi(
459+
&self,
460+
field: &a::Field,
461+
) -> Result<r::Value, QueryExecutionError> {
462+
const MAX_BLOCK_RANGE: i32 = 1_000_000;
463+
const BLOCK_HASH_CHUNK_SIZE: usize = 50_000;
464+
465+
let deployment_id = field
466+
.get_required::<DeploymentHash>("subgraph")
467+
.expect("Valid subgraph required");
468+
let target_poi_hash = field
469+
.get_required::<BlockHash>("targetPoi")
470+
.expect("Valid targetPoi required");
471+
let start_block = field
472+
.get_required::<BlockNumber>("startBlock")
473+
.expect("Valid startBlock required");
474+
let end_block = field
475+
.get_required::<BlockNumber>("endBlock")
476+
.expect("Valid endBlock required");
477+
478+
if end_block <= start_block {
479+
return Ok(r::Value::Null);
480+
}
481+
482+
if end_block - start_block > MAX_BLOCK_RANGE {
483+
return Err(QueryExecutionError::TooExpensive);
484+
}
485+
486+
let target_bytes: [u8; 32] = match target_poi_hash.as_slice().try_into() {
487+
Ok(bytes) => bytes,
488+
Err(_) => {
489+
error!(
490+
self.logger,
491+
"Invalid targetPoi: expected 32 bytes";
492+
"got_bytes" => target_poi_hash.as_slice().len()
493+
);
494+
return Ok(r::Value::Null);
495+
}
496+
};
497+
498+
// Resolve the network for this deployment
499+
let network = match self.store.network_for_deployment(&deployment_id).await {
500+
Ok(n) => n,
501+
Err(e) => {
502+
error!(
503+
self.logger,
504+
"Failed to resolve network for deployment";
505+
"subgraph" => &deployment_id,
506+
"error" => format!("{:?}", e)
507+
);
508+
return Ok(r::Value::Null);
509+
}
510+
};
511+
512+
// Fetch the full digest history for the block range
513+
let history = match self
514+
.store
515+
.get_poi_digest_history(&deployment_id, start_block..end_block)
516+
.await
517+
{
518+
Ok(Some(h)) => h,
519+
Ok(None) => return Ok(r::Value::Null),
520+
Err(e) => {
521+
error!(
522+
self.logger,
523+
"Failed to fetch POI digest history";
524+
"subgraph" => &deployment_id,
525+
"error" => format!("{:?}", e)
526+
);
527+
return Ok(r::Value::Null);
528+
}
529+
};
530+
531+
let poi_version = history.poi_version;
532+
533+
// Build a lookup structure: for each causality region id, a sorted
534+
// vec of (start_block, end_block, digest) for binary search.
535+
let mut region_entries: HashMap<Id, Vec<(BlockNumber, BlockNumber, Vec<u8>)>> =
536+
HashMap::new();
537+
for entry in history.entries {
538+
region_entries.entry(entry.id).or_default().push((
539+
entry.start_block,
540+
entry.end_block,
541+
entry.digest,
542+
));
543+
}
544+
// Entries are already sorted by (id, start_block) from the SQL query,
545+
// but sort each region's vec to be safe.
546+
for entries in region_entries.values_mut() {
547+
entries.sort_by_key(|(start, _, _)| *start);
548+
}
549+
550+
let chain_store = match self.store.block_store().chain_store(&network).await {
551+
Some(cs) => cs,
552+
None => {
553+
error!(
554+
self.logger,
555+
"Chain store not found for network";
556+
"network" => &network
557+
);
558+
return Ok(r::Value::Null);
559+
}
560+
};
561+
562+
// Process blocks in chunks to avoid excessive memory usage
563+
let mut current = start_block;
564+
while current < end_block {
565+
let chunk_end = std::cmp::min(current + BLOCK_HASH_CHUNK_SIZE as i32, end_block);
566+
567+
// Batch-fetch block hashes for this chunk using block_ptrs_by_numbers
568+
let block_numbers: Vec<BlockNumber> = (current..chunk_end).collect();
569+
let block_ptrs = match chain_store
570+
.cheap_clone()
571+
.block_ptrs_by_numbers(block_numbers)
572+
.await
573+
{
574+
Ok(ptrs) => ptrs,
575+
Err(e) => {
576+
error!(
577+
self.logger,
578+
"Failed to fetch block hashes";
579+
"network" => &network,
580+
"range" => format!("{}..{}", current, chunk_end),
581+
"error" => format!("{:?}", e)
582+
);
583+
return Ok(r::Value::Null);
584+
}
585+
};
586+
587+
for block_num in current..chunk_end {
588+
let ptrs = match block_ptrs.get(&block_num) {
589+
Some(p) if p.len() == 1 => p,
590+
_ => continue, // Skip blocks with no hash or ambiguous hashes
591+
};
592+
let block_hash = &ptrs[0].hash;
593+
let block_ptr = BlockPtr::new(block_hash.clone(), block_num);
594+
595+
// Compute POI for this block using digest entries
596+
let mut finisher = ProofOfIndexingFinisher::new(
597+
&block_ptr,
598+
&deployment_id,
599+
&Some(Address::ZERO),
600+
poi_version,
601+
);
602+
603+
for (region_id, entries) in &region_entries {
604+
// Binary search for the entry that covers this block number
605+
let idx = entries.partition_point(|(start, _, _)| *start <= block_num);
606+
if idx == 0 {
607+
continue;
608+
}
609+
let (start, end, ref digest) = entries[idx - 1];
610+
if block_num >= start && block_num < end {
611+
finisher.add_causality_region(region_id, digest);
612+
}
613+
}
614+
615+
let computed_poi = finisher.finish();
616+
if computed_poi == target_bytes {
617+
return Ok(object! {
618+
__typename: "PoiSearchResult",
619+
deployment: deployment_id.to_string(),
620+
block: object! {
621+
hash: block_hash.hash_hex(),
622+
number: block_num,
623+
},
624+
proofOfIndexing: format!("0x{}", hex::encode(computed_poi)),
625+
});
626+
}
627+
}
628+
629+
current = chunk_end;
630+
}
631+
632+
Ok(r::Value::Null)
633+
}
634+
457635
async fn resolve_indexing_status_for_version(
458636
&self,
459637
field: &a::Field,
@@ -858,6 +1036,7 @@ where
8581036
// The top-level `subgraphVersions` field
8591037
(None, "apiVersions") => self.resolve_api_versions(field),
8601038
(None, "version") => self.version(),
1039+
(None, "blockForPoi") => self.resolve_block_for_poi(field).await,
8611040

8621041
// Resolve fields of `Object` values (e.g. the `latestBlock` field of `EthereumBlock`)
8631042
(value, _) => Ok(value.unwrap_or(r::Value::Null)),

server/index-node/src/schema.graphql

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ type Query {
4646
blockHash: Bytes!
4747
): [CachedEthereumCall!]
4848
apiVersions(subgraphId: String!): [ApiVersion!]!
49+
"""
50+
Find the block number that produced a given proof of indexing.
51+
Used for dispute investigation to verify which block an indexer
52+
actually synced to when they submitted a POI.
53+
"""
54+
blockForPoi(
55+
subgraph: String!
56+
targetPoi: Bytes!
57+
startBlock: Int!
58+
endBlock: Int!
59+
): PoiSearchResult
4960
}
5061

5162
type Version {
@@ -203,6 +214,12 @@ type ProofOfIndexingResult {
203214
proofOfIndexing: Bytes
204215
}
205216

217+
"""
The result of a `blockForPoi` search: the block whose recomputed proof of
indexing matched the requested target POI.
"""
type PoiSearchResult {
  block: Block!
  deployment: String!
  proofOfIndexing: Bytes!
}
222+
206223
type ApiVersion {
207224
"""
208225
Version number in SemVer format

store/postgres/src/deployment_store.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ use graph::anyhow::Context;
88
use graph::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor};
99
use graph::blockchain::BlockTime;
1010
use graph::components::store::write::RowGroup;
11+
use graph::components::store::PoiDigestHistory;
1112
use graph::components::store::{
1213
Batch, DeploymentLocator, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest,
1314
PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats,
1415
};
1516
use graph::components::versions::VERSIONS;
1617
use graph::data::graphql::IntoValue;
1718
use graph::data::query::Trace;
18-
use graph::data::store::{IdList, SqlQueryObject};
19+
use graph::data::store::{Id, IdList, SqlQueryObject};
1920
use graph::data::subgraph::{status, SPEC_VERSION_0_0_6};
2021
use graph::data_source::CausalityRegion;
2122
use graph::derive::CheapClone;
@@ -1032,6 +1033,69 @@ impl DeploymentStore {
10321033
Ok(Some(finisher.finish()))
10331034
}
10341035

1036+
/// Retrieve all POI digest entries from the `poi2$` table whose block
1037+
/// ranges overlap the given `block_range`, along with the deployment's
1038+
/// POI version. Used by the `blockForPoi` resolver to reconstruct POIs
1039+
/// without per-block entity queries.
1040+
pub(crate) async fn get_poi_digest_history(
1041+
&self,
1042+
site: Arc<Site>,
1043+
block_range: Range<BlockNumber>,
1044+
) -> Result<Option<PoiDigestHistory>, StoreError> {
1045+
use diesel::sql_types::{Binary, Integer, Text};
1046+
use graph::components::store::PoiDigestEntry;
1047+
1048+
let info = self.subgraph_info(site.cheap_clone()).await?;
1049+
1050+
#[derive(QueryableByName)]
1051+
struct DigestRow {
1052+
#[diesel(sql_type = Text)]
1053+
id: String,
1054+
#[diesel(sql_type = Binary)]
1055+
digest: Vec<u8>,
1056+
#[diesel(sql_type = Integer)]
1057+
start_block: i32,
1058+
#[diesel(sql_type = Integer)]
1059+
end_block: i32,
1060+
}
1061+
1062+
let query = format!(
1063+
r#"SELECT id, digest, lower(block_range) as start_block,
1064+
coalesce(upper(block_range), 2147483647) as end_block
1065+
FROM "{}"."poi2$"
1066+
WHERE block_range && int4range($1, $2)
1067+
ORDER BY id, lower(block_range)"#,
1068+
site.namespace,
1069+
);
1070+
1071+
let mut conn = self.pool.get_permitted().await?;
1072+
let rows = diesel::sql_query(query)
1073+
.bind::<Integer, _>(block_range.start)
1074+
.bind::<Integer, _>(block_range.end)
1075+
.load::<DigestRow>(&mut conn)
1076+
.await
1077+
.map_err(StoreError::from)?;
1078+
1079+
if rows.is_empty() {
1080+
return Ok(None);
1081+
}
1082+
1083+
let entries = rows
1084+
.into_iter()
1085+
.map(|row| PoiDigestEntry {
1086+
id: Id::String(row.id.into()),
1087+
digest: row.digest,
1088+
start_block: row.start_block,
1089+
end_block: row.end_block,
1090+
})
1091+
.collect();
1092+
1093+
Ok(Some(PoiDigestHistory {
1094+
entries,
1095+
poi_version: info.poi_version,
1096+
}))
1097+
}
1098+
10351099
/// Get the entity matching `key` from the deployment `site`. Only
10361100
/// consider entities as of the given `block`
10371101
pub(crate) async fn get(

store/postgres/src/store.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,20 @@ impl StatusStore for Store {
171171
.await
172172
}
173173

174+
async fn get_poi_digest_history(
175+
&self,
176+
subgraph_id: &DeploymentHash,
177+
block_range: std::ops::Range<BlockNumber>,
178+
) -> Result<Option<graph::components::store::PoiDigestHistory>, StoreError> {
179+
self.subgraph_store
180+
.get_poi_digest_history(subgraph_id, block_range)
181+
.await
182+
}
183+
184+
/// Delegate resolving a deployment's network name to the subgraph store.
async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError> {
    self.subgraph_store.network_for_deployment(id).await
}
187+
174188
async fn query_permit(&self) -> QueryPermit {
175189
// Status queries go to the primary shard.
176190
self.block_store.query_permit_primary().await

0 commit comments

Comments
 (0)