Skip to content

Commit 14853a5

Browse files
committed
graph, store: Support multiple subgraph datasources
1 parent 56f041a commit 14853a5

File tree

2 files changed

+149
-52
lines changed

2 files changed

+149
-52
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 77 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -339,30 +339,82 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
339339
pub async fn blocks_with_subgraph_triggers(
340340
&self,
341341
logger: &Logger,
342-
subgraph_filter: &SubgraphFilter,
342+
filters: &[SubgraphFilter],
343343
range: SubgraphTriggerScanRange<C>,
344344
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
345-
let store = self
346-
.source_subgraph_stores
347-
.get(&subgraph_filter.subgraph)
348-
.ok_or_else(|| anyhow!("Store not found for subgraph: {}", subgraph_filter.subgraph))?;
345+
if filters.is_empty() {
346+
return Err(anyhow!("No subgraph filters provided"));
347+
}
349348

350-
let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);
349+
let (blocks, hash_to_entities) = match range {
350+
SubgraphTriggerScanRange::Single(block) => {
351+
let hash_to_entities = self
352+
.fetch_entities_for_filters(filters, block.number(), block.number())
353+
.await?;
351354

352-
let adapter = self.adapter.clone();
355+
(vec![block], hash_to_entities)
356+
}
357+
SubgraphTriggerScanRange::Range(from, to) => {
358+
let hash_to_entities = self.fetch_entities_for_filters(filters, from, to).await?;
359+
360+
let block_numbers: HashSet<BlockNumber> = hash_to_entities
361+
.iter()
362+
.flat_map(|(_, entities)| entities.keys().copied())
363+
.chain(std::iter::once(to))
364+
.collect();
365+
366+
let blocks = self
367+
.adapter
368+
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
369+
.await?;
353370

354-
scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
371+
(blocks, hash_to_entities)
372+
}
373+
};
374+
375+
create_subgraph_triggers::<C>(logger.clone(), blocks, hash_to_entities).await
376+
}
377+
378+
async fn fetch_entities_for_filters(
379+
&self,
380+
filters: &[SubgraphFilter],
381+
from: BlockNumber,
382+
to: BlockNumber,
383+
) -> Result<Vec<(DeploymentHash, BTreeMap<BlockNumber, Vec<EntityWithType>>)>, Error> {
384+
let futures = filters
385+
.iter()
386+
.filter_map(|filter| {
387+
self.source_subgraph_stores
388+
.get(&filter.subgraph)
389+
.map(|store| {
390+
let store = store.clone();
391+
let schema = store.input_schema();
392+
393+
async move {
394+
let entities =
395+
get_entities_for_range(&store, filter, &schema, from, to).await?;
396+
Ok::<_, Error>((filter.subgraph.clone(), entities))
397+
}
398+
})
399+
})
400+
.collect::<Vec<_>>();
401+
402+
if futures.is_empty() {
403+
return Ok(Vec::new());
404+
}
405+
406+
futures03::future::try_join_all(futures).await
355407
}
356408
}
357409

