26 changes: 11 additions & 15 deletions core/src/subgraph/provider.rs
@@ -1,5 +1,5 @@
use std::collections::HashSet;
use std::sync::Mutex;
use std::{collections::HashSet, time::Instant};

use async_trait::async_trait;

@@ -67,39 +67,35 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProvider<I> {

#[async_trait]
impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAssignmentProvider<I> {
async fn start(
&self,
loc: DeploymentLocator,
stop_block: Option<BlockNumber>,
) -> Result<(), SubgraphAssignmentProviderError> {
async fn start(&self, loc: DeploymentLocator, stop_block: Option<BlockNumber>) {
let logger = self.logger_factory.subgraph_logger(&loc);

// If subgraph ID already in set
if !self.deployment_registry.insert(loc.id) {
info!(logger, "Subgraph deployment is already running");

return Err(SubgraphAssignmentProviderError::AlreadyRunning(
loc.hash.clone(),
));
return;
}

let start_time = Instant::now();

self.instance_manager
.cheap_clone()
.start_subgraph(loc, stop_block)
.await;

Ok(())
debug!(
logger,
"Subgraph started";
"start_ms" => start_time.elapsed().as_millis()
);
}

async fn stop(
&self,
deployment: DeploymentLocator,
) -> Result<(), SubgraphAssignmentProviderError> {
async fn stop(&self, deployment: DeploymentLocator) {
// If subgraph ID was in set
if self.deployment_registry.remove(&deployment.id) {
// Shut down subgraph processing
self.instance_manager.stop_subgraph(deployment).await;
}
Ok(())
}
}
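Note: `start` and `stop` on the provider are now infallible. A minimal sketch of what a call site looks like under that change — the `redeploy` helper is hypothetical and the import paths are assumed from `graph::prelude`:

```rust
use std::sync::Arc;

use graph::components::store::DeploymentLocator;
use graph::prelude::{BlockNumber, SubgraphAssignmentProvider};

// Hypothetical call site: `start` and `stop` are now infallible, so callers
// simply await them; conditions like "deployment already running" are logged
// inside the provider instead of being returned as errors.
async fn redeploy<P: SubgraphAssignmentProvider>(
    provider: Arc<P>,
    loc: DeploymentLocator,
    stop_block: Option<BlockNumber>,
) {
    provider.stop(loc.clone()).await;
    provider.start(loc, stop_block).await;
}
```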
325 changes: 117 additions & 208 deletions core/src/subgraph/registrar.rs

Large diffs are not rendered by default.

33 changes: 2 additions & 31 deletions graph/src/components/store/mod.rs
@@ -10,6 +10,7 @@ use diesel::sql_types::Integer;
use diesel_derives::{AsExpression, FromSqlRow};
pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache};
use slog::Logger;
use tokio_stream::wrappers::ReceiverStream;

pub use super::subgraph::Entity;
pub use err::{StoreError, StoreResult};
@@ -18,7 +19,6 @@ use strum_macros::Display;
pub use traits::*;
pub use write::Batch;

use futures01::{Async, Stream};
use serde::{Deserialize, Serialize};
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashSet};
@@ -633,37 +633,8 @@ impl PartialEq for StoreEvent {
}
}

/// A `StoreEventStream` produces the `StoreEvents`. Various filters can be applied
/// to it to reduce which and how many events are delivered by the stream.
pub struct StoreEventStream<S> {
source: S,
}

/// A boxed `StoreEventStream`
pub type StoreEventStreamBox =
StoreEventStream<Box<dyn Stream<Item = Arc<StoreEvent>, Error = ()> + Send>>;

impl<S> Stream for StoreEventStream<S>
where
S: Stream<Item = Arc<StoreEvent>, Error = ()> + Send,
{
type Item = Arc<StoreEvent>;
type Error = ();

fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
self.source.poll()
}
}

impl<S> StoreEventStream<S>
where
S: Stream<Item = Arc<StoreEvent>, Error = ()> + Send + 'static,
{
// Create a new `StoreEventStream` from another such stream
pub fn new(source: S) -> Self {
StoreEventStream { source }
}
}
pub type StoreEventStreamBox = ReceiverStream<Arc<StoreEvent>>;

/// An entity operation that can be transacted into the store.
#[derive(Clone, Debug, PartialEq)]
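Note: with the futures 0.1 wrapper removed, `StoreEventStreamBox` is just a `ReceiverStream<Arc<StoreEvent>>`. A minimal consumer sketch under that assumption — `drain_events` is a hypothetical function, driven by the futures 0.3 `StreamExt` this PR already uses elsewhere:

```rust
use graph::futures03::StreamExt;
use graph::prelude::StoreEventStreamBox;

// Hypothetical consumer: the subscription is a plain tokio `ReceiverStream`,
// so futures 0.3 combinators drive it directly; the stream ends when the
// sender side is dropped.
async fn drain_events(mut events: StoreEventStreamBox) -> usize {
    let mut seen = 0;
    while let Some(_event) = events.next().await {
        seen += 1;
    }
    seen
}
```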
5 changes: 3 additions & 2 deletions graph/src/components/store/traits.rs
@@ -121,15 +121,16 @@ pub trait SubgraphStore: Send + Sync + 'static {
/// the subgraph is assigned to, and `is_paused` is true if the
/// subgraph is paused.
/// Returns None if the deployment does not exist.
fn assignment_status(
async fn assignment_status(
&self,
deployment: &DeploymentLocator,
) -> Result<Option<(NodeId, bool)>, StoreError>;

fn assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;

/// Returns assignments that are not paused
fn active_assignments(&self, node: &NodeId) -> Result<Vec<DeploymentLocator>, StoreError>;
async fn active_assignments(&self, node: &NodeId)
-> Result<Vec<DeploymentLocator>, StoreError>;

/// Return `true` if a subgraph `name` exists, regardless of whether the
/// subgraph has any deployments attached to it
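Note: `assignment_status` and `active_assignments` are now async, so call sites gain an `.await`. A hedged sketch of a hypothetical caller (helper name and import paths assumed):

```rust
use std::sync::Arc;

use graph::components::store::DeploymentLocator;
use graph::prelude::{NodeId, StoreError, SubgraphStore};

// Hypothetical call site: `assignment_status` is awaited rather than called
// synchronously, since the store now runs the query off the async executor.
async fn assigned_node<S: SubgraphStore>(
    store: Arc<S>,
    loc: &DeploymentLocator,
) -> Result<Option<NodeId>, StoreError> {
    Ok(store
        .assignment_status(loc)
        .await?
        .map(|(node, _is_paused)| node))
}
```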
11 changes: 2 additions & 9 deletions graph/src/components/subgraph/provider.rs
@@ -5,13 +5,6 @@ use crate::{components::store::DeploymentLocator, prelude::*};
/// Common trait for subgraph providers.
#[async_trait]
pub trait SubgraphAssignmentProvider: Send + Sync + 'static {
async fn start(
&self,
deployment: DeploymentLocator,
stop_block: Option<BlockNumber>,
) -> Result<(), SubgraphAssignmentProviderError>;
async fn stop(
&self,
deployment: DeploymentLocator,
) -> Result<(), SubgraphAssignmentProviderError>;
async fn start(&self, deployment: DeploymentLocator, stop_block: Option<BlockNumber>);
async fn stop(&self, deployment: DeploymentLocator);
}
23 changes: 0 additions & 23 deletions graph/src/data/store/mod.rs
@@ -1,5 +1,4 @@
use crate::{
components::store::DeploymentLocator,
derive::CacheWeight,
prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError},
runtime::gas::{Gas, GasSizeOf},
@@ -83,28 +82,6 @@ impl<'de> de::Deserialize<'de> for NodeId {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum AssignmentEvent {
Add {
deployment: DeploymentLocator,
node_id: NodeId,
},
Remove {
deployment: DeploymentLocator,
node_id: NodeId,
},
}

impl AssignmentEvent {
pub fn node_id(&self) -> &NodeId {
match self {
AssignmentEvent::Add { node_id, .. } => node_id,
AssignmentEvent::Remove { node_id, .. } => node_id,
}
}
}

/// An entity attribute name is represented as a string.
pub type Attribute = String;

6 changes: 3 additions & 3 deletions graph/src/lib.rs
@@ -131,8 +131,8 @@ pub mod prelude {
EntityCollection, EntityFilter, EntityLink, EntityOperation, EntityOrder,
EntityOrderByChild, EntityOrderByChildInfo, EntityQuery, EntityRange, EntityWindow,
EthereumCallCache, ParentLink, PartialBlockPtr, PoolWaitStats, QueryStore,
QueryStoreManager, StoreError, StoreEvent, StoreEventStream, StoreEventStreamBox,
SubgraphStore, UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX,
QueryStoreManager, StoreError, StoreEvent, StoreEventStreamBox, SubgraphStore,
UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX,
};
pub use crate::components::subgraph::{
BlockState, HostMetrics, InstanceDSTemplateInfo, RuntimeHost, RuntimeHostBuilder,
@@ -152,7 +152,7 @@
Query, QueryError, QueryExecutionError, QueryResult, QueryTarget, QueryVariables,
};
pub use crate::data::store::scalar::{BigDecimal, BigInt, BigIntSign};
pub use crate::data::store::{AssignmentEvent, Attribute, Entity, NodeId, Value, ValueType};
pub use crate::data::store::{Attribute, Entity, NodeId, Value, ValueType};
pub use crate::data::subgraph::schema::SubgraphDeploymentEntity;
pub use crate::data::subgraph::{
CreateSubgraphResult, DataSourceContext, DeploymentHash, DeploymentState, Link,
6 changes: 2 additions & 4 deletions node/src/launcher.rs
@@ -1,8 +1,6 @@
use anyhow::Result;

use git_testament::{git_testament, render_testament};
use graph::futures01::Future as _;
use graph::futures03::compat::Future01CompatExt;
use graph::futures03::future::TryFutureExt;

use crate::config::Config;
@@ -524,9 +522,9 @@ pub async fn run(

graph::spawn(
subgraph_registrar
.cheap_clone()
.start()
.map_err(|e| panic!("failed to initialize subgraph provider {}", e))
.compat(),
.map_err(|e| panic!("failed to initialize subgraph provider {}", e)),
);

// Start admin JSON-RPC server.
21 changes: 6 additions & 15 deletions node/src/manager/commands/listen.rs
@@ -1,8 +1,8 @@
use std::io::Write;
use std::sync::Arc;

use graph::futures01::Stream as _;
use graph::futures03::compat::Future01CompatExt;
use graph::futures03::{future, StreamExt};

use graph::{
components::store::SubscriptionManager as _,
prelude::{serde_json, Error},
@@ -12,25 +12,16 @@ use graph_store_postgres::SubscriptionManager;
async fn listen(mgr: Arc<SubscriptionManager>) -> Result<(), Error> {
let events = mgr.subscribe();
println!("press ctrl-c to stop");
let res = events
.inspect(move |event| {
serde_json::to_writer_pretty(std::io::stdout(), event)
events
.for_each(move |event| {
serde_json::to_writer_pretty(std::io::stdout(), &event)
.expect("event can be serialized to JSON");
writeln!(std::io::stdout()).unwrap();
std::io::stdout().flush().unwrap();
future::ready(())
})
.collect()
.compat()
.await;

match res {
Ok(_) => {
println!("stream finished")
}
Err(()) => {
eprintln!("stream failed")
}
}
Ok(())
}

3 changes: 1 addition & 2 deletions node/src/manager/commands/run.rs
@@ -216,8 +216,7 @@ pub async fn run(

let locator = locate(subgraph_store.as_ref(), &hash)?;

SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block))
.await?;
SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block)).await;

loop {
tokio::time::sleep(Duration::from_millis(1000)).await;
44 changes: 37 additions & 7 deletions store/postgres/src/primary.rs
@@ -30,6 +30,7 @@ use diesel::{
Connection as _,
};
use graph::{
cheap_clone::CheapClone,
components::store::DeploymentLocator,
data::{
store::scalar::ToPrimitive,
@@ -1886,8 +1887,9 @@ pub fn is_empty(conn: &mut PgConnection) -> Result<bool, StoreError> {
/// a query returns either success or anything but a
/// `Err(StoreError::DatabaseUnavailable)`. This only works for tables that
/// are mirrored through `refresh_tables`
#[derive(Clone, CheapClone)]
pub struct Mirror {
pools: Vec<ConnectionPool>,
pools: Arc<Vec<ConnectionPool>>,
}

impl Mirror {
@@ -1917,6 +1919,7 @@ impl Mirror {
pools.push(pool.clone());
pools
});
let pools = Arc::new(pools);
Mirror { pools }
}

@@ -1925,7 +1928,7 @@
/// used for non-critical uses like command line tools
pub fn primary_only(primary: ConnectionPool) -> Mirror {
Mirror {
pools: vec![primary],
pools: Arc::new(vec![primary]),
}
}

@@ -1940,7 +1943,7 @@
mut f: impl 'a
+ FnMut(&mut PooledConnection<ConnectionManager<PgConnection>>) -> Result<T, StoreError>,
) -> Result<T, StoreError> {
for pool in &self.pools {
for pool in self.pools.as_ref() {
let mut conn = match pool.get() {
Ok(conn) => conn,
Err(StoreError::DatabaseUnavailable) => continue,
@@ -1955,6 +1958,27 @@
Err(StoreError::DatabaseUnavailable)
}

/// An async version of `read` that spawns a blocking task to do the
/// actual work. This is useful when you want to call `read` from an
/// async context
pub(crate) async fn read_async<T, F>(&self, mut f: F) -> Result<T, StoreError>
where
T: 'static + Send,
F: 'static
+ Send
+ FnMut(&mut PooledConnection<ConnectionManager<PgConnection>>) -> Result<T, StoreError>,
{
let this = self.cheap_clone();
let res = graph::spawn_blocking(async move { this.read(|conn| f(conn)) }).await;
match res {
Ok(v) => v,
Err(e) => Err(internal_error!(
"spawn_blocking in read_async failed: {}",
e
)),
}
}

/// Refresh the contents of mirrored tables from the primary (through
/// the fdw mapping that `ForeignServer` establishes)
pub(crate) fn refresh_tables(
@@ -2050,8 +2074,10 @@ impl Mirror {
self.read(|conn| queries::assignments(conn, node))
}

pub fn active_assignments(&self, node: &NodeId) -> Result<Vec<Site>, StoreError> {
self.read(|conn| queries::active_assignments(conn, node))
pub async fn active_assignments(&self, node: &NodeId) -> Result<Vec<Site>, StoreError> {
let node = node.clone();
self.read_async(move |conn| queries::active_assignments(conn, &node))
.await
}

pub fn assigned_node(&self, site: &Site) -> Result<Option<NodeId>, StoreError> {
@@ -2062,8 +2088,12 @@
/// the subgraph is assigned to, and `is_paused` is true if the
/// subgraph is paused.
/// Returns None if the deployment does not exist.
pub fn assignment_status(&self, site: &Site) -> Result<Option<(NodeId, bool)>, StoreError> {
self.read(|conn| queries::assignment_status(conn, site))
pub async fn assignment_status(
&self,
site: Arc<Site>,
) -> Result<Option<(NodeId, bool)>, StoreError> {
self.read_async(move |conn| queries::assignment_status(conn, &site))
.await
}

pub fn find_active_site(&self, subgraph: &DeploymentHash) -> Result<Option<Site>, StoreError> {
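Note: `read_async` is the usual "clone a cheap handle and move it into a blocking task" pattern. A self-contained sketch of that same pattern with plain tokio and hypothetical types (not the real `Mirror`/`ConnectionPool` machinery):

```rust
use std::sync::Arc;

// Illustration of the pattern behind `Mirror::read_async`: the handle holds an
// Arc, so it is cheap to clone and can be moved into `spawn_blocking`, where
// the synchronous, potentially slow work runs without stalling the executor.
#[derive(Clone)]
struct Catalog {
    entries: Arc<Vec<String>>,
}

impl Catalog {
    // Synchronous lookup, standing in for a blocking database query.
    fn lookup_blocking(&self, needle: &str) -> Option<String> {
        self.entries.iter().find(|e| e.as_str() == needle).cloned()
    }

    // Async wrapper: clone self, move the clone into a blocking task.
    async fn lookup(&self, needle: String) -> Option<String> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || this.lookup_blocking(&needle))
            .await
            .expect("blocking lookup task panicked")
    }
}
```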
2 changes: 1 addition & 1 deletion store/postgres/src/store_events.rs
@@ -221,6 +221,6 @@ impl SubscriptionManagerTrait for SubscriptionManager {
self.subscriptions.write().unwrap().insert(id, sender);

// Return the subscription ID and entity change stream
StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat()))
ReceiverStream::new(receiver)
}
}