From 34984d5ea2cc677f850242285f99ee833a1fe987 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 22 Sep 2025 14:24:56 -0700 Subject: [PATCH 1/9] core, node: Asyncify starting all subgraphs --- core/src/subgraph/registrar.rs | 113 ++++++++++++++++----------------- node/src/launcher.rs | 6 +- 2 files changed, 57 insertions(+), 62 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 87a7ebe4663..ca1eaf58b2a 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -11,16 +11,16 @@ use graph::components::subgraph::Settings; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::subgraph::Graft; use graph::data::value::Word; -use graph::futures01; -use graph::futures01::future; use graph::futures01::stream; use graph::futures01::Future; use graph::futures01::Stream; +use graph::futures03; use graph::futures03::compat::Future01CompatExt; use graph::futures03::compat::Stream01CompatExt; use graph::futures03::future::FutureExt; use graph::futures03::future::TryFutureExt; use graph::futures03::stream::TryStreamExt; +use graph::futures03::StreamExt; use graph::prelude::{ CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, SubgraphRegistrar as SubgraphRegistrarTrait, *, @@ -80,7 +80,7 @@ where } } - pub fn start(&self) -> impl Future { + pub async fn start(self: Arc) -> Result<(), Error> { let logger_clone1 = self.logger.clone(); let logger_clone2 = self.logger.clone(); let provider = self.provider.clone(); @@ -113,37 +113,37 @@ where let assignment_event_stream = self.assignment_events(); // Deploy named subgraphs found in store - self.start_assigned_subgraphs().and_then(move |()| { - // Spawn a task to handle assignment events. - // Blocking due to store interactions. Won't be blocking after #905. - graph::spawn_blocking( - assignment_event_stream - .compat() - .map_err(SubgraphAssignmentProviderError::Unknown) - .cancelable(&assignment_event_stream_cancel_handle) + self.start_assigned_subgraphs().await?; + + // Spawn a task to handle assignment events. + // Blocking due to store interactions. Won't be blocking after #905. + graph::spawn_blocking( + assignment_event_stream + .compat() + .map_err(SubgraphAssignmentProviderError::Unknown) + .cancelable(&assignment_event_stream_cancel_handle) + .compat() + .for_each(move |assignment_event| { + assert_eq!(assignment_event.node_id(), &node_id); + handle_assignment_event( + assignment_event, + provider.clone(), + logger_clone1.clone(), + ) + .boxed() .compat() - .for_each(move |assignment_event| { - assert_eq!(assignment_event.node_id(), &node_id); - handle_assignment_event( - assignment_event, - provider.clone(), - logger_clone1.clone(), - ) - .boxed() - .compat() - }) - .map_err(move |e| match e { - CancelableError::Cancel => panic!("assignment event stream canceled"), - CancelableError::Error(e) => { - error!(logger_clone2, "Assignment event stream failed: {}", e); - panic!("assignment event stream failed: {}", e); - } - }) - .compat(), - ); + }) + .map_err(move |e| match e { + CancelableError::Cancel => panic!("assignment event stream canceled"), + CancelableError::Error(e) => { + error!(logger_clone2, "Assignment event stream failed: {}", e); + panic!("assignment event stream failed: {}", e); + } + }) + .compat(), + ); - Ok(()) - }) + Ok(()) } pub fn assignment_events(&self) -> impl Stream + Send { @@ -220,36 +220,33 @@ where .flatten() } - fn start_assigned_subgraphs(&self) -> impl Future { + async fn start_assigned_subgraphs(&self) -> Result<(), Error> { let provider = self.provider.clone(); let logger = self.logger.clone(); let node_id = self.node_id.clone(); - future::result(self.store.active_assignments(&self.node_id)) - .map_err(|e| anyhow!("Error querying subgraph assignments: {}", e)) - .and_then(move |deployments| { - // This operation should finish only after all subgraphs are - // started. We wait for the spawned tasks to complete by giving - // each a `sender` and waiting for all of them to be dropped, so - // the receiver terminates without receiving anything. - let deployments = HashSet::::from_iter(deployments); - let deployments_len = deployments.len(); - let (sender, receiver) = futures01::sync::mpsc::channel::<()>(1); - for id in deployments { - let sender = sender.clone(); - let logger = logger.clone(); - - graph::spawn( - start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)), - ); - } - drop(sender); - receiver.collect().then(move |_| { - info!(logger, "Started all assigned subgraphs"; - "count" => deployments_len, "node_id" => &node_id); - future::ok(()) - }) - }) + let deployments = self + .store + .active_assignments(&self.node_id) + .map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))?; + // This operation should finish only after all subgraphs are + // started. We wait for the spawned tasks to complete by giving + // each a `sender` and waiting for all of them to be dropped, so + // the receiver terminates without receiving anything. + let deployments = HashSet::::from_iter(deployments); + let deployments_len = deployments.len(); + let (sender, receiver) = futures03::channel::mpsc::channel::<()>(1); + for id in deployments { + let sender = sender.clone(); + let logger = logger.clone(); + + graph::spawn(start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender))); + } + drop(sender); + let _: Vec<_> = receiver.collect().await; + info!(logger, "Started all assigned subgraphs"; + "count" => deployments_len, "node_id" => &node_id); + Ok(()) } } diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 1776e0feba3..8855ef1a954 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -1,8 +1,6 @@ use anyhow::Result; use git_testament::{git_testament, render_testament}; -use graph::futures01::Future as _; -use graph::futures03::compat::Future01CompatExt; use graph::futures03::future::TryFutureExt; use crate::config::Config; @@ -524,9 +522,9 @@ pub async fn run( graph::spawn( subgraph_registrar + .cheap_clone() .start() - .map_err(|e| panic!("failed to initialize subgraph provider {}", e)) - .compat(), + .map_err(|e| panic!("failed to initialize subgraph provider {}", e)), ); // Start admin JSON-RPC server. From a7e925583f39c26444aa47cad68b78d0111691d1 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 22 Sep 2025 14:33:21 -0700 Subject: [PATCH 2/9] graph, store: Remove unused facilities for filtering StoreEventStreams --- graph/src/components/store/mod.rs | 33 ++---------------------------- graph/src/lib.rs | 4 ++-- store/postgres/src/store_events.rs | 2 +- 3 files changed, 5 insertions(+), 34 deletions(-) diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 585df5945f1..076f31fca4a 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -18,7 +18,7 @@ use strum_macros::Display; pub use traits::*; pub use write::Batch; -use futures01::{Async, Stream}; +use futures01::Stream; use serde::{Deserialize, Serialize}; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashSet}; @@ -633,37 +633,8 @@ impl PartialEq for StoreEvent { } } -/// A `StoreEventStream` produces the `StoreEvents`. Various filters can be applied -/// to it to reduce which and how many events are delivered by the stream. -pub struct StoreEventStream { - source: S, -} - /// A boxed `StoreEventStream` -pub type StoreEventStreamBox = - StoreEventStream, Error = ()> + Send>>; - -impl Stream for StoreEventStream -where - S: Stream, Error = ()> + Send, -{ - type Item = Arc; - type Error = (); - - fn poll(&mut self) -> Result>, Self::Error> { - self.source.poll() - } -} - -impl StoreEventStream -where - S: Stream, Error = ()> + Send + 'static, -{ - // Create a new `StoreEventStream` from another such stream - pub fn new(source: S) -> Self { - StoreEventStream { source } - } -} +pub type StoreEventStreamBox = Box, Error = ()> + Send>; /// An entity operation that can be transacted into the store. #[derive(Clone, Debug, PartialEq)] diff --git a/graph/src/lib.rs b/graph/src/lib.rs index ee288c5729a..3ebeeeb24d8 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -131,8 +131,8 @@ pub mod prelude { EntityCollection, EntityFilter, EntityLink, EntityOperation, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityQuery, EntityRange, EntityWindow, EthereumCallCache, ParentLink, PartialBlockPtr, PoolWaitStats, QueryStore, - QueryStoreManager, StoreError, StoreEvent, StoreEventStream, StoreEventStreamBox, - SubgraphStore, UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, + QueryStoreManager, StoreError, StoreEvent, StoreEventStreamBox, SubgraphStore, + UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, }; pub use crate::components::subgraph::{ BlockState, HostMetrics, InstanceDSTemplateInfo, RuntimeHost, RuntimeHostBuilder, diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index b9da04f30d7..0a006aa87e1 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -221,6 +221,6 @@ impl SubscriptionManagerTrait for SubscriptionManager { self.subscriptions.write().unwrap().insert(id, sender); // Return the subscription ID and entity change stream - StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat())) + Box::new(ReceiverStream::new(receiver).map(Ok).compat()) } } From f62b6fe5146bfe9ae73ed446b8cfe11011885302 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 24 Sep 2025 09:09:52 -0700 Subject: [PATCH 3/9] all: Modernize the async code for starting subgraphs --- core/src/subgraph/registrar.rs | 220 +++++++++++++++------------- graph/src/components/store/mod.rs | 4 +- node/src/manager/commands/listen.rs | 21 +-- store/postgres/src/store_events.rs | 2 +- 4 files changed, 127 insertions(+), 120 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index ca1eaf58b2a..e649224b9f1 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -11,15 +11,12 @@ use graph::components::subgraph::Settings; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::subgraph::Graft; use graph::data::value::Word; -use graph::futures01::stream; -use graph::futures01::Future; -use graph::futures01::Stream; use graph::futures03; -use graph::futures03::compat::Future01CompatExt; -use graph::futures03::compat::Stream01CompatExt; use graph::futures03::future::FutureExt; use graph::futures03::future::TryFutureExt; +use graph::futures03::stream; use graph::futures03::stream::TryStreamExt; +use graph::futures03::Stream; use graph::futures03::StreamExt; use graph::prelude::{ CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, @@ -81,13 +78,6 @@ where } pub async fn start(self: Arc) -> Result<(), Error> { - let logger_clone1 = self.logger.clone(); - let logger_clone2 = self.logger.clone(); - let provider = self.provider.clone(); - let node_id = self.node_id.clone(); - let assignment_event_stream_cancel_handle = - self.assignment_event_stream_cancel_guard.handle(); - // The order of the following three steps is important: // - Start assignment event stream // - Read assignments table and start assigned subgraphs @@ -109,114 +99,138 @@ where // The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning // (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent. + fn panic_on_cancel( + logger: &Logger, + e: CancelableError, + ) -> ! { + match e { + CancelableError::Cancel => { + panic!("assignment event stream canceled") + } + CancelableError::Error(e) => { + error!(logger, "Assignment event stream failed: {}", e); + panic!("assignment event stream failed: {}", e); + } + } + } + // Start event stream - let assignment_event_stream = self.assignment_events(); + let assignment_event_stream = self.cheap_clone().assignment_events().await; // Deploy named subgraphs found in store self.start_assigned_subgraphs().await?; // Spawn a task to handle assignment events. // Blocking due to store interactions. Won't be blocking after #905. - graph::spawn_blocking( - assignment_event_stream - .compat() - .map_err(SubgraphAssignmentProviderError::Unknown) + let assignment_event_stream_cancel_handle = + self.assignment_event_stream_cancel_guard.handle(); + + let fut = + Box::pin(assignment_event_stream.map_err(SubgraphAssignmentProviderError::Unknown)) .cancelable(&assignment_event_stream_cancel_handle) - .compat() - .for_each(move |assignment_event| { - assert_eq!(assignment_event.node_id(), &node_id); - handle_assignment_event( - assignment_event, - provider.clone(), - logger_clone1.clone(), - ) - .boxed() - .compat() - }) - .map_err(move |e| match e { - CancelableError::Cancel => panic!("assignment event stream canceled"), - CancelableError::Error(e) => { - error!(logger_clone2, "Assignment event stream failed: {}", e); - panic!("assignment event stream failed: {}", e); + .for_each({ + move |event| { + let this = self.cheap_clone(); + let provider = self.provider.clone(); + async move { + if let Err(e) = match event { + Ok(event) => { + assert_eq!(event.node_id(), &this.node_id); + handle_assignment_event(event, provider.clone(), &this.logger) + .await + } + Err(e) => Err(e), + } { + panic_on_cancel(&this.logger, e); + }; + } } - }) - .compat(), - ); + }); + graph::spawn_blocking(fut); Ok(()) } - pub fn assignment_events(&self) -> impl Stream + Send { - let store = self.store.clone(); - let node_id = self.node_id.clone(); - let logger = self.logger.clone(); + /// Maps an assignment change to an assignment event by checking the + /// current state in the database, ignoring changes that do not affect + /// this node or do not require anything to change. + fn map_assignment(&self, change: AssignmentChange) -> Result, Error> { + let (deployment, operation) = change.into_parts(); + trace!(self.logger, "Received assignment change"; + "deployment" => %deployment, + "operation" => format!("{:?}", operation), + ); + + match operation { + AssignmentOperation::Set => { + let assigned = self + .store + .assignment_status(&deployment) + .map_err(|e| anyhow!("Failed to get subgraph assignment entity: {}", e))?; + + let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string())); + if let Some((assigned, is_paused)) = assigned { + if &assigned == &self.node_id { + if is_paused { + // Subgraph is paused, so we don't start it + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); + return Ok(None); + } + + // Start subgraph on this node + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); + Ok(Some(AssignmentEvent::Add { + deployment, + node_id: self.node_id.clone(), + })) + } else { + // Ensure it is removed from this node + debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); + Ok(Some(AssignmentEvent::Remove { + deployment, + node_id: self.node_id.clone(), + })) + } + } else { + // Was added/updated, but is now gone. + debug!(self.logger, "Deployment assignee not found in database"; "action" => "ignore"); + Ok(None) + } + } + AssignmentOperation::Removed => { + // Send remove event without checking node ID. + // If node ID does not match, then this is a no-op when handled in + // assignment provider. + Ok(Some(AssignmentEvent::Remove { + deployment, + node_id: self.node_id.clone(), + })) + } + } + } + + pub async fn assignment_events( + self: Arc, + ) -> impl Stream> + Send { self.subscription_manager .subscribe() - .map_err(|()| anyhow!("Entity change stream failed")) - .map(|event| { - let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect(); - stream::iter_ok(changes) - }) + .map(|event| futures03::stream::iter(event.changes.clone())) .flatten() - .and_then( - move |(deployment, operation)| -> Result + Send>, _> { - trace!(logger, "Received assignment change"; - "deployment" => %deployment, - "operation" => format!("{:?}", operation), - ); - - match operation { - AssignmentOperation::Set => { - store - .assignment_status(&deployment) - .map_err(|e| { - anyhow!("Failed to get subgraph assignment entity: {}", e) - }) - .map(|assigned| -> Box + Send> { - let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string())); - if let Some((assigned,is_paused)) = assigned { - if assigned == node_id { - - if is_paused{ - // Subgraph is paused, so we don't start it - debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); - return Box::new(stream::empty()); - } - - // Start subgraph on this node - debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); - Box::new(stream::once(Ok(AssignmentEvent::Add { - deployment, - node_id: node_id.clone(), - }))) - } else { - // Ensure it is removed from this node - debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); - Box::new(stream::once(Ok(AssignmentEvent::Remove { - deployment, - node_id: node_id.clone(), - }))) - } - } else { - // Was added/updated, but is now gone. - debug!(logger, "Deployment assignee not found in database"; "action" => "ignore"); - Box::new(stream::empty()) - } - }) - } - AssignmentOperation::Removed => { - // Send remove event without checking node ID. - // If node ID does not match, then this is a no-op when handled in - // assignment provider. - Ok(Box::new(stream::once(Ok(AssignmentEvent::Remove { - deployment, - node_id: node_id.clone(), - })))) + .then({ + let this = self.cheap_clone(); + move |change| { + let this = this.cheap_clone(); + + async move { + match this.map_assignment(change) { + Ok(Some(event)) => stream::once(futures03::future::ok(event)).boxed(), + Ok(None) => stream::empty().boxed(), + Err(e) => stream::once(futures03::future::err(e)).boxed(), } } - }, - ) + } + }) .flatten() } @@ -235,6 +249,8 @@ where // the receiver terminates without receiving anything. let deployments = HashSet::::from_iter(deployments); let deployments_len = deployments.len(); + debug!(logger, "Starting all assigned subgraphs"; + "count" => deployments_len, "node_id" => &node_id); let (sender, receiver) = futures03::channel::mpsc::channel::<()>(1); for id in deployments { let sender = sender.clone(); @@ -442,7 +458,7 @@ where async fn handle_assignment_event( event: AssignmentEvent, provider: Arc, - logger: Logger, + logger: &Logger, ) -> Result<(), CancelableError> { let logger = logger.clone(); diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 076f31fca4a..f3872b16580 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -10,6 +10,7 @@ use diesel::sql_types::Integer; use diesel_derives::{AsExpression, FromSqlRow}; pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache}; use slog::Logger; +use tokio_stream::wrappers::ReceiverStream; pub use super::subgraph::Entity; pub use err::{StoreError, StoreResult}; @@ -18,7 +19,6 @@ use strum_macros::Display; pub use traits::*; pub use write::Batch; -use futures01::Stream; use serde::{Deserialize, Serialize}; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashSet}; @@ -634,7 +634,7 @@ impl PartialEq for StoreEvent { } /// A boxed `StoreEventStream` -pub type StoreEventStreamBox = Box, Error = ()> + Send>; +pub type StoreEventStreamBox = ReceiverStream>; /// An entity operation that can be transacted into the store. #[derive(Clone, Debug, PartialEq)] diff --git a/node/src/manager/commands/listen.rs b/node/src/manager/commands/listen.rs index 69c3ff93cbf..d53dfaae455 100644 --- a/node/src/manager/commands/listen.rs +++ b/node/src/manager/commands/listen.rs @@ -1,8 +1,8 @@ use std::io::Write; use std::sync::Arc; -use graph::futures01::Stream as _; -use graph::futures03::compat::Future01CompatExt; +use graph::futures03::{future, StreamExt}; + use graph::{ components::store::SubscriptionManager as _, prelude::{serde_json, Error}, @@ -12,25 +12,16 @@ use graph_store_postgres::SubscriptionManager; async fn listen(mgr: Arc) -> Result<(), Error> { let events = mgr.subscribe(); println!("press ctrl-c to stop"); - let res = events - .inspect(move |event| { - serde_json::to_writer_pretty(std::io::stdout(), event) + events + .for_each(move |event| { + serde_json::to_writer_pretty(std::io::stdout(), &event) .expect("event can be serialized to JSON"); writeln!(std::io::stdout()).unwrap(); std::io::stdout().flush().unwrap(); + future::ready(()) }) - .collect() - .compat() .await; - match res { - Ok(_) => { - println!("stream finished") - } - Err(()) => { - eprintln!("stream failed") - } - } Ok(()) } diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index 0a006aa87e1..300022d200e 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -221,6 +221,6 @@ impl SubscriptionManagerTrait for SubscriptionManager { self.subscriptions.write().unwrap().insert(id, sender); // Return the subscription ID and entity change stream - Box::new(ReceiverStream::new(receiver).map(Ok).compat()) + ReceiverStream::new(receiver) } } From a5ef1bd0192266fa4955924954b2185f34731548 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 25 Sep 2025 15:15:40 -0700 Subject: [PATCH 4/9] core, graph,store: Asyncify SubgraphStore.active_assignments --- core/src/subgraph/registrar.rs | 1 + graph/src/components/store/traits.rs | 3 ++- store/postgres/src/primary.rs | 36 ++++++++++++++++++++++++---- store/postgres/src/subgraph_store.rs | 6 ++++- 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index e649224b9f1..07b646df3d8 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -242,6 +242,7 @@ where let deployments = self .store .active_assignments(&self.node_id) + .await .map_err(|e| anyhow!("Error querying subgraph assignments: {}", e))?; // This operation should finish only after all subgraphs are // started. We wait for the spawned tasks to complete by giving diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index eae3a1b0b4c..1348977852a 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -129,7 +129,8 @@ pub trait SubgraphStore: Send + Sync + 'static { fn assignments(&self, node: &NodeId) -> Result, StoreError>; /// Returns assignments that are not paused - fn active_assignments(&self, node: &NodeId) -> Result, StoreError>; + async fn active_assignments(&self, node: &NodeId) + -> Result, StoreError>; /// Return `true` if a subgraph `name` exists, regardless of whether the /// subgraph has any deployments attached to it diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 4ed8bada0a4..1f8d39e3de5 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -30,6 +30,7 @@ use diesel::{ Connection as _, }; use graph::{ + cheap_clone::CheapClone, components::store::DeploymentLocator, data::{ store::scalar::ToPrimitive, @@ -1886,8 +1887,9 @@ pub fn is_empty(conn: &mut PgConnection) -> Result { /// a query returns either success or anything but a /// `Err(StoreError::DatabaseUnavailable)`. This only works for tables that /// are mirrored through `refresh_tables` +#[derive(Clone, CheapClone)] pub struct Mirror { - pools: Vec, + pools: Arc>, } impl Mirror { @@ -1917,6 +1919,7 @@ impl Mirror { pools.push(pool.clone()); pools }); + let pools = Arc::new(pools); Mirror { pools } } @@ -1925,7 +1928,7 @@ impl Mirror { /// used for non-critical uses like command line tools pub fn primary_only(primary: ConnectionPool) -> Mirror { Mirror { - pools: vec![primary], + pools: Arc::new(vec![primary]), } } @@ -1940,7 +1943,7 @@ impl Mirror { mut f: impl 'a + FnMut(&mut PooledConnection>) -> Result, ) -> Result { - for pool in &self.pools { + for pool in self.pools.as_ref() { let mut conn = match pool.get() { Ok(conn) => conn, Err(StoreError::DatabaseUnavailable) => continue, @@ -1955,6 +1958,27 @@ impl Mirror { Err(StoreError::DatabaseUnavailable) } + /// An async version of `read` that spawns a blocking task to do the + /// actual work. This is useful when you want to call `read` from an + /// async context + pub(crate) async fn read_async(&self, mut f: F) -> Result + where + T: 'static + Send, + F: 'static + + Send + + FnMut(&mut PooledConnection>) -> Result, + { + let this = self.cheap_clone(); + let res = graph::spawn_blocking(async move { this.read(|conn| f(conn)) }).await; + match res { + Ok(v) => v, + Err(e) => Err(internal_error!( + "spawn_blocking in read_async failed: {}", + e + )), + } + } + /// Refresh the contents of mirrored tables from the primary (through /// the fdw mapping that `ForeignServer` establishes) pub(crate) fn refresh_tables( @@ -2050,8 +2074,10 @@ impl Mirror { self.read(|conn| queries::assignments(conn, node)) } - pub fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { - self.read(|conn| queries::active_assignments(conn, node)) + pub async fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { + let node = node.clone(); + self.read_async(move |conn| queries::active_assignments(conn, &node)) + .await } pub fn assigned_node(&self, site: &Site) -> Result, StoreError> { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 2cb2df8a0d6..47ce34e6873 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1469,9 +1469,13 @@ impl SubgraphStoreTrait for SubgraphStore { .map(|sites| sites.iter().map(|site| site.into()).collect()) } - fn active_assignments(&self, node: &NodeId) -> Result, StoreError> { + async fn active_assignments( + &self, + node: &NodeId, + ) -> Result, StoreError> { self.mirror .active_assignments(node) + .await .map(|sites| sites.iter().map(|site| site.into()).collect()) } From eff3e19a6fa10f79f1d25b964bad2e68e3e5baff Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 25 Sep 2025 15:18:32 -0700 Subject: [PATCH 5/9] core, graph, store: Asyncify SubgraphStore.assignment_status --- core/src/subgraph/registrar.rs | 11 +++++++---- graph/src/components/store/traits.rs | 2 +- store/postgres/src/primary.rs | 8 ++++++-- store/postgres/src/subgraph_store.rs | 4 ++-- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 07b646df3d8..2f527b40128 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -121,7 +121,6 @@ where self.start_assigned_subgraphs().await?; // Spawn a task to handle assignment events. - // Blocking due to store interactions. Won't be blocking after #905. let assignment_event_stream_cancel_handle = self.assignment_event_stream_cancel_guard.handle(); @@ -147,14 +146,17 @@ where } }); - graph::spawn_blocking(fut); + graph::spawn(fut); Ok(()) } /// Maps an assignment change to an assignment event by checking the /// current state in the database, ignoring changes that do not affect /// this node or do not require anything to change. - fn map_assignment(&self, change: AssignmentChange) -> Result, Error> { + async fn map_assignment( + &self, + change: AssignmentChange, + ) -> Result, Error> { let (deployment, operation) = change.into_parts(); trace!(self.logger, "Received assignment change"; @@ -167,6 +169,7 @@ where let assigned = self .store .assignment_status(&deployment) + .await .map_err(|e| anyhow!("Failed to get subgraph assignment entity: {}", e))?; let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string())); @@ -223,7 +226,7 @@ where let this = this.cheap_clone(); async move { - match this.map_assignment(change) { + match this.map_assignment(change).await { Ok(Some(event)) => stream::once(futures03::future::ok(event)).boxed(), Ok(None) => stream::empty().boxed(), Err(e) => stream::once(futures03::future::err(e)).boxed(), diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 1348977852a..f29c66f4784 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -121,7 +121,7 @@ pub trait SubgraphStore: Send + Sync + 'static { /// the subgraph is assigned to, and `is_paused` is true if the /// subgraph is paused. /// Returns None if the deployment does not exist. - fn assignment_status( + async fn assignment_status( &self, deployment: &DeploymentLocator, ) -> Result, StoreError>; diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 1f8d39e3de5..a92652b54aa 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -2088,8 +2088,12 @@ impl Mirror { /// the subgraph is assigned to, and `is_paused` is true if the /// subgraph is paused. /// Returns None if the deployment does not exist. - pub fn assignment_status(&self, site: &Site) -> Result, StoreError> { - self.read(|conn| queries::assignment_status(conn, site)) + pub async fn assignment_status( + &self, + site: Arc, + ) -> Result, StoreError> { + self.read_async(move |conn| queries::assignment_status(conn, &site)) + .await } pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result, StoreError> { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 47ce34e6873..7f5993735c2 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1455,12 +1455,12 @@ impl SubgraphStoreTrait for SubgraphStore { /// the subgraph is assigned to, and `is_paused` is true if the /// subgraph is paused. /// Returns None if the deployment does not exist. - fn assignment_status( + async fn assignment_status( &self, deployment: &DeploymentLocator, ) -> Result, StoreError> { let site = self.find_site(deployment.id.into())?; - self.mirror.assignment_status(site.as_ref()) + self.mirror.assignment_status(site).await } fn assignments(&self, node: &NodeId) -> Result, StoreError> { From a4413395c89b2669b77e93f2c209eb1d42d38171 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 25 Sep 2025 15:38:20 -0700 Subject: [PATCH 6/9] all: Fold the reachable parts of registrar::start_subgraph into provider --- core/src/subgraph/provider.rs | 20 +++++----- core/src/subgraph/registrar.rs | 45 +++-------------------- graph/src/components/subgraph/provider.rs | 6 +-- node/src/manager/commands/run.rs | 3 +- tests/src/fixture/mod.rs | 8 +--- 5 files changed, 20 insertions(+), 62 deletions(-) diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index a7122442531..e31a516066a 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -1,5 +1,5 @@ -use std::collections::HashSet; use std::sync::Mutex; +use std::{collections::HashSet, time::Instant}; use async_trait::async_trait; @@ -67,28 +67,28 @@ impl SubgraphAssignmentProvider { #[async_trait] impl SubgraphAssignmentProviderTrait for SubgraphAssignmentProvider { - async fn start( - &self, - loc: DeploymentLocator, - stop_block: Option, - ) -> Result<(), SubgraphAssignmentProviderError> { + async fn start(&self, loc: DeploymentLocator, stop_block: Option) { let logger = self.logger_factory.subgraph_logger(&loc); // If subgraph ID already in set if !self.deployment_registry.insert(loc.id) { info!(logger, "Subgraph deployment is already running"); - return Err(SubgraphAssignmentProviderError::AlreadyRunning( - loc.hash.clone(), - )); + return; } + let start_time = Instant::now(); + self.instance_manager .cheap_clone() .start_subgraph(loc, stop_block) .await; - Ok(()) + debug!( + logger, + "Subgraph started"; + "start_ms" => start_time.elapsed().as_millis() + ); } async fn stop( diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 2f527b40128..4236da13ef8 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -1,5 +1,4 @@ use std::collections::HashSet; -use std::time::Instant; use async_trait::async_trait; use graph::blockchain::Blockchain; @@ -12,7 +11,6 @@ use graph::data::subgraph::schema::DeploymentCreate; use graph::data::subgraph::Graft; use graph::data::value::Word; use graph::futures03; -use graph::futures03::future::FutureExt; use graph::futures03::future::TryFutureExt; use graph::futures03::stream; use graph::futures03::stream::TryStreamExt; @@ -238,7 +236,6 @@ where } async fn start_assigned_subgraphs(&self) -> Result<(), Error> { - let provider = self.provider.clone(); let logger = self.logger.clone(); let node_id = self.node_id.clone(); @@ -258,9 +255,12 @@ where let (sender, receiver) = futures03::channel::mpsc::channel::<()>(1); for id in deployments { let sender = sender.clone(); - let logger = logger.clone(); + let provider = self.provider.cheap_clone(); - graph::spawn(start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender))); + graph::spawn(async move { + provider.start(id, None).await; + drop(sender) + }); } drop(sender); let _: Vec<_> = receiver.collect().await; @@ -473,7 +473,7 @@ async fn handle_assignment_event( deployment, node_id: _, } => { - start_subgraph(deployment, provider.clone(), logger).await; + provider.start(deployment, None).await; Ok(()) } AssignmentEvent::Remove { @@ -486,39 +486,6 @@ async fn handle_assignment_event( } } -async fn start_subgraph( - deployment: DeploymentLocator, - provider: Arc, - logger: Logger, -) { - let logger = logger - .new(o!("subgraph_id" => deployment.hash.to_string(), "sgd" => deployment.id.to_string())); - - trace!(logger, "Start subgraph"); - - let start_time = Instant::now(); - let result = provider.start(deployment.clone(), None).await; - - debug!( - logger, - "Subgraph started"; - "start_ms" => start_time.elapsed().as_millis() - ); - - match result { - Ok(()) => (), - Err(SubgraphAssignmentProviderError::AlreadyRunning(_)) => (), - Err(e) => { - // Errors here are likely an issue with the subgraph. - error!( - logger, - "Subgraph instance failed to start"; - "error" => e.to_string() - ); - } - } -} - /// Resolves the subgraph's earliest block async fn resolve_start_block( manifest: &SubgraphManifest, diff --git a/graph/src/components/subgraph/provider.rs b/graph/src/components/subgraph/provider.rs index 5edc22391c8..3c994ee219f 100644 --- a/graph/src/components/subgraph/provider.rs +++ b/graph/src/components/subgraph/provider.rs @@ -5,11 +5,7 @@ use crate::{components::store::DeploymentLocator, prelude::*}; /// Common trait for subgraph providers. #[async_trait] pub trait SubgraphAssignmentProvider: Send + Sync + 'static { - async fn start( - &self, - deployment: DeploymentLocator, - stop_block: Option, - ) -> Result<(), SubgraphAssignmentProviderError>; + async fn start(&self, deployment: DeploymentLocator, stop_block: Option); async fn stop( &self, deployment: DeploymentLocator, diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 38048c55ba3..060341fb6e0 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -216,8 +216,7 @@ pub async fn run( let locator = locate(subgraph_store.as_ref(), &hash)?; - SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block)) - .await?; + SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block)).await; loop { tokio::time::sleep(Duration::from_millis(1000)).await; diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index c969edb2a78..3d0ce287b8d 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -286,8 +286,7 @@ impl TestContext { self.provider .start(self.deployment.clone(), Some(stop_block.number)) - .await - .expect("unable to start subgraph"); + .await; debug!(self.logger, "TEST: syncing to {}", stop_block.number); @@ -305,10 +304,7 @@ impl TestContext { // In case the subgraph has been previously started. self.provider.stop(self.deployment.clone()).await.unwrap(); - self.provider - .start(self.deployment.clone(), None) - .await - .expect("unable to start subgraph"); + self.provider.start(self.deployment.clone(), None).await; wait_for_sync( &self.logger, From b133b2dec5136b09a14c45387158cc6e95e26707 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 25 Sep 2025 15:44:46 -0700 Subject: [PATCH 7/9] all: Simplify return type of SubgraphAssignmentProvider.stop The method would never fil, so no reason to use a `Result` --- core/src/subgraph/provider.rs | 6 +----- core/src/subgraph/registrar.rs | 11 +++-------- graph/src/components/subgraph/provider.rs | 5 +---- tests/src/fixture/mod.rs | 4 ++-- tests/tests/runner_tests.rs | 6 +----- 5 files changed, 8 insertions(+), 24 deletions(-) diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index e31a516066a..2ea4327838b 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -91,15 +91,11 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss ); } - async fn stop( - &self, - deployment: DeploymentLocator, - ) -> Result<(), SubgraphAssignmentProviderError> { + async fn stop(&self, deployment: DeploymentLocator) { // If subgraph ID was in set if self.deployment_registry.remove(&deployment.id) { // Shut down subgraph processing self.instance_manager.stop_subgraph(deployment).await; } - Ok(()) } } diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 4236da13ef8..6fcb8548df0 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -472,18 +472,13 @@ async fn handle_assignment_event( AssignmentEvent::Add { deployment, node_id: _, - } => { - provider.start(deployment, None).await; - Ok(()) - } + } => provider.start(deployment, None).await, AssignmentEvent::Remove { deployment, node_id: _, - } => match provider.stop(deployment).await { - Ok(()) => Ok(()), - Err(e) => Err(CancelableError::Error(e)), - }, + } => provider.stop(deployment).await, } + Ok(()) } /// Resolves the subgraph's earliest block diff --git a/graph/src/components/subgraph/provider.rs b/graph/src/components/subgraph/provider.rs index 3c994ee219f..3e33f6fd5bf 100644 --- a/graph/src/components/subgraph/provider.rs +++ b/graph/src/components/subgraph/provider.rs @@ -6,8 +6,5 @@ use crate::{components::store::DeploymentLocator, prelude::*}; #[async_trait] pub trait SubgraphAssignmentProvider: Send + Sync + 'static { async fn start(&self, deployment: DeploymentLocator, stop_block: Option); - async fn stop( - &self, - deployment: DeploymentLocator, - ) -> Result<(), SubgraphAssignmentProviderError>; + async fn stop(&self, deployment: DeploymentLocator); } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 3d0ce287b8d..362cef37f44 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -282,7 +282,7 @@ impl TestContext { pub async fn start_and_sync_to(&self, stop_block: BlockPtr) { // In case the subgraph has been previously started. - self.provider.stop(self.deployment.clone()).await.unwrap(); + self.provider.stop(self.deployment.clone()).await; self.provider .start(self.deployment.clone(), Some(stop_block.number)) @@ -302,7 +302,7 @@ impl TestContext { pub async fn start_and_sync_to_error(&self, stop_block: BlockPtr) -> SubgraphError { // In case the subgraph has been previously started. - self.provider.stop(self.deployment.clone()).await.unwrap(); + self.provider.stop(self.deployment.clone()).await; self.provider.start(self.deployment.clone(), None).await; diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 880591b8fa3..cd2c059e2dc 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -82,11 +82,7 @@ async fn data_source_revert() -> anyhow::Result<()> { let stop_block = test_ptr(2); base_ctx.start_and_sync_to(stop_block).await; - base_ctx - .provider - .stop(base_ctx.deployment.clone()) - .await - .unwrap(); + base_ctx.provider.stop(base_ctx.deployment.clone()).await; // Test loading data sources from DB. let stop_block = test_ptr(3); From 2e9725e9f5fdf7aba86f1c5afc75d22dd706080e Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 25 Sep 2025 15:56:47 -0700 Subject: [PATCH 8/9] core: Simplify handle_assignment_event --- core/src/subgraph/registrar.rs | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 6fcb8548df0..237ede0dbe8 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -130,15 +130,12 @@ where let this = self.cheap_clone(); let provider = self.provider.clone(); async move { - if let Err(e) = match event { + match event { Ok(event) => { assert_eq!(event.node_id(), &this.node_id); - handle_assignment_event(event, provider.clone(), &this.logger) - .await + handle_assignment_event(event, provider.clone()).await } - Err(e) => Err(e), - } { - panic_on_cancel(&this.logger, e); + Err(e) => panic_on_cancel(&this.logger, e), }; } } @@ -462,12 +459,7 @@ where async fn handle_assignment_event( event: AssignmentEvent, provider: Arc, - logger: &Logger, -) -> Result<(), CancelableError> { - let logger = logger.clone(); - - debug!(logger, "Received assignment event: {:?}", event); - +) { match event { AssignmentEvent::Add { deployment, @@ -478,7 +470,6 @@ async fn handle_assignment_event( node_id: _, } => provider.stop(deployment).await, } - Ok(()) } /// Resolves the subgraph's earliest block From 641f4facc078fa67fa5c0127bf5347f593a25417 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 25 Sep 2025 16:25:31 -0700 Subject: [PATCH 9/9] core, graph: Simplify assignment handling We analyze and react to assignment changes directly rather than worrying abut translating one stream into another. With that, We don't need the AssignmentEvent struct anymore. --- core/src/subgraph/registrar.rs | 143 ++++++++++----------------------- graph/src/data/store/mod.rs | 23 ------ graph/src/lib.rs | 2 +- 3 files changed, 42 insertions(+), 126 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 237ede0dbe8..b05ccdf4e33 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -12,8 +12,6 @@ use graph::data::subgraph::Graft; use graph::data::value::Word; use graph::futures03; use graph::futures03::future::TryFutureExt; -use graph::futures03::stream; -use graph::futures03::stream::TryStreamExt; use graph::futures03::Stream; use graph::futures03::StreamExt; use graph::prelude::{ @@ -90,27 +88,12 @@ where // // The discrepancy between the start time of the event stream and the table read can result // in some extraneous events on start up. Examples: - // - The event stream sees an Add event for subgraph A, but the table query finds that + // - The event stream sees an 'set' event for subgraph A, but the table query finds that // subgraph A is already in the table. - // - The event stream sees a Remove event for subgraph B, but the table query finds that + // - The event stream sees a 'removed' event for subgraph B, but the table query finds that // subgraph B has already been removed. - // The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning - // (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent. - - fn panic_on_cancel( - logger: &Logger, - e: CancelableError, - ) -> ! { - match e { - CancelableError::Cancel => { - panic!("assignment event stream canceled") - } - CancelableError::Error(e) => { - error!(logger, "Assignment event stream failed: {}", e); - panic!("assignment event stream failed: {}", e); - } - } - } + // The `change_assignment` function handles these cases by ignoring + // such cases which makes the operations idempotent // Start event stream let assignment_event_stream = self.cheap_clone().assignment_events().await; @@ -118,40 +101,34 @@ where // Deploy named subgraphs found in store self.start_assigned_subgraphs().await?; + let cancel_handle = self.assignment_event_stream_cancel_guard.handle(); + // Spawn a task to handle assignment events. - let assignment_event_stream_cancel_handle = - self.assignment_event_stream_cancel_guard.handle(); - - let fut = - Box::pin(assignment_event_stream.map_err(SubgraphAssignmentProviderError::Unknown)) - .cancelable(&assignment_event_stream_cancel_handle) - .for_each({ - move |event| { - let this = self.cheap_clone(); - let provider = self.provider.clone(); - async move { - match event { - Ok(event) => { - assert_eq!(event.node_id(), &this.node_id); - handle_assignment_event(event, provider.clone()).await - } - Err(e) => panic_on_cancel(&this.logger, e), - }; - } - } - }); + let fut = assignment_event_stream.for_each({ + move |event| { + // The assignment stream should run forever. If it gets + // cancelled, that probably indicates a serious problem and + // we panic + if cancel_handle.is_canceled() { + panic!("assignment event stream canceled"); + } + + let this = self.cheap_clone(); + async move { + this.change_assignment(event).await; + } + } + }); graph::spawn(fut); Ok(()) } - /// Maps an assignment change to an assignment event by checking the - /// current state in the database, ignoring changes that do not affect - /// this node or do not require anything to change. - async fn map_assignment( - &self, - change: AssignmentChange, - ) -> Result, Error> { + /// Start/stop subgraphs as needed, considering the current assignment + /// state in the database, ignoring changes that do not affect this + /// node, do not require anything to change, or for which we can not + /// find the assignment status from the database + async fn change_assignment(&self, change: AssignmentChange) { let (deployment, operation) = change.into_parts(); trace!(self.logger, "Received assignment change"; @@ -161,11 +138,16 @@ where match operation { AssignmentOperation::Set => { - let assigned = self - .store - .assignment_status(&deployment) - .await - .map_err(|e| anyhow!("Failed to get subgraph assignment entity: {}", e))?; + let assigned = match self.store.assignment_status(&deployment).await { + Ok(assigned) => assigned, + Err(e) => { + error!( + self.logger, + "Failed to get subgraph assignment entity"; "deployment" => deployment, "error" => e.to_string() + ); + return; + } + }; let logger = self.logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => self.node_id.to_string())); if let Some((assigned, is_paused)) = assigned { @@ -173,63 +155,36 @@ where if is_paused { // Subgraph is paused, so we don't start it debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); - return Ok(None); + return; } // Start subgraph on this node debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); - Ok(Some(AssignmentEvent::Add { - deployment, - node_id: self.node_id.clone(), - })) + self.provider.start(deployment, None).await; } else { // Ensure it is removed from this node debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); - Ok(Some(AssignmentEvent::Remove { - deployment, - node_id: self.node_id.clone(), - })) + self.provider.stop(deployment).await } } else { // Was added/updated, but is now gone. debug!(self.logger, "Deployment assignee not found in database"; "action" => "ignore"); - Ok(None) } } AssignmentOperation::Removed => { // Send remove event without checking node ID. // If node ID does not match, then this is a no-op when handled in // assignment provider. - Ok(Some(AssignmentEvent::Remove { - deployment, - node_id: self.node_id.clone(), - })) + self.provider.stop(deployment).await; } } } - pub async fn assignment_events( - self: Arc, - ) -> impl Stream> + Send { + pub async fn assignment_events(self: Arc) -> impl Stream + Send { self.subscription_manager .subscribe() .map(|event| futures03::stream::iter(event.changes.clone())) .flatten() - .then({ - let this = self.cheap_clone(); - move |change| { - let this = this.cheap_clone(); - - async move { - match this.map_assignment(change).await { - Ok(Some(event)) => stream::once(futures03::future::ok(event)).boxed(), - Ok(None) => stream::empty().boxed(), - Err(e) => stream::once(futures03::future::err(e)).boxed(), - } - } - } - }) - .flatten() } async fn start_assigned_subgraphs(&self) -> Result<(), Error> { @@ -456,22 +411,6 @@ where } } -async fn handle_assignment_event( - event: AssignmentEvent, - provider: Arc, -) { - match event { - AssignmentEvent::Add { - deployment, - node_id: _, - } => provider.start(deployment, None).await, - AssignmentEvent::Remove { - deployment, - node_id: _, - } => provider.stop(deployment).await, - } -} - /// Resolves the subgraph's earliest block async fn resolve_start_block( manifest: &SubgraphManifest, diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index c8786e9b473..8e5aa8a2f9e 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -1,5 +1,4 @@ use crate::{ - components::store::DeploymentLocator, derive::CacheWeight, prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError}, runtime::gas::{Gas, GasSizeOf}, @@ -83,28 +82,6 @@ impl<'de> de::Deserialize<'de> for NodeId { } } -#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] -#[serde(tag = "type")] -pub enum AssignmentEvent { - Add { - deployment: DeploymentLocator, - node_id: NodeId, - }, - Remove { - deployment: DeploymentLocator, - node_id: NodeId, - }, -} - -impl AssignmentEvent { - pub fn node_id(&self) -> &NodeId { - match self { - AssignmentEvent::Add { node_id, .. } => node_id, - AssignmentEvent::Remove { node_id, .. } => node_id, - } - } -} - /// An entity attribute name is represented as a string. pub type Attribute = String; diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 3ebeeeb24d8..05407603f48 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -152,7 +152,7 @@ pub mod prelude { Query, QueryError, QueryExecutionError, QueryResult, QueryTarget, QueryVariables, }; pub use crate::data::store::scalar::{BigDecimal, BigInt, BigIntSign}; - pub use crate::data::store::{AssignmentEvent, Attribute, Entity, NodeId, Value, ValueType}; + pub use crate::data::store::{Attribute, Entity, NodeId, Value, ValueType}; pub use crate::data::subgraph::schema::SubgraphDeploymentEntity; pub use crate::data::subgraph::{ CreateSubgraphResult, DataSourceContext, DeploymentHash, DeploymentState, Link,