From c72998b93230e2e84f5618818020b87f36da8c36 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 12:24:11 -0700 Subject: [PATCH 1/9] core: Extract transact_block_state into function Reduce the duplication between normal and wasm block processing by factoring the common part into a function --- core/src/subgraph/runner.rs | 433 ++++++++++++++---------------------- 1 file changed, 173 insertions(+), 260 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 71c36886d2e..fef9b6c0c2e 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -356,6 +356,157 @@ where } } + async fn transact_block_state( + &mut self, + logger: &Logger, + block_ptr: BlockPtr, + firehose_cursor: FirehoseCursor, + block_time: BlockTime, + mut block_state: BlockState, + proof_of_indexing: SharedProofOfIndexing, + offchain_mods: Vec, + processed_offchain_data_sources: Vec, + cancel_handle: &CancelHandle, + ) -> Result { + let has_errors = block_state.has_errors(); + let is_non_fatal_errors_active = self + .inputs + .features + .contains(&SubgraphFeature::NonFatalErrors); + + // Avoid writing to store if block stream has been canceled + if cancel_handle.is_canceled() { + return Err(ProcessingError::Canceled); + } + + if let Some(proof_of_indexing) = proof_of_indexing.into_inner() { + update_proof_of_indexing( + proof_of_indexing, + block_time, + &self.metrics.host.stopwatch, + &mut block_state.entity_cache, + ) + .await + .non_deterministic()?; + } + + let section = self + .metrics + .host + .stopwatch + .start_section("as_modifications"); + let ModificationsAndCache { + modifications: mut mods, + entity_lfu_cache: cache, + evict_stats, + } = block_state + .entity_cache + .as_modifications(block_ptr.number) + .map_err(|e| ProcessingError::Unknown(e.into()))?; + section.end(); + + trace!(self.logger, "Entity cache statistics"; + "weight" => evict_stats.new_weight, + "evicted_weight" => evict_stats.evicted_weight, + "count" => evict_stats.new_count, + "evicted_count" => evict_stats.evicted_count, + "stale_update" => evict_stats.stale_update, + "hit_rate" => format!("{:.0}%", evict_stats.hit_rate_pct()), + "accesses" => evict_stats.accesses, + "evict_time_ms" => evict_stats.evict_time.as_millis()); + + mods.extend(offchain_mods); + + // Put the cache back in the state, asserting that the placeholder cache was not used. + assert!(self.state.entity_lfu_cache.is_empty()); + self.state.entity_lfu_cache = cache; + + if !mods.is_empty() { + info!(&logger, "Applying {} entity operation(s)", mods.len()); + } + + let err_count = block_state.deterministic_errors.len(); + for (i, e) in block_state.deterministic_errors.iter().enumerate() { + let message = format!("{:#}", e).replace('\n', "\t"); + error!(&logger, "Subgraph error {}/{}", i + 1, err_count; + "error" => message, + "code" => LogCode::SubgraphSyncingFailure + ); + } + + // Transact entity operations into the store and update the + // subgraph's block stream pointer + let _section = self.metrics.host.stopwatch.start_section("transact_block"); + let start = Instant::now(); + + // If a deterministic error has happened, make the PoI to be the only entity that'll be stored. + if has_errors && !is_non_fatal_errors_active { + let is_poi_entity = + |entity_mod: &EntityModification| entity_mod.key().entity_type.is_poi(); + mods.retain(is_poi_entity); + // Confidence check + assert!( + mods.len() == 1, + "There should be only one PoI EntityModification" + ); + } + + let BlockState { + deterministic_errors, + persisted_data_sources, + metrics: block_state_metrics, + .. + } = block_state; + + let first_error = deterministic_errors.first().cloned(); + + let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; + + self.inputs + .store + .transact_block_operations( + block_ptr.clone(), + block_time, + firehose_cursor, + mods, + &self.metrics.host.stopwatch, + persisted_data_sources, + deterministic_errors, + processed_offchain_data_sources, + is_non_fatal_errors_active, + is_caught_up, + ) + .await + .classify() + .detail("Failed to transact block operations")?; + + // For subgraphs with `nonFatalErrors` feature disabled, we consider + // any error as fatal. + // + // So we do an early return to make the subgraph stop processing blocks. + // + // In this scenario the only entity that is stored/transacted is the PoI, + // all of the others are discarded. + if has_errors && !is_non_fatal_errors_active { + // Only the first error is reported. + return Err(ProcessingError::Deterministic(Box::new( + first_error.unwrap(), + ))); + } + + let elapsed = start.elapsed().as_secs_f64(); + self.metrics + .subgraph + .block_ops_transaction_duration + .observe(elapsed); + + block_state_metrics + .flush_metrics_to_store(&logger, block_ptr, self.inputs.deployment.id) + .non_deterministic()?; + + Ok(has_errors) + } + /// Processes a block and returns the updated context and a boolean flag indicating /// whether new dynamic data sources have been added to the subgraph. async fn process_block( @@ -625,55 +776,6 @@ where } } - let has_errors = block_state.has_errors(); - let is_non_fatal_errors_active = self - .inputs - .features - .contains(&SubgraphFeature::NonFatalErrors); - - // Apply entity operations and advance the stream - - // Avoid writing to store if block stream has been canceled - if block_stream_cancel_handle.is_canceled() { - return Err(ProcessingError::Canceled); - } - - if let Some(proof_of_indexing) = proof_of_indexing.into_inner() { - update_proof_of_indexing( - proof_of_indexing, - block.timestamp(), - &self.metrics.host.stopwatch, - &mut block_state.entity_cache, - ) - .await - .non_deterministic()?; - } - - let section = self - .metrics - .host - .stopwatch - .start_section("as_modifications"); - let ModificationsAndCache { - modifications: mut mods, - entity_lfu_cache: cache, - evict_stats, - } = block_state - .entity_cache - .as_modifications(block.number()) - .map_err(|e| ProcessingError::Unknown(e.into()))?; - section.end(); - - trace!(self.logger, "Entity cache statistics"; - "weight" => evict_stats.new_weight, - "evicted_weight" => evict_stats.evicted_weight, - "count" => evict_stats.new_count, - "evicted_count" => evict_stats.evicted_count, - "stale_update" => evict_stats.stale_update, - "hit_rate" => format!("{:.0}%", evict_stats.hit_rate_pct()), - "accesses" => evict_stats.accesses, - "evict_time_ms" => evict_stats.evict_time.as_millis()); - // Check for offchain events and process them, including their entity modifications in the // set to be transacted. let offchain_events = self @@ -685,95 +787,23 @@ where self.handle_offchain_triggers(offchain_events, &block) .await .non_deterministic()?; - mods.extend(offchain_mods); - - // Put the cache back in the state, asserting that the placeholder cache was not used. - assert!(self.state.entity_lfu_cache.is_empty()); - self.state.entity_lfu_cache = cache; + block_state + .persisted_data_sources + .extend(persisted_off_chain_data_sources); - if !mods.is_empty() { - info!(&logger, "Applying {} entity operation(s)", mods.len()); - } - - let err_count = block_state.deterministic_errors.len(); - for (i, e) in block_state.deterministic_errors.iter().enumerate() { - let message = format!("{:#}", e).replace('\n', "\t"); - error!(&logger, "Subgraph error {}/{}", i + 1, err_count; - "error" => message, - "code" => LogCode::SubgraphSyncingFailure - ); - } - - // Transact entity operations into the store and update the - // subgraph's block stream pointer - let _section = self.metrics.host.stopwatch.start_section("transact_block"); - let start = Instant::now(); - - // If a deterministic error has happened, make the PoI to be the only entity that'll be stored. - if has_errors && !is_non_fatal_errors_active { - let is_poi_entity = - |entity_mod: &EntityModification| entity_mod.key().entity_type.is_poi(); - mods.retain(is_poi_entity); - // Confidence check - assert!( - mods.len() == 1, - "There should be only one PoI EntityModification" - ); - } - - let BlockState { - deterministic_errors, - mut persisted_data_sources, - metrics: block_state_metrics, - .. - } = block_state; - - let first_error = deterministic_errors.first().cloned(); - - let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; - - persisted_data_sources.extend(persisted_off_chain_data_sources); - self.inputs - .store - .transact_block_operations( + let has_errors = self + .transact_block_state( + &logger, block_ptr.clone(), + firehose_cursor.clone(), block.timestamp(), - firehose_cursor, - mods, - &self.metrics.host.stopwatch, - persisted_data_sources, - deterministic_errors, + block_state, + proof_of_indexing, + offchain_mods, processed_offchain_data_sources, - is_non_fatal_errors_active, - is_caught_up, + block_stream_cancel_handle, ) - .await - .classify() - .detail("Failed to transact block operations")?; - - // For subgraphs with `nonFatalErrors` feature disabled, we consider - // any error as fatal. - // - // So we do an early return to make the subgraph stop processing blocks. - // - // In this scenario the only entity that is stored/transacted is the PoI, - // all of the others are discarded. - if has_errors && !is_non_fatal_errors_active { - // Only the first error is reported. - return Err(ProcessingError::Deterministic(Box::new( - first_error.unwrap(), - ))); - } - - let elapsed = start.elapsed().as_secs_f64(); - self.metrics - .subgraph - .block_ops_transaction_duration - .observe(elapsed); - - block_state_metrics - .flush_metrics_to_store(&logger, block_ptr, self.inputs.deployment.id) - .non_deterministic()?; + .await?; // To prevent a buggy pending version from replacing a current version, if errors are // present the subgraph will be unassigned. @@ -1336,7 +1366,7 @@ where // Causality region for onchain triggers. let causality_region = PoICausalityRegion::from_network(&self.inputs.network); - let mut block_state = { + let block_state = { match self .process_wasm_block( &proof_of_indexing, @@ -1371,136 +1401,19 @@ where } }; - let has_errors = block_state.has_errors(); - let is_non_fatal_errors_active = self - .inputs - .features - .contains(&SubgraphFeature::NonFatalErrors); - - // Apply entity operations and advance the stream - - // Avoid writing to store if block stream has been canceled - if cancel_handle.is_canceled() { - return Err(ProcessingError::Canceled.into()); - } - - if let Some(proof_of_indexing) = proof_of_indexing.into_inner() { - update_proof_of_indexing( - proof_of_indexing, - block_time, - &self.metrics.host.stopwatch, - &mut block_state.entity_cache, - ) - .await - .non_deterministic()?; - } - - let section = self - .metrics - .host - .stopwatch - .start_section("as_modifications"); - let ModificationsAndCache { - modifications: mut mods, - entity_lfu_cache: cache, - evict_stats, - } = block_state - .entity_cache - .as_modifications(block_ptr.number) - .map_err(|e| ProcessingError::Unknown(e.into()))?; - section.end(); - - trace!(self.logger, "Entity cache statistics"; - "weight" => evict_stats.new_weight, - "evicted_weight" => evict_stats.evicted_weight, - "count" => evict_stats.new_count, - "evicted_count" => evict_stats.evicted_count, - "stale_update" => evict_stats.stale_update, - "hit_rate" => format!("{:.0}%", evict_stats.hit_rate_pct()), - "accesses" => evict_stats.accesses, - "evict_time_ms" => evict_stats.evict_time.as_millis()); - - // Put the cache back in the state, asserting that the placeholder cache was not used. - assert!(self.state.entity_lfu_cache.is_empty()); - self.state.entity_lfu_cache = cache; - - if !mods.is_empty() { - info!(&logger, "Applying {} entity operation(s)", mods.len()); - } - - let err_count = block_state.deterministic_errors.len(); - for (i, e) in block_state.deterministic_errors.iter().enumerate() { - let message = format!("{:#}", e).replace('\n', "\t"); - error!(&logger, "Subgraph error {}/{}", i + 1, err_count; - "error" => message, - "code" => LogCode::SubgraphSyncingFailure - ); - } - - // Transact entity operations into the store and update the - // subgraph's block stream pointer - let _section = self.metrics.host.stopwatch.start_section("transact_block"); - let start = Instant::now(); - - // If a deterministic error has happened, make the PoI to be the only entity that'll be stored. - if has_errors && !is_non_fatal_errors_active { - let is_poi_entity = - |entity_mod: &EntityModification| entity_mod.key().entity_type.is_poi(); - mods.retain(is_poi_entity); - // Confidence check - assert!( - mods.len() == 1, - "There should be only one PoI EntityModification" - ); - } - - let BlockState { - deterministic_errors, - .. - } = block_state; - - let first_error = deterministic_errors.first().cloned(); - - // We consider a subgraph caught up when it's at most 1 blocks behind the chain head. - let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; - - self.inputs - .store - .transact_block_operations( - block_ptr, + let has_errors = self + .transact_block_state( + &logger, + block_ptr.clone(), + cursor.clone(), block_time, - cursor, - mods, - &self.metrics.host.stopwatch, + block_state, + proof_of_indexing, vec![], - deterministic_errors, vec![], - is_non_fatal_errors_active, - is_caught_up, + cancel_handle, ) - .await - .classify() - .detail("Failed to transact block operations")?; - - // For subgraphs with `nonFatalErrors` feature disabled, we consider - // any error as fatal. - // - // So we do an early return to make the subgraph stop processing blocks. - // - // In this scenario the only entity that is stored/transacted is the PoI, - // all of the others are discarded. - if has_errors && !is_non_fatal_errors_active { - // Only the first error is reported. - return Err(ProcessingError::Deterministic(Box::new( - first_error.unwrap(), - ))); - } - - let elapsed = start.elapsed().as_secs_f64(); - self.metrics - .subgraph - .block_ops_transaction_duration - .observe(elapsed); + .await?; // To prevent a buggy pending version from replacing a current version, if errors are // present the subgraph will be unassigned. From 6f558d6bd1d9805f27508c1b1854b92f6f3b451d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 15:38:01 -0700 Subject: [PATCH 2/9] core: block_state does not need to be mutable in transact_block_state --- core/src/subgraph/runner.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index fef9b6c0c2e..df589fc8a1a 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -362,7 +362,7 @@ where block_ptr: BlockPtr, firehose_cursor: FirehoseCursor, block_time: BlockTime, - mut block_state: BlockState, + block_state: BlockState, proof_of_indexing: SharedProofOfIndexing, offchain_mods: Vec, processed_offchain_data_sources: Vec, @@ -374,6 +374,14 @@ where .features .contains(&SubgraphFeature::NonFatalErrors); + let BlockState { + deterministic_errors, + persisted_data_sources, + metrics: block_state_metrics, + mut entity_cache, + .. + } = block_state; + // Avoid writing to store if block stream has been canceled if cancel_handle.is_canceled() { return Err(ProcessingError::Canceled); @@ -384,7 +392,7 @@ where proof_of_indexing, block_time, &self.metrics.host.stopwatch, - &mut block_state.entity_cache, + &mut entity_cache, ) .await .non_deterministic()?; @@ -399,8 +407,7 @@ where modifications: mut mods, entity_lfu_cache: cache, evict_stats, - } = block_state - .entity_cache + } = entity_cache .as_modifications(block_ptr.number) .map_err(|e| ProcessingError::Unknown(e.into()))?; section.end(); @@ -425,8 +432,8 @@ where info!(&logger, "Applying {} entity operation(s)", mods.len()); } - let err_count = block_state.deterministic_errors.len(); - for (i, e) in block_state.deterministic_errors.iter().enumerate() { + let err_count = deterministic_errors.len(); + for (i, e) in deterministic_errors.iter().enumerate() { let message = format!("{:#}", e).replace('\n', "\t"); error!(&logger, "Subgraph error {}/{}", i + 1, err_count; "error" => message, @@ -451,13 +458,6 @@ where ); } - let BlockState { - deterministic_errors, - persisted_data_sources, - metrics: block_state_metrics, - .. - } = block_state; - let first_error = deterministic_errors.first().cloned(); let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; From c67f109a9d00e07b38704019ef316fe66d13eb0b Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 15:43:18 -0700 Subject: [PATCH 3/9] core: Make the handling of nonfatal errors a little less mindbending --- core/src/subgraph/inputs.rs | 8 ++++++++ core/src/subgraph/runner.rs | 16 ++++------------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index ca52073ab06..91bbdd131f4 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -75,4 +75,12 @@ impl IndexingInputs { instrument: *instrument, } } + + pub fn errors_are_non_fatal(&self) -> bool { + self.features.contains(&SubgraphFeature::NonFatalErrors) + } + + pub fn errors_are_fatal(&self) -> bool { + !self.features.contains(&SubgraphFeature::NonFatalErrors) + } } diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index df589fc8a1a..808d7318784 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -21,10 +21,7 @@ use graph::components::{ subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing}, }; use graph::data::store::scalar::Bytes; -use graph::data::subgraph::{ - schema::{SubgraphError, SubgraphHealth}, - SubgraphFeature, -}; +use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth}; use graph::data_source::{ offchain, CausalityRegion, DataSource, DataSourceCreationError, TriggerData, }; @@ -369,11 +366,6 @@ where cancel_handle: &CancelHandle, ) -> Result { let has_errors = block_state.has_errors(); - let is_non_fatal_errors_active = self - .inputs - .features - .contains(&SubgraphFeature::NonFatalErrors); - let BlockState { deterministic_errors, persisted_data_sources, @@ -447,7 +439,7 @@ where let start = Instant::now(); // If a deterministic error has happened, make the PoI to be the only entity that'll be stored. - if has_errors && !is_non_fatal_errors_active { + if has_errors && self.inputs.errors_are_fatal() { let is_poi_entity = |entity_mod: &EntityModification| entity_mod.key().entity_type.is_poi(); mods.retain(is_poi_entity); @@ -473,7 +465,7 @@ where persisted_data_sources, deterministic_errors, processed_offchain_data_sources, - is_non_fatal_errors_active, + self.inputs.errors_are_non_fatal(), is_caught_up, ) .await @@ -487,7 +479,7 @@ where // // In this scenario the only entity that is stored/transacted is the PoI, // all of the others are discarded. - if has_errors && !is_non_fatal_errors_active { + if has_errors && self.inputs.errors_are_fatal() { // Only the first error is reported. return Err(ProcessingError::Deterministic(Box::new( first_error.unwrap(), From 91bd6a7446546f6e37487d90bbeeb74e3e550a9a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 16:32:43 -0700 Subject: [PATCH 4/9] core, graph: Specialize Cancelable a little to what we actually use This is so we can write down types involving cancelables --- core/src/subgraph/registrar.rs | 4 +-- core/src/subgraph/runner.rs | 2 +- graph/src/blockchain/block_stream.rs | 2 +- graph/src/ext/futures.rs | 51 ++++++++++------------------ 4 files changed, 21 insertions(+), 38 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 3a712b6daa9..325c4c4560a 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -120,9 +120,7 @@ where .compat() .map_err(SubgraphAssignmentProviderError::Unknown) .map_err(CancelableError::Error) - .cancelable(&assignment_event_stream_cancel_handle, || { - Err(CancelableError::Cancel) - }) + .cancelable(&assignment_event_stream_cancel_handle) .compat() .for_each(move |assignment_event| { assert_eq!(assignment_event.node_id(), &node_id); diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 808d7318784..3fd068188f6 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -267,7 +267,7 @@ where ) .await? .map_err(CancelableError::from) - .cancelable(&block_stream_canceler, || Err(CancelableError::Cancel)); + .cancelable(&block_stream_canceler); // Keep the stream's cancel guard around to be able to shut it down when the subgraph // deployment is unassigned diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index b9f602d802c..99f2dabd1ac 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1024,7 +1024,7 @@ mod test { let mut stream = BufferedBlockStream::spawn_from_stream(buffer_size, stream) .map_err(CancelableError::Error) - .cancelable(&guard, || Err(CancelableError::Cancel)); + .cancelable(&guard); let mut blocks = HashSet::::new(); let mut count = 0; diff --git a/graph/src/ext/futures.rs b/graph/src/ext/futures.rs index c25550a426f..7c5eb0fc96e 100644 --- a/graph/src/ext/futures.rs +++ b/graph/src/ext/futures.rs @@ -12,42 +12,45 @@ use std::time::Duration; /// /// Created by calling `cancelable` extension method. /// Can be canceled through the corresponding `CancelGuard`. -pub struct Cancelable { +pub struct Cancelable { inner: T, cancel_receiver: Fuse>, - on_cancel: C, } -impl Cancelable { +impl Cancelable { pub fn get_mut(&mut self) -> &mut T { &mut self.inner } } /// It's not viable to use `select` directly, so we do a custom implementation. -impl S::Item + Unpin> Stream for Cancelable { - type Item = S::Item; +impl> + Unpin, R, E: Display + Debug> Stream for Cancelable { + type Item = Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Error if the stream was canceled by dropping the sender. match self.cancel_receiver.poll_unpin(cx) { Poll::Ready(Ok(_)) => unreachable!(), - Poll::Ready(Err(_)) => Poll::Ready(Some((self.on_cancel)())), - Poll::Pending => Pin::new(&mut self.inner).poll_next(cx), + Poll::Ready(Err(_)) => Poll::Ready(Some(Err(CancelableError::Cancel))), + Poll::Pending => Pin::new(&mut self.inner) + .poll_next(cx) + .map_err(|x| CancelableError::Error(x)), } } } -impl F::Output + Unpin> Future for Cancelable { - type Output = F::Output; +impl> + Unpin, R, E: Display + Debug> Future for Cancelable { + type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Error if the future was canceled by dropping the sender. // `canceled` is fused so we may ignore `Ok`s. match self.cancel_receiver.poll_unpin(cx) { Poll::Ready(Ok(_)) => unreachable!(), - Poll::Ready(Err(_)) => Poll::Ready((self.on_cancel)()), - Poll::Pending => Pin::new(&mut self.inner).poll(cx), + Poll::Ready(Err(_)) => Poll::Ready(Err(CancelableError::Cancel)), + Poll::Pending => Pin::new(&mut self.inner) + .poll(cx) + .map_err(|x| CancelableError::Error(x)), } } } @@ -209,25 +212,16 @@ pub trait StreamExtension: Stream + Sized { /// When `cancel` is called on a `CancelGuard` or it is dropped, /// `Cancelable` receives an error. /// - fn cancelable Self::Item>( - self, - guard: &impl Canceler, - on_cancel: C, - ) -> Cancelable; + fn cancelable(self, guard: &impl Canceler) -> Cancelable; } impl StreamExtension for S { - fn cancelable S::Item>( - self, - guard: &impl Canceler, - on_cancel: C, - ) -> Cancelable { + fn cancelable(self, guard: &impl Canceler) -> Cancelable { let (canceler, cancel_receiver) = oneshot::channel(); guard.add_cancel_sender(canceler); Cancelable { inner: self, cancel_receiver: cancel_receiver.fuse(), - on_cancel, } } } @@ -237,27 +231,18 @@ pub trait FutureExtension: Future + Sized { /// `Cancelable` receives an error. /// /// `on_cancel` is called to make an error value upon cancelation. - fn cancelable Self::Output>( - self, - guard: &impl Canceler, - on_cancel: C, - ) -> Cancelable; + fn cancelable(self, guard: &impl Canceler) -> Cancelable; fn timeout(self, dur: Duration) -> tokio::time::Timeout; } impl FutureExtension for F { - fn cancelable F::Output>( - self, - guard: &impl Canceler, - on_cancel: C, - ) -> Cancelable { + fn cancelable(self, guard: &impl Canceler) -> Cancelable { let (canceler, cancel_receiver) = oneshot::channel(); guard.add_cancel_sender(canceler); Cancelable { inner: self, cancel_receiver: cancel_receiver.fuse(), - on_cancel, } } From c91a09efcc0b87102bec102aa4d14af4b1f7cf09 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 16:54:37 -0700 Subject: [PATCH 5/9] core: Reduce noise by storing CancelHandle in Runner --- core/src/subgraph/registrar.rs | 1 - core/src/subgraph/runner.rs | 115 ++++++++++++++++----------------- 2 files changed, 54 insertions(+), 62 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 325c4c4560a..6f7ae17425f 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -119,7 +119,6 @@ where assignment_event_stream .compat() .map_err(SubgraphAssignmentProviderError::Unknown) - .map_err(CancelableError::Error) .cancelable(&assignment_event_stream_cancel_handle) .compat() .for_each(move |assignment_event| { diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 3fd068188f6..e69f23ee67d 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -8,7 +8,7 @@ use crate::subgraph::stream::new_block_stream; use anyhow::Context as _; use async_trait::async_trait; use graph::blockchain::block_stream::{ - BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, + BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, }; use graph::blockchain::{ Block, BlockTime, Blockchain, DataSource as _, SubgraphFilter, Trigger, TriggerFilter as _, @@ -26,8 +26,8 @@ use graph::data_source::{ offchain, CausalityRegion, DataSource, DataSourceCreationError, TriggerData, }; use graph::env::EnvVars; +use graph::ext::futures::Cancelable; use graph::futures03::stream::StreamExt; -use graph::futures03::TryStreamExt; use graph::prelude::{ anyhow, hex, retry, thiserror, BlockNumber, BlockPtr, BlockState, CancelGuard, CancelHandle, CancelToken as _, CancelableError, CheapClone as _, EntityCache, EntityModification, Error, @@ -60,6 +60,7 @@ where inputs: Arc>, logger: Logger, pub metrics: RunnerMetrics, + cancel_handle: Option, } #[derive(Debug, thiserror::Error)] @@ -99,6 +100,7 @@ where }, logger, metrics, + cancel_handle: None, } } @@ -206,6 +208,39 @@ where self.build_filter() } + async fn start_block_stream(&mut self) -> Result>>, Error> { + let block_stream_canceler = CancelGuard::new(); + let block_stream_cancel_handle = block_stream_canceler.handle(); + // TriggerFilter needs to be rebuilt eveytime the blockstream is restarted + self.ctx.filter = Some(self.build_filter()); + + let block_stream = new_block_stream( + &self.inputs, + self.ctx.filter.clone().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line + &self.metrics.subgraph, + ) + .await? + .cancelable(&block_stream_canceler); + + self.cancel_handle = Some(block_stream_cancel_handle); + + // Keep the stream's cancel guard around to be able to shut it down when the subgraph + // deployment is unassigned + self.ctx + .instances + .insert(self.inputs.deployment.id, block_stream_canceler); + + Ok(block_stream) + } + + fn is_canceled(&self) -> bool { + if let Some(ref cancel_handle) = self.cancel_handle { + cancel_handle.is_canceled() + } else { + false + } + } + pub async fn run(self) -> Result<(), SubgraphRunnerError> { self.run_inner(false).await.map(|_| ()) } @@ -255,27 +290,9 @@ where loop { debug!(self.logger, "Starting or restarting subgraph"); - let block_stream_canceler = CancelGuard::new(); - let block_stream_cancel_handle = block_stream_canceler.handle(); - // TriggerFilter needs to be rebuilt eveytime the blockstream is restarted - self.ctx.filter = Some(self.build_filter()); + let mut block_stream = self.start_block_stream().await?; - let mut block_stream = new_block_stream( - &self.inputs, - self.ctx.filter.clone().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line - &self.metrics.subgraph, - ) - .await? - .map_err(CancelableError::from) - .cancelable(&block_stream_canceler); - - // Keep the stream's cancel guard around to be able to shut it down when the subgraph - // deployment is unassigned - self.ctx - .instances - .insert(self.inputs.deployment.id, block_stream_canceler); - - debug!(self.logger, "Starting block stream"); + debug!(self.logger, "Started block stream"); self.metrics.subgraph.deployment_status.running(); @@ -291,21 +308,18 @@ where // This will require some code refactor in how the BlockStream is created let block_start = Instant::now(); - let action = self - .handle_stream_event(event, &block_stream_cancel_handle) - .await - .map(|res| { - self.metrics - .subgraph - .observe_block_processed(block_start.elapsed(), res.block_finished()); - res - })?; + let action = self.handle_stream_event(event).await.map(|res| { + self.metrics + .subgraph + .observe_block_processed(block_start.elapsed(), res.block_finished()); + res + })?; self.update_deployment_synced_metric(); // It is possible that the subgraph was unassigned, but the runner was in // a retry delay state and did not observe the cancel signal. - if block_stream_cancel_handle.is_canceled() { + if self.is_canceled() { // It is also possible that the runner was in a retry delay state while // the subgraph was reassigned and a new runner was started. if self.ctx.instances.contains(&self.inputs.deployment.id) { @@ -363,7 +377,6 @@ where proof_of_indexing: SharedProofOfIndexing, offchain_mods: Vec, processed_offchain_data_sources: Vec, - cancel_handle: &CancelHandle, ) -> Result { let has_errors = block_state.has_errors(); let BlockState { @@ -375,7 +388,7 @@ where } = block_state; // Avoid writing to store if block stream has been canceled - if cancel_handle.is_canceled() { + if self.is_canceled() { return Err(ProcessingError::Canceled); } @@ -503,7 +516,6 @@ where /// whether new dynamic data sources have been added to the subgraph. async fn process_block( &mut self, - block_stream_cancel_handle: &CancelHandle, block: BlockWithTriggers, firehose_cursor: FirehoseCursor, ) -> Result { @@ -793,7 +805,6 @@ where proof_of_indexing, offchain_mods, processed_offchain_data_sources, - block_stream_cancel_handle, ) .await?; @@ -1114,7 +1125,6 @@ where async fn handle_stream_event( &mut self, event: Option, CancelableError>>, - cancel_handle: &CancelHandle, ) -> Result { let action = match event { Some(Ok(BlockStreamEvent::ProcessWasmBlock( @@ -1130,14 +1140,7 @@ where .stopwatch .start_section(PROCESS_WASM_BLOCK_SECTION_NAME); let res = self - .handle_process_wasm_block( - block_ptr.clone(), - block_time, - data, - handler, - cursor, - cancel_handle, - ) + .handle_process_wasm_block(block_ptr.clone(), block_time, data, handler, cursor) .await; let start = Instant::now(); self.handle_action(start, block_ptr, res).await? @@ -1148,8 +1151,7 @@ where .stream .stopwatch .start_section(PROCESS_BLOCK_SECTION_NAME); - self.handle_process_block(block, cursor, cancel_handle) - .await? + self.handle_process_block(block, cursor).await? } Some(Ok(BlockStreamEvent::Revert(revert_to_ptr, cursor))) => { let _section = self @@ -1161,7 +1163,7 @@ where } // Log and drop the errors from the block_stream // The block stream will continue attempting to produce blocks - Some(Err(e)) => self.handle_err(e, cancel_handle).await?, + Some(Err(e)) => self.handle_err(e).await?, // If the block stream ends, that means that there is no more indexing to do. // Typically block streams produce indefinitely, but tests are an example of finite block streams. None => Action::Stop, @@ -1304,24 +1306,19 @@ trait StreamEventHandler { block_data: Box<[u8]>, handler: String, cursor: FirehoseCursor, - cancel_handle: &CancelHandle, ) -> Result; async fn handle_process_block( &mut self, block: BlockWithTriggers, cursor: FirehoseCursor, - cancel_handle: &CancelHandle, ) -> Result; async fn handle_revert( &mut self, revert_to_ptr: BlockPtr, cursor: FirehoseCursor, ) -> Result; - async fn handle_err( - &mut self, - err: CancelableError, - cancel_handle: &CancelHandle, - ) -> Result; + async fn handle_err(&mut self, err: CancelableError) + -> Result; fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool; } @@ -1338,7 +1335,6 @@ where block_data: Box<[u8]>, handler: String, cursor: FirehoseCursor, - cancel_handle: &CancelHandle, ) -> Result { let logger = self.logger.new(o!( "block_number" => format!("{:?}", block_ptr.number), @@ -1403,7 +1399,6 @@ where proof_of_indexing, vec![], vec![], - cancel_handle, ) .await?; @@ -1427,7 +1422,6 @@ where &mut self, block: BlockWithTriggers, cursor: FirehoseCursor, - cancel_handle: &CancelHandle, ) -> Result { let block_ptr = block.ptr(); self.metrics @@ -1460,7 +1454,7 @@ where let start = Instant::now(); - let res = self.process_block(cancel_handle, block, cursor).await; + let res = self.process_block(block, cursor).await; self.handle_action(start, block_ptr, res).await } @@ -1519,9 +1513,8 @@ where async fn handle_err( &mut self, err: CancelableError, - cancel_handle: &CancelHandle, ) -> Result { - if cancel_handle.is_canceled() { + if self.is_canceled() { debug!(&self.logger, "Subgraph block stream shut down cleanly"); return Ok(Action::Stop); } From a8cacba38733f58680ac43c72670d8724dac7743 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 17:28:25 -0700 Subject: [PATCH 6/9] core, graph: Factor out the decision to cancel a buggy subgraph --- core/src/subgraph/runner.rs | 131 +++++++++++----------- graph/src/components/subgraph/instance.rs | 4 - 2 files changed, 64 insertions(+), 71 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index e69f23ee67d..b56d2289b3f 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -36,6 +36,7 @@ use graph::prelude::{ }; use graph::schema::EntityKey; use graph::slog::{debug, error, info, o, trace, warn, Logger}; +use graph::util::lfu_cache::EvictStats; use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -377,8 +378,19 @@ where proof_of_indexing: SharedProofOfIndexing, offchain_mods: Vec, processed_offchain_data_sources: Vec, - ) -> Result { - let has_errors = block_state.has_errors(); + ) -> Result<(), ProcessingError> { + fn log_evict_stats(logger: &Logger, evict_stats: &EvictStats) { + trace!(logger, "Entity cache statistics"; + "weight" => evict_stats.new_weight, + "evicted_weight" => evict_stats.evicted_weight, + "count" => evict_stats.new_count, + "evicted_count" => evict_stats.evicted_count, + "stale_update" => evict_stats.stale_update, + "hit_rate" => format!("{:.0}%", evict_stats.hit_rate_pct()), + "accesses" => evict_stats.accesses, + "evict_time_ms" => evict_stats.evict_time.as_millis()); + } + let BlockState { deterministic_errors, persisted_data_sources, @@ -386,6 +398,8 @@ where mut entity_cache, .. } = block_state; + let first_error = deterministic_errors.first().cloned(); + let has_errors = first_error.is_some(); // Avoid writing to store if block stream has been canceled if self.is_canceled() { @@ -412,20 +426,10 @@ where modifications: mut mods, entity_lfu_cache: cache, evict_stats, - } = entity_cache - .as_modifications(block_ptr.number) - .map_err(|e| ProcessingError::Unknown(e.into()))?; + } = entity_cache.as_modifications(block_ptr.number).classify()?; section.end(); - trace!(self.logger, "Entity cache statistics"; - "weight" => evict_stats.new_weight, - "evicted_weight" => evict_stats.evicted_weight, - "count" => evict_stats.new_count, - "evicted_count" => evict_stats.evicted_count, - "stale_update" => evict_stats.stale_update, - "hit_rate" => format!("{:.0}%", evict_stats.hit_rate_pct()), - "accesses" => evict_stats.accesses, - "evict_time_ms" => evict_stats.evict_time.as_millis()); + log_evict_stats(&self.logger, &evict_stats); mods.extend(offchain_mods); @@ -463,8 +467,6 @@ where ); } - let first_error = deterministic_errors.first().cloned(); - let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; self.inputs @@ -509,7 +511,30 @@ where .flush_metrics_to_store(&logger, block_ptr, self.inputs.deployment.id) .non_deterministic()?; - Ok(has_errors) + if has_errors { + self.maybe_cancel()?; + } + + Ok(()) + } + + /// Cancel the subgraph if `disable_fail_fast` is not set and it is not + /// synced + fn maybe_cancel(&self) -> Result<(), ProcessingError> { + // To prevent a buggy pending version from replacing a current version, if errors are + // present the subgraph will be unassigned. + let store = &self.inputs.store; + if !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() { + store + .unassign_subgraph() + .map_err(|e| ProcessingError::Unknown(e.into()))?; + + // Use `Canceled` to avoiding setting the subgraph health to failed, an error was + // just transacted so it will be already be set to unhealthy. + Err(ProcessingError::Canceled.into()) + } else { + Ok(()) + } } /// Processes a block and returns the updated context and a boolean flag indicating @@ -795,31 +820,17 @@ where .persisted_data_sources .extend(persisted_off_chain_data_sources); - let has_errors = self - .transact_block_state( - &logger, - block_ptr.clone(), - firehose_cursor.clone(), - block.timestamp(), - block_state, - proof_of_indexing, - offchain_mods, - processed_offchain_data_sources, - ) - .await?; - - // To prevent a buggy pending version from replacing a current version, if errors are - // present the subgraph will be unassigned. - let store = &self.inputs.store; - if has_errors && !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() { - store - .unassign_subgraph() - .map_err(|e| ProcessingError::Unknown(e.into()))?; - - // Use `Canceled` to avoiding setting the subgraph health to failed, an error was - // just transacted so it will be already be set to unhealthy. - return Err(ProcessingError::Canceled); - } + self.transact_block_state( + &logger, + block_ptr.clone(), + firehose_cursor.clone(), + block.timestamp(), + block_state, + proof_of_indexing, + offchain_mods, + processed_offchain_data_sources, + ) + .await?; match needs_restart { true => Ok(Action::Restart), @@ -1389,31 +1400,17 @@ where } }; - let has_errors = self - .transact_block_state( - &logger, - block_ptr.clone(), - cursor.clone(), - block_time, - block_state, - proof_of_indexing, - vec![], - vec![], - ) - .await?; - - // To prevent a buggy pending version from replacing a current version, if errors are - // present the subgraph will be unassigned. - let store = &self.inputs.store; - if has_errors && !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() { - store - .unassign_subgraph() - .map_err(|e| ProcessingError::Unknown(e.into()))?; - - // Use `Canceled` to avoiding setting the subgraph health to failed, an error was - // just transacted so it will be already be set to unhealthy. - return Err(ProcessingError::Canceled.into()); - }; + self.transact_block_state( + &logger, + block_ptr.clone(), + cursor.clone(), + block_time, + block_state, + proof_of_indexing, + vec![], + vec![], + ) + .await?; Ok(Action::Continue) } diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index 11b473a878d..c6d3f0c7e85 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -131,10 +131,6 @@ impl BlockState { write_capacity_remaining.saturating_sub(other.write_capacity_remaining); } - pub fn has_errors(&self) -> bool { - !self.deterministic_errors.is_empty() - } - pub fn has_created_data_sources(&self) -> bool { assert!(!self.in_handler); !self.created_data_sources.is_empty() From c3194635c7ef07832671f2ca234ce5200cc43cc3 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 17:43:38 -0700 Subject: [PATCH 7/9] core: Factor refetching blocks into a method --- core/src/subgraph/runner.rs | 66 ++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index b56d2289b3f..c6347aba664 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -689,29 +689,18 @@ where vec![], )); - let block: Arc = if self.inputs.chain.is_refetch_block_required() { - let cur = firehose_cursor.clone(); - let log = logger.cheap_clone(); - let chain = self.inputs.chain.cheap_clone(); - Arc::new( - retry( - "refetch firehose block after dynamic datasource was added", - &logger, - ) - .limit(5) - .no_timeout() - .run(move || { - let cur = cur.clone(); - let log = log.cheap_clone(); - let chain = chain.cheap_clone(); - async move { chain.refetch_firehose_block(&log, cur).await } - }) - .await - .non_deterministic()?, - ) - } else { - block.cheap_clone() - }; + // TODO: We have to pass a reference to `block` to + // `refetch_block`, otherwise the call to + // handle_offchain_triggers below gets an error that `block` + // has moved. That is extremely fishy since it means that + // `handle_offchain_triggers` uses the non-refetched block + // + // It's also not clear why refetching needs to happen inside + // the loop; will firehose really return something diffrent + // each time even though the cursor doesn't change? + let block = self + .refetch_block(&logger, &block, &firehose_cursor) + .await?; // Reprocess the triggers from this block that match the new data sources let block_with_triggers = self @@ -838,6 +827,37 @@ where } } + /// Refetch the block if it that is needed. Otherwise return the block as is. + async fn refetch_block( + &mut self, + logger: &Logger, + block: &Arc, + firehose_cursor: &FirehoseCursor, + ) -> Result, ProcessingError> { + if !self.inputs.chain.is_refetch_block_required() { + return Ok(block.cheap_clone()); + } + + let cur = firehose_cursor.clone(); + let log = logger.cheap_clone(); + let chain = self.inputs.chain.cheap_clone(); + let block = retry( + "refetch firehose block after dynamic datasource was added", + logger, + ) + .limit(5) + .no_timeout() + .run(move || { + let cur = cur.clone(); + let log = log.cheap_clone(); + let chain = chain.cheap_clone(); + async move { chain.refetch_firehose_block(&log, cur).await } + }) + .await + .non_deterministic()?; + Ok(Arc::new(block)) + } + async fn process_wasm_block( &mut self, proof_of_indexing: &SharedProofOfIndexing, From f4232ec640da630f2830f9efc83898a0b45cc69b Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 17:48:29 -0700 Subject: [PATCH 8/9] core: A little more noise reduction --- core/src/subgraph/runner.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index c6347aba664..925d61101af 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -544,6 +544,14 @@ where block: BlockWithTriggers, firehose_cursor: FirehoseCursor, ) -> Result { + fn log_triggers_found(logger: &Logger, triggers: &[Trigger]) { + if triggers.len() == 1 { + info!(logger, "1 trigger found in this block"); + } else if triggers.len() > 1 { + info!(logger, "{} triggers found in this block", triggers.len()); + } + } + let triggers = block.trigger_data; let block = Arc::new(block.block); let block_ptr = block.ptr(); @@ -711,19 +719,7 @@ where .non_deterministic()?; let triggers = block_with_triggers.trigger_data; - - if triggers.len() == 1 { - info!( - &logger, - "1 trigger found in this block for the new data sources" - ); - } else if triggers.len() > 1 { - info!( - &logger, - "{} triggers found in this block for the new data sources", - triggers.len() - ); - } + log_triggers_found(&logger, &triggers); // Add entity operations for the new data sources to the block state // and add runtimes for the data sources to the subgraph instance. From 2f365c3d5df1fad750421fde6ba2446ab5f43d84 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 18:31:44 -0700 Subject: [PATCH 9/9] core: Reduce some noise around match_and_decode_many --- core/src/subgraph/runner.rs | 57 +++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 925d61101af..fcd8fa30fbb 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -16,6 +16,7 @@ use graph::blockchain::{ }; use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource}; use graph::components::subgraph::InstanceDSTemplate; +use graph::components::trigger_processor::RunnableTriggers; use graph::components::{ store::ModificationsAndCache, subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing}, @@ -537,6 +538,33 @@ where } } + async fn match_and_decode_many<'a, F>( + &'a self, + logger: &Logger, + block: &Arc, + triggers: Vec>, + hosts_filter: F, + ) -> Result>, MappingError> + where + F: Fn(&TriggerData) -> Box + Send + 'a>, + { + let triggers = triggers.into_iter().map(|t| match t { + Trigger::Chain(t) => TriggerData::Onchain(t), + Trigger::Subgraph(t) => TriggerData::Subgraph(t), + }); + + self.ctx + .decoder + .match_and_decode_many( + &logger, + &block, + triggers, + hosts_filter, + &self.metrics.subgraph, + ) + .await + } + /// Processes a block and returns the updated context and a boolean flag indicating /// whether new dynamic data sources have been added to the subgraph. async fn process_block( @@ -584,18 +612,7 @@ where // Match and decode all triggers in the block let hosts_filter = |trigger: &TriggerData| self.ctx.instance.hosts_for_trigger(trigger); let match_res = self - .ctx - .decoder - .match_and_decode_many( - &logger, - &block, - triggers.into_iter().map(|t| match t { - Trigger::Chain(t) => TriggerData::Onchain(t), - Trigger::Subgraph(t) => TriggerData::Subgraph(t), - }), - hosts_filter, - &self.metrics.subgraph, - ) + .match_and_decode_many(&logger, &block, triggers, hosts_filter) .await; // Process events one after the other, passing in entity operations @@ -727,19 +744,11 @@ where // Process the triggers in each host in the same order the // corresponding data sources have been created. + let hosts_filter = |_: &'_ TriggerData| -> Box + Send> { + Box::new(runtime_hosts.iter().map(Arc::as_ref)) + }; let match_res: Result, _> = self - .ctx - .decoder - .match_and_decode_many( - &logger, - &block, - triggers.into_iter().map(|t| match t { - Trigger::Chain(t) => TriggerData::Onchain(t), - Trigger::Subgraph(_) => unreachable!(), // TODO(krishna): Re-evaulate this - }), - |_| Box::new(runtime_hosts.iter().map(Arc::as_ref)), - &self.metrics.subgraph, - ) + .match_and_decode_many(&logger, &block, triggers, hosts_filter) .await; let mut res = Ok(block_state);