@@ -339,30 +339,88 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
339339 pub async fn blocks_with_subgraph_triggers (
340340 & self ,
341341 logger : & Logger ,
342- subgraph_filter : & SubgraphFilter ,
342+ filters : & [ SubgraphFilter ] ,
343343 range : SubgraphTriggerScanRange < C > ,
344344 ) -> Result < Vec < BlockWithTriggers < C > > , Error > {
345- let store = self
346- . source_subgraph_stores
347- . get ( & subgraph_filter. subgraph )
348- . ok_or_else ( || anyhow ! ( "Store not found for subgraph: {}" , subgraph_filter. subgraph) ) ?;
345+ if filters. is_empty ( ) {
346+ return Err ( anyhow ! ( "No subgraph filters provided" ) ) ;
347+ }
349348
350- let schema = <dyn crate :: components:: store:: SourceableStore >:: input_schema ( store) ;
349+ let ( blocks, hash_to_entities) = match range {
350+ SubgraphTriggerScanRange :: Single ( block) => {
351+ let hash_to_entities = self
352+ . fetch_entities_for_filters ( filters, block. number ( ) , block. number ( ) )
353+ . await ?;
351354
352- let adapter = self . adapter . clone ( ) ;
355+ ( vec ! [ block] , hash_to_entities)
356+ }
357+ SubgraphTriggerScanRange :: Range ( from, to) => {
358+ let hash_to_entities = self . fetch_entities_for_filters ( filters, from, to) . await ?;
359+
360+ let block_numbers: HashSet < BlockNumber > = hash_to_entities
361+ . iter ( )
362+ . flat_map ( |( _, entities) | entities. keys ( ) . copied ( ) )
363+ . chain ( std:: iter:: once ( to) )
364+ . collect ( ) ;
365+
366+ let blocks = self
367+ . adapter
368+ . load_block_ptrs_by_numbers ( logger. clone ( ) , block_numbers)
369+ . await ?;
353370
354- scan_subgraph_triggers :: < C > ( logger, store, & adapter, & schema, & subgraph_filter, range) . await
371+ ( blocks, hash_to_entities)
372+ }
373+ } ;
374+
375+ create_subgraph_triggers :: < C > ( logger. clone ( ) , blocks, hash_to_entities) . await
376+ }
377+
378+ async fn fetch_entities_for_filters (
379+ & self ,
380+ filters : & [ SubgraphFilter ] ,
381+ from : BlockNumber ,
382+ to : BlockNumber ,
383+ ) -> Result <
384+ Vec < (
385+ DeploymentHash ,
386+ BTreeMap < BlockNumber , Vec < EntitySourceOperation > > ,
387+ ) > ,
388+ Error ,
389+ > {
390+ let futures = filters
391+ . iter ( )
392+ . filter_map ( |filter| {
393+ self . source_subgraph_stores
394+ . get ( & filter. subgraph )
395+ . map ( |store| {
396+ let store = store. clone ( ) ;
397+ let schema = store. input_schema ( ) ;
398+
399+ async move {
400+ let entities =
401+ get_entities_for_range ( & store, filter, & schema, from, to) . await ?;
402+ Ok :: < _ , Error > ( ( filter. subgraph . clone ( ) , entities) )
403+ }
404+ } )
405+ } )
406+ . collect :: < Vec < _ > > ( ) ;
407+
408+ if futures. is_empty ( ) {
409+ return Ok ( Vec :: new ( ) ) ;
410+ }
411+
412+ futures03:: future:: try_join_all ( futures) . await
355413 }
356414}
357415
358416fn create_subgraph_trigger_from_entities (
359- filter : & SubgraphFilter ,
417+ subgraph : & DeploymentHash ,
360418 entities : Vec < EntitySourceOperation > ,
361419) -> Vec < subgraph:: TriggerData > {
362420 entities
363421 . into_iter ( )
364422 . map ( |entity| subgraph:: TriggerData {
365- source : filter . subgraph . clone ( ) ,
423+ source : subgraph. clone ( ) ,
366424 entity,
367425 } )
368426 . collect ( )
@@ -371,21 +429,27 @@ fn create_subgraph_trigger_from_entities(
371429async fn create_subgraph_triggers < C : Blockchain > (
372430 logger : Logger ,
373431 blocks : Vec < C :: Block > ,
374- filter : & SubgraphFilter ,
375- mut entities : BTreeMap < BlockNumber , Vec < EntitySourceOperation > > ,
432+ subgraph_data : Vec < (
433+ DeploymentHash ,
434+ BTreeMap < BlockNumber , Vec < EntitySourceOperation > > ,
435+ ) > ,
376436) -> Result < Vec < BlockWithTriggers < C > > , Error > {
377437 let logger_clone = logger. cheap_clone ( ) ;
378-
379438 let blocks: Vec < BlockWithTriggers < C > > = blocks
380439 . into_iter ( )
381440 . map ( |block| {
382441 let block_number = block. number ( ) ;
383- let trigger_data = entities
384- . remove ( & block_number)
385- . map ( |e| create_subgraph_trigger_from_entities ( filter, e) )
386- . unwrap_or_else ( Vec :: new) ;
442+ let mut all_trigger_data = Vec :: new ( ) ;
443+
444+ for ( hash, entities) in subgraph_data. iter ( ) {
445+ if let Some ( block_entities) = entities. get ( & block_number) {
446+ let trigger_data =
447+ create_subgraph_trigger_from_entities ( hash, block_entities. clone ( ) ) ;
448+ all_trigger_data. extend ( trigger_data) ;
449+ }
450+ }
387451
388- BlockWithTriggers :: new_with_subgraph_triggers ( block, trigger_data , & logger_clone)
452+ BlockWithTriggers :: new_with_subgraph_triggers ( block, all_trigger_data , & logger_clone)
389453 } )
390454 . collect ( ) ;
391455
@@ -397,36 +461,6 @@ pub enum SubgraphTriggerScanRange<C: Blockchain> {
397461 Range ( BlockNumber , BlockNumber ) ,
398462}
399463
400- async fn scan_subgraph_triggers < C : Blockchain > (
401- logger : & Logger ,
402- store : & Arc < dyn SourceableStore > ,
403- adapter : & Arc < dyn TriggersAdapter < C > > ,
404- schema : & InputSchema ,
405- filter : & SubgraphFilter ,
406- range : SubgraphTriggerScanRange < C > ,
407- ) -> Result < Vec < BlockWithTriggers < C > > , Error > {
408- match range {
409- SubgraphTriggerScanRange :: Single ( block) => {
410- let entities =
411- get_entities_for_range ( store, filter, schema, block. number ( ) , block. number ( ) )
412- . await ?;
413- create_subgraph_triggers :: < C > ( logger. clone ( ) , vec ! [ block] , filter, entities) . await
414- }
415- SubgraphTriggerScanRange :: Range ( from, to) => {
416- let entities = get_entities_for_range ( store, filter, schema, from, to) . await ?;
417- let mut block_numbers: HashSet < BlockNumber > = entities. keys ( ) . cloned ( ) . collect ( ) ;
418- // Ensure the 'to' block is included in the block_numbers
419- block_numbers. insert ( to) ;
420-
421- let blocks = adapter
422- . load_block_ptrs_by_numbers ( logger. clone ( ) , block_numbers)
423- . await ?;
424-
425- create_subgraph_triggers :: < C > ( logger. clone ( ) , blocks, filter, entities) . await
426- }
427- }
428- }
429-
430464#[ derive( Debug , Clone , Eq , PartialEq ) ]
431465pub enum EntityOperationKind {
432466 Create ,
@@ -474,11 +508,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
474508 to : BlockNumber ,
475509 filter : & Arc < TriggerFilterWrapper < C > > ,
476510 ) -> Result < ( Vec < BlockWithTriggers < C > > , BlockNumber ) , Error > {
477- if let Some ( subgraph_filter ) = filter. subgraph_filter . first ( ) {
511+ if ! filter. subgraph_filter . is_empty ( ) {
478512 let blocks_with_triggers = self
479513 . blocks_with_subgraph_triggers (
480514 logger,
481- subgraph_filter,
515+ & filter . subgraph_filter ,
482516 SubgraphTriggerScanRange :: Range ( from, to) ,
483517 )
484518 . await ?;
@@ -504,11 +538,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
504538 "block_hash" => block. hash( ) . hash_hex( ) ,
505539 ) ;
506540
507- if let Some ( subgraph_filter ) = filter. subgraph_filter . first ( ) {
541+ if ! filter. subgraph_filter . is_empty ( ) {
508542 let blocks_with_triggers = self
509543 . blocks_with_subgraph_triggers (
510544 logger,
511- subgraph_filter,
545+ & filter . subgraph_filter ,
512546 SubgraphTriggerScanRange :: Single ( block) ,
513547 )
514548 . await ?;
0 commit comments