358410
fn create_subgraph_trigger_from_entities(
359-
filter: &SubgraphFilter,
411+
subgraph: &DeploymentHash,
360412
entities: Vec<EntityWithType>,
361413
) -> Vec<subgraph::TriggerData> {
362414
entities
363415
.into_iter()
364416
.map(|entity| subgraph::TriggerData {
365-
source: filter.subgraph.clone(),
417+
source: subgraph.clone(),
366418
entity,
367419
})
368420
.collect()
@@ -371,21 +423,24 @@ fn create_subgraph_trigger_from_entities(
371423
async fn create_subgraph_triggers<C: Blockchain>(
372424
logger: Logger,
373425
blocks: Vec<C::Block>,
374-
filter: &SubgraphFilter,
375-
mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
426+
subgraph_data: Vec<(DeploymentHash, BTreeMap<BlockNumber, Vec<EntityWithType>>)>,
376427
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
377428
let logger_clone = logger.cheap_clone();
378-
379429
let blocks: Vec<BlockWithTriggers<C>> = blocks
380430
.into_iter()
381431
.map(|block| {
382432
let block_number = block.number();
383-
let trigger_data = entities
384-
.remove(&block_number)
385-
.map(|e| create_subgraph_trigger_from_entities(filter, e))
386-
.unwrap_or_else(Vec::new);
433+
let mut all_trigger_data = Vec::new();
434+
435+
for (hash, entities) in subgraph_data.iter() {
436+
if let Some(block_entities) = entities.get(&block_number) {
437+
let trigger_data =
438+
create_subgraph_trigger_from_entities(hash, block_entities.clone());
439+
all_trigger_data.extend(trigger_data);
440+
}
441+
}
387442

388-
BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger_clone)
443+
BlockWithTriggers::new_with_subgraph_triggers(block, all_trigger_data, &logger_clone)
389444
})
390445
.collect();
391446

@@ -397,36 +452,6 @@ pub enum SubgraphTriggerScanRange<C: Blockchain> {
397452
Range(BlockNumber, BlockNumber),
398453
}
399454

400-
async fn scan_subgraph_triggers<C: Blockchain>(
401-
logger: &Logger,
402-
store: &Arc<dyn SourceableStore>,
403-
adapter: &Arc<dyn TriggersAdapter<C>>,
404-
schema: &InputSchema,
405-
filter: &SubgraphFilter,
406-
range: SubgraphTriggerScanRange<C>,
407-
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
408-
match range {
409-
SubgraphTriggerScanRange::Single(block) => {
410-
let entities =
411-
get_entities_for_range(store, filter, schema, block.number(), block.number())
412-
.await?;
413-
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
414-
}
415-
SubgraphTriggerScanRange::Range(from, to) => {
416-
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
417-
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
418-
// Ensure the 'to' block is included in the block_numbers
419-
block_numbers.insert(to);
420-
421-
let blocks = adapter
422-
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
423-
.await?;
424-
425-
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
426-
}
427-
}
428-
}
429-
430455
#[derive(Debug, Clone, Eq, PartialEq)]
431456
pub enum EntitySubgraphOperation {
432457
Create,
@@ -474,11 +499,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
474499
to: BlockNumber,
475500
filter: &Arc<TriggerFilterWrapper<C>>,
476501
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
477-
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
502+
if !filter.subgraph_filter.is_empty() {
478503
let blocks_with_triggers = self
479504
.blocks_with_subgraph_triggers(
480505
logger,
481-
subgraph_filter,
506+
&filter.subgraph_filter,
482507
SubgraphTriggerScanRange::Range(from, to),
483508
)
484509
.await?;
@@ -504,11 +529,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
504529
"block_hash" => block.hash().hash_hex(),
505530
);
506531

507-
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
532+
if !filter.subgraph_filter.is_empty() {
508533
let blocks_with_triggers = self
509534
.blocks_with_subgraph_triggers(
510535
logger,
511-
subgraph_filter,
536+
&filter.subgraph_filter,
512537
SubgraphTriggerScanRange::Single(block),
513538
)
514539
.await?;

store/test-store/tests/chain/ethereum/manifest.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,78 @@ specVersion: 1.3.0
214214
}
215215
}
216216

217+
#[tokio::test]
218+
async fn multiple_subgraph_ds_manifest() {
219+
let yaml = "
220+
schema:
221+
file:
222+
/: /ipfs/Qmschema
223+
dataSources:
224+
- name: SubgraphSource1
225+
kind: subgraph
226+
entities:
227+
- Gravatar
228+
network: mainnet
229+
source:
230+
address: 'QmSWWT2yrTFDZSL8tRyoHEVrcEKAUsY2hj2TMQDfdDZU8h'
231+
startBlock: 9562480
232+
mapping:
233+
apiVersion: 0.0.6
234+
language: wasm/assemblyscript
235+
entities:
236+
- TestEntity
237+
file:
238+
/: /ipfs/Qmmapping
239+
handlers:
240+
- handler: handleEntity
241+
entity: User
242+
- name: SubgraphSource2
243+
kind: subgraph
244+
entities:
245+
- Profile
246+
network: mainnet
247+
source:
248+
address: 'QmT8B2R7J9yzbZXkqRefmZPkXmE8pCsRKmMj3rGN1Qoe4k'
249+
startBlock: 9562500
250+
mapping:
251+
apiVersion: 0.0.6
252+
language: wasm/assemblyscript
253+
entities:
254+
- TestEntity2
255+
file:
256+
/: /ipfs/Qmmapping
257+
handlers:
258+
- handler: handleProfile
259+
entity: Profile
260+
specVersion: 1.3.0
261+
";
262+
263+
let manifest = resolve_manifest(yaml, SPEC_VERSION_1_3_0).await;
264+
265+
assert_eq!("Qmmanifest", manifest.id.as_str());
266+
assert_eq!(manifest.data_sources.len(), 2);
267+
268+
// Validate first data source
269+
match &manifest.data_sources[0] {
270+
DataSourceEnum::Subgraph(ds) => {
271+
assert_eq!(ds.name, "SubgraphSource1");
272+
assert_eq!(ds.kind, "subgraph");
273+
assert_eq!(ds.source.start_block, 9562480);
274+
}
275+
_ => panic!("Expected a subgraph data source"),
276+
}
277+
278+
// Validate second data source
279+
match &manifest.data_sources[1] {
280+
DataSourceEnum::Subgraph(ds) => {
281+
assert_eq!(ds.name, "SubgraphSource2");
282+
assert_eq!(ds.kind, "subgraph");
283+
assert_eq!(ds.source.start_block, 9562500);
284+
}
285+
_ => panic!("Expected a subgraph data source"),
286+
}
287+
}
288+
217289
#[tokio::test]
218290
async fn graft_manifest() {
219291
const YAML: &str = "

0 commit comments

Comments
 (0)