@@ -16,6 +16,7 @@ use graph::blockchain::{
1616} ;
1717use graph:: components:: store:: { EmptyStore , GetScope , ReadStore , StoredDynamicDataSource } ;
1818use graph:: components:: subgraph:: InstanceDSTemplate ;
19+ use graph:: components:: trigger_processor:: RunnableTriggers ;
1920use graph:: components:: {
2021 store:: ModificationsAndCache ,
2122 subgraph:: { MappingError , PoICausalityRegion , ProofOfIndexing , SharedProofOfIndexing } ,
@@ -537,6 +538,33 @@ where
537538 }
538539 }
539540
541+ async fn match_and_decode_many < ' a , F > (
542+ & ' a self ,
543+ logger : & Logger ,
544+ block : & Arc < C :: Block > ,
545+ triggers : Vec < Trigger < C > > ,
546+ hosts_filter : F ,
547+ ) -> Result < Vec < RunnableTriggers < ' a , C > > , MappingError >
548+ where
549+ F : Fn ( & TriggerData < C > ) -> Box < dyn Iterator < Item = & ' a T :: Host > + Send + ' a > ,
550+ {
551+ let triggers = triggers. into_iter ( ) . map ( |t| match t {
552+ Trigger :: Chain ( t) => TriggerData :: Onchain ( t) ,
553+ Trigger :: Subgraph ( t) => TriggerData :: Subgraph ( t) ,
554+ } ) ;
555+
556+ self . ctx
557+ . decoder
558+ . match_and_decode_many (
559+ & logger,
560+ & block,
561+ triggers,
562+ hosts_filter,
563+ & self . metrics . subgraph ,
564+ )
565+ . await
566+ }
567+
540568 /// Processes a block and returns the updated context and a boolean flag indicating
541569 /// whether new dynamic data sources have been added to the subgraph.
542570 async fn process_block (
@@ -584,18 +612,7 @@ where
584612 // Match and decode all triggers in the block
585613 let hosts_filter = |trigger : & TriggerData < C > | self . ctx . instance . hosts_for_trigger ( trigger) ;
586614 let match_res = self
587- . ctx
588- . decoder
589- . match_and_decode_many (
590- & logger,
591- & block,
592- triggers. into_iter ( ) . map ( |t| match t {
593- Trigger :: Chain ( t) => TriggerData :: Onchain ( t) ,
594- Trigger :: Subgraph ( t) => TriggerData :: Subgraph ( t) ,
595- } ) ,
596- hosts_filter,
597- & self . metrics . subgraph ,
598- )
615+ . match_and_decode_many ( & logger, & block, triggers, hosts_filter)
599616 . await ;
600617
601618 // Process events one after the other, passing in entity operations
@@ -727,19 +744,11 @@ where
727744
728745 // Process the triggers in each host in the same order the
729746 // corresponding data sources have been created.
747+ let hosts_filter = |_: & ' _ TriggerData < C > | -> Box < dyn Iterator < Item = _ > + Send > {
748+ Box :: new ( runtime_hosts. iter ( ) . map ( Arc :: as_ref) )
749+ } ;
730750 let match_res: Result < Vec < _ > , _ > = self
731- . ctx
732- . decoder
733- . match_and_decode_many (
734- & logger,
735- & block,
736- triggers. into_iter ( ) . map ( |t| match t {
737- Trigger :: Chain ( t) => TriggerData :: Onchain ( t) ,
738- Trigger :: Subgraph ( _) => unreachable ! ( ) , // TODO(krishna): Re-evaulate this
739- } ) ,
740- |_| Box :: new ( runtime_hosts. iter ( ) . map ( Arc :: as_ref) ) ,
741- & self . metrics . subgraph ,
742- )
751+ . match_and_decode_many ( & logger, & block, triggers, hosts_filter)
743752 . await ;
744753
745754 let mut res = Ok ( block_state) ;
0 commit comments