From 1fd4012ac20e51a934fd27488c431622250d0c61 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Fri, 18 Oct 2024 14:43:41 +0530 Subject: [PATCH 01/10] core,server: add graphman unassign/reassign command to graphql api --- core/graphman/src/commands/deployment/mod.rs | 2 + .../src/commands/deployment/reassign.rs | 96 +++++++++++++++++++ .../src/commands/deployment/unassign.rs | 74 ++++++++++++++ .../graphman/src/entities/empty_response.rs | 8 +- .../src/resolvers/deployment_mutation.rs | 48 +++++++++- .../resolvers/deployment_mutation/reassign.rs | 19 ++++ .../resolvers/deployment_mutation/unassign.rs | 17 ++++ 7 files changed, 258 insertions(+), 6 deletions(-) create mode 100644 core/graphman/src/commands/deployment/reassign.rs create mode 100644 core/graphman/src/commands/deployment/unassign.rs create mode 100644 server/graphman/src/resolvers/deployment_mutation/reassign.rs create mode 100644 server/graphman/src/resolvers/deployment_mutation/unassign.rs diff --git a/core/graphman/src/commands/deployment/mod.rs b/core/graphman/src/commands/deployment/mod.rs index 9c695a5f74a..4cac2277bbe 100644 --- a/core/graphman/src/commands/deployment/mod.rs +++ b/core/graphman/src/commands/deployment/mod.rs @@ -1,3 +1,5 @@ pub mod info; pub mod pause; +pub mod reassign; pub mod resume; +pub mod unassign; diff --git a/core/graphman/src/commands/deployment/reassign.rs b/core/graphman/src/commands/deployment/reassign.rs new file mode 100644 index 00000000000..bdd0a79ebb9 --- /dev/null +++ b/core/graphman/src/commands/deployment/reassign.rs @@ -0,0 +1,96 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use graph::components::store::DeploymentLocator; +use graph::components::store::StoreEvent; +use graph::prelude::EntityChange; +use graph::prelude::NodeId; +use graph_store_postgres::command_support::catalog; +use graph_store_postgres::command_support::catalog::Site; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use thiserror::Error; + +use crate::deployment::DeploymentSelector; +use crate::deployment::DeploymentVersionSelector; +use crate::GraphmanError; + +pub struct ActiveDeployment { + locator: DeploymentLocator, + site: Site, +} + +#[derive(Debug, Error)] +pub enum ReassignDeploymentError { + #[error("deployment '{0}' is already assigned to '{1}'")] + AlreadyAssigned(String, String), + + #[error(transparent)] + Common(#[from] GraphmanError), +} + +pub fn load_deployment( + primary_pool: ConnectionPool, + deployment: &DeploymentSelector, +) -> Result { + let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + + let locator = crate::deployment::load_deployment( + &mut primary_conn, + deployment, + &DeploymentVersionSelector::All, + )? + .locator(); + + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let site = catalog_conn + .locate_site(locator.clone()) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("deployment site not found for '{locator}'")) + })?; + + Ok(ActiveDeployment { locator, site }) +} + +pub fn reassign_deployment( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: ActiveDeployment, + node: &NodeId, +) -> Result<(), ReassignDeploymentError> { + let primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let changes: Vec = match catalog_conn + .assigned_node(&deployment.site) + .map_err(GraphmanError::from)? + { + Some(curr) => { + if &curr == node { + vec![] + } else { + catalog_conn + .reassign_subgraph(&deployment.site, &node) + .map_err(GraphmanError::from)? + } + } + None => catalog_conn + .assign_subgraph(&deployment.site, &node) + .map_err(GraphmanError::from)?, + }; + + if changes.is_empty() { + return Err(ReassignDeploymentError::AlreadyAssigned( + deployment.locator.hash.to_string(), + node.to_string(), + )); + } + + catalog_conn + .send_store_event(¬ification_sender, &StoreEvent::new(changes)) + .map_err(GraphmanError::from)?; + + Ok(()) +} diff --git a/core/graphman/src/commands/deployment/unassign.rs b/core/graphman/src/commands/deployment/unassign.rs new file mode 100644 index 00000000000..9efc7317ad9 --- /dev/null +++ b/core/graphman/src/commands/deployment/unassign.rs @@ -0,0 +1,74 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use graph::components::store::StoreEvent; +use graph_store_postgres::command_support::catalog; +use graph_store_postgres::command_support::catalog::Site; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use thiserror::Error; + +use crate::deployment::DeploymentSelector; +use crate::deployment::DeploymentVersionSelector; +use crate::GraphmanError; + +pub struct AssignedDeployment { + site: Site, +} + +#[derive(Debug, Error)] +pub enum UnassignDeploymentError { + #[error("deployment '{0}' is already unassigned")] + AlreadyUnassigned(String), + + #[error(transparent)] + Common(#[from] GraphmanError), +} + +pub fn load_assigned_deployment( + primary_pool: ConnectionPool, + deployment: &DeploymentSelector, +) -> Result { + let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + + let locator = crate::deployment::load_deployment( + &mut primary_conn, + deployment, + &DeploymentVersionSelector::All, + )? + .locator(); + + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let site = catalog_conn + .locate_site(locator.clone()) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("deployment site not found for '{locator}'")) + })?; + + match catalog_conn + .assigned_node(&site) + .map_err(GraphmanError::from)? + { + Some(_) => Ok(AssignedDeployment { site }), + None => Err(UnassignDeploymentError::AlreadyUnassigned(format!( + "{:?}", + deployment + ))), + } +} + +pub fn unassign_deployment( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: AssignedDeployment, +) -> Result<(), GraphmanError> { + let primary_conn = primary_pool.get()?; + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let changes = catalog_conn.unassign_subgraph(&deployment.site)?; + catalog_conn.send_store_event(¬ification_sender, &StoreEvent::new(changes))?; + + Ok(()) +} diff --git a/server/graphman/src/entities/empty_response.rs b/server/graphman/src/entities/empty_response.rs index a66244f899e..437553a5fed 100644 --- a/server/graphman/src/entities/empty_response.rs +++ b/server/graphman/src/entities/empty_response.rs @@ -5,11 +5,15 @@ use async_graphql::SimpleObject; #[derive(Clone, Debug, SimpleObject)] pub struct EmptyResponse { pub success: bool, + pub message: Option, } impl EmptyResponse { /// Returns a successful response. - pub fn new() -> Self { - Self { success: true } + pub fn new(msg: Option) -> Self { + Self { + success: true, + message: msg, + } } } diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs index 983897391cf..7b4e48ae5a2 100644 --- a/server/graphman/src/resolvers/deployment_mutation.rs +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -1,8 +1,11 @@ use std::sync::Arc; +use anyhow::anyhow; use async_graphql::Context; use async_graphql::Object; use async_graphql::Result; +use graph::prelude::NodeId; +use graph_store_postgres::command_support::catalog; use graph_store_postgres::graphman::GraphmanStore; use crate::entities::DeploymentSelector; @@ -12,9 +15,11 @@ use crate::resolvers::context::GraphmanContext; mod create; mod pause; +mod reassign; mod remove; mod restart; mod resume; +mod unassign; pub struct DeploymentMutation; @@ -32,7 +37,7 @@ impl DeploymentMutation { pause::run(&ctx, &deployment)?; - Ok(EmptyResponse::new()) + Ok(EmptyResponse::new(None)) } /// Resumes a deployment that has been previously paused. @@ -46,7 +51,7 @@ impl DeploymentMutation { resume::run(&ctx, &deployment)?; - Ok(EmptyResponse::new()) + Ok(EmptyResponse::new(None)) } /// Pauses a deployment and resumes it after a delay. @@ -72,13 +77,48 @@ impl DeploymentMutation { pub async fn create(&self, ctx: &Context<'_>, name: String) -> Result { let ctx = GraphmanContext::new(ctx)?; create::run(&ctx, &name)?; - Ok(EmptyResponse::new()) + Ok(EmptyResponse::new(None)) } /// Remove a subgraph pub async fn remove(&self, ctx: &Context<'_>, name: String) -> Result { let ctx = GraphmanContext::new(ctx)?; remove::run(&ctx, &name)?; - Ok(EmptyResponse::new()) + Ok(EmptyResponse::new(None)) + } + + /// Unassign a deployment + pub async fn unassign( + &self, + ctx: &Context<'_>, + deployment: DeploymentSelector, + ) -> Result { + let ctx = GraphmanContext::new(ctx)?; + let deployment = deployment.try_into()?; + + unassign::run(&ctx, &deployment)?; + + Ok(EmptyResponse::new(None)) + } + + /// Assign or reassign a deployment + pub async fn reassign( + &self, + ctx: &Context<'_>, + deployment: DeploymentSelector, + node: String, + ) -> Result { + let ctx = GraphmanContext::new(ctx)?; + let deployment = deployment.try_into()?; + let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?; + reassign::run(&ctx, &deployment, &node)?; + + let mirror = catalog::Mirror::primary_only(ctx.primary_pool); + let count = mirror.assignments(&node)?.len(); + if count == 1 { + Ok(EmptyResponse::new(Some(format!("warning: this is the only deployment assigned to '{}'. Are you sure it is spelled correctly?",node.as_str())))) + } else { + Ok(EmptyResponse::new(None)) + } } } diff --git a/server/graphman/src/resolvers/deployment_mutation/reassign.rs b/server/graphman/src/resolvers/deployment_mutation/reassign.rs new file mode 100644 index 00000000000..7bbfb20ba60 --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation/reassign.rs @@ -0,0 +1,19 @@ +use async_graphql::Result; +use graph::prelude::NodeId; +use graphman::commands::deployment::reassign::load_deployment; +use graphman::commands::deployment::reassign::reassign_deployment; +use graphman::deployment::DeploymentSelector; + +use crate::resolvers::context::GraphmanContext; + +pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector, node: &NodeId) -> Result<()> { + let deployment = load_deployment(ctx.primary_pool.clone(), deployment)?; + reassign_deployment( + ctx.primary_pool.clone(), + ctx.notification_sender.clone(), + deployment, + &node, + )?; + + Ok(()) +} diff --git a/server/graphman/src/resolvers/deployment_mutation/unassign.rs b/server/graphman/src/resolvers/deployment_mutation/unassign.rs new file mode 100644 index 00000000000..4af620e8568 --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation/unassign.rs @@ -0,0 +1,17 @@ +use async_graphql::Result; +use graphman::commands::deployment::unassign::load_assigned_deployment; +use graphman::commands::deployment::unassign::unassign_deployment; +use graphman::deployment::DeploymentSelector; + +use crate::resolvers::context::GraphmanContext; + +pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector) -> Result<()> { + let deployment = load_assigned_deployment(ctx.primary_pool.clone(), deployment)?; + unassign_deployment( + ctx.primary_pool.clone(), + ctx.notification_sender.clone(), + deployment, + )?; + + Ok(()) +} From 228896d5d73b9500b63eda05ea7ce00bd3cb83f6 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Fri, 18 Oct 2024 20:51:21 +0530 Subject: [PATCH 02/10] server: update deployment locator --- core/graphman/src/commands/deployment/reassign.rs | 5 ++--- core/graphman/src/commands/deployment/unassign.rs | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/graphman/src/commands/deployment/reassign.rs b/core/graphman/src/commands/deployment/reassign.rs index bdd0a79ebb9..0a2fc1468b5 100644 --- a/core/graphman/src/commands/deployment/reassign.rs +++ b/core/graphman/src/commands/deployment/reassign.rs @@ -35,12 +35,11 @@ pub fn load_deployment( ) -> Result { let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; - let locator = crate::deployment::load_deployment( + let locator = crate::deployment::load_deployment_locator( &mut primary_conn, deployment, &DeploymentVersionSelector::All, - )? - .locator(); + )?; let mut catalog_conn = catalog::Connection::new(primary_conn); diff --git a/core/graphman/src/commands/deployment/unassign.rs b/core/graphman/src/commands/deployment/unassign.rs index 9efc7317ad9..2b1ffbe2d27 100644 --- a/core/graphman/src/commands/deployment/unassign.rs +++ b/core/graphman/src/commands/deployment/unassign.rs @@ -31,12 +31,11 @@ pub fn load_assigned_deployment( ) -> Result { let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; - let locator = crate::deployment::load_deployment( + let locator = crate::deployment::load_deployment_locator( &mut primary_conn, deployment, &DeploymentVersionSelector::All, - )? - .locator(); + )?; let mut catalog_conn = catalog::Connection::new(primary_conn); From c16059f0133ebf2bea952782922245eeb08c226e Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Wed, 23 Oct 2024 11:35:49 +0530 Subject: [PATCH 03/10] server: added a common reponse type for graphman graphql api --- server/graphman/src/entities/empty_response.rs | 8 ++------ server/graphman/src/entities/mod.rs | 2 ++ server/graphman/src/entities/response.rs | 17 +++++++++++++++++ 3 files changed, 21 insertions(+), 6 deletions(-) create mode 100644 server/graphman/src/entities/response.rs diff --git a/server/graphman/src/entities/empty_response.rs b/server/graphman/src/entities/empty_response.rs index 437553a5fed..a66244f899e 100644 --- a/server/graphman/src/entities/empty_response.rs +++ b/server/graphman/src/entities/empty_response.rs @@ -5,15 +5,11 @@ use async_graphql::SimpleObject; #[derive(Clone, Debug, SimpleObject)] pub struct EmptyResponse { pub success: bool, - pub message: Option, } impl EmptyResponse { /// Returns a successful response. - pub fn new(msg: Option) -> Self { - Self { - success: true, - message: msg, - } + pub fn new() -> Self { + Self { success: true } } } diff --git a/server/graphman/src/entities/mod.rs b/server/graphman/src/entities/mod.rs index 8f4b2d8c018..a4424e6a867 100644 --- a/server/graphman/src/entities/mod.rs +++ b/server/graphman/src/entities/mod.rs @@ -9,6 +9,7 @@ mod deployment_version_selector; mod empty_response; mod execution; mod execution_id; +mod response; mod subgraph_health; pub use self::block_hash::BlockHash; @@ -22,4 +23,5 @@ pub use self::deployment_version_selector::DeploymentVersionSelector; pub use self::empty_response::EmptyResponse; pub use self::execution::Execution; pub use self::execution_id::ExecutionId; +pub use self::response::Response; pub use self::subgraph_health::SubgraphHealth; diff --git a/server/graphman/src/entities/response.rs b/server/graphman/src/entities/response.rs new file mode 100644 index 00000000000..93bce14d898 --- /dev/null +++ b/server/graphman/src/entities/response.rs @@ -0,0 +1,17 @@ +use async_graphql::SimpleObject; + +#[derive(Clone, Debug, SimpleObject)] +pub struct Response { + pub success: bool, + pub message: String, +} + +impl Response { + /// Returns a response with success & message. + pub fn new(success: bool, msg: String) -> Self { + Self { + success, + message: msg, + } + } +} From 3543972652e4d075d91b6a3b518e7880f9b1c4b3 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Wed, 23 Oct 2024 11:36:32 +0530 Subject: [PATCH 04/10] core,server: changed reponse type for graphman unassign/reassign graphql api --- .../src/commands/deployment/unassign.rs | 4 ++-- core/graphman/src/deployment.rs | 13 ++++++++++++ .../src/resolvers/deployment_mutation.rs | 21 ++++++++++++------- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/core/graphman/src/commands/deployment/unassign.rs b/core/graphman/src/commands/deployment/unassign.rs index 2b1ffbe2d27..366ffaa5a19 100644 --- a/core/graphman/src/commands/deployment/unassign.rs +++ b/core/graphman/src/commands/deployment/unassign.rs @@ -52,8 +52,8 @@ pub fn load_assigned_deployment( { Some(_) => Ok(AssignedDeployment { site }), None => Err(UnassignDeploymentError::AlreadyUnassigned(format!( - "{:?}", - deployment + "{}", + deployment.as_str() ))), } } diff --git a/core/graphman/src/deployment.rs b/core/graphman/src/deployment.rs index 1d749af54bb..966ab8bff73 100644 --- a/core/graphman/src/deployment.rs +++ b/core/graphman/src/deployment.rs @@ -31,6 +31,19 @@ pub enum DeploymentSelector { All, } +impl DeploymentSelector { + pub fn as_str(&self) -> String { + match self { + DeploymentSelector::Name(name) => name.clone(), + DeploymentSelector::Subgraph { hash, shard } => { + format!("[Hash:{}, Shard:{:?}]", hash, shard) + } + DeploymentSelector::Schema(schema) => schema.clone(), + DeploymentSelector::All => "All".to_string(), + } + } +} + #[derive(Clone, Debug)] pub enum DeploymentVersionSelector { Current, diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs index 7b4e48ae5a2..6153639b07b 100644 --- a/server/graphman/src/resolvers/deployment_mutation.rs +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -11,6 +11,7 @@ use graph_store_postgres::graphman::GraphmanStore; use crate::entities::DeploymentSelector; use crate::entities::EmptyResponse; use crate::entities::ExecutionId; +use crate::entities::Response; use crate::resolvers::context::GraphmanContext; mod create; @@ -37,7 +38,7 @@ impl DeploymentMutation { pause::run(&ctx, &deployment)?; - Ok(EmptyResponse::new(None)) + Ok(EmptyResponse::new()) } /// Resumes a deployment that has been previously paused. @@ -51,7 +52,7 @@ impl DeploymentMutation { resume::run(&ctx, &deployment)?; - Ok(EmptyResponse::new(None)) + Ok(EmptyResponse::new()) } /// Pauses a deployment and resumes it after a delay. @@ -92,13 +93,16 @@ impl DeploymentMutation { &self, ctx: &Context<'_>, deployment: DeploymentSelector, - ) -> Result { + ) -> Result { let ctx = GraphmanContext::new(ctx)?; let deployment = deployment.try_into()?; unassign::run(&ctx, &deployment)?; - Ok(EmptyResponse::new(None)) + Ok(Response::new( + true, + format!("Unassigned {}", deployment.as_str()), + )) } /// Assign or reassign a deployment @@ -107,7 +111,7 @@ impl DeploymentMutation { ctx: &Context<'_>, deployment: DeploymentSelector, node: String, - ) -> Result { + ) -> Result { let ctx = GraphmanContext::new(ctx)?; let deployment = deployment.try_into()?; let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?; @@ -116,9 +120,12 @@ impl DeploymentMutation { let mirror = catalog::Mirror::primary_only(ctx.primary_pool); let count = mirror.assignments(&node)?.len(); if count == 1 { - Ok(EmptyResponse::new(Some(format!("warning: this is the only deployment assigned to '{}'. Are you sure it is spelled correctly?",node.as_str())))) + Ok(Response::new(true,format!("warning: this is the only deployment assigned to '{}'. Are you sure it is spelled correctly?",node.as_str()))) } else { - Ok(EmptyResponse::new(None)) + Ok(Response::new( + true, + format!("Ressigned {} to {}", deployment.as_str(), node.as_str()), + )) } } } From b674be54bd6d944b4ece11bd29298d8f3db24a5e Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Fri, 29 Nov 2024 06:19:25 +0530 Subject: [PATCH 05/10] server: add CompletedWithWarnings return type --- .../src/commands/deployment/reassign.rs | 2 +- .../src/commands/deployment/unassign.rs | 7 ++--- core/graphman/src/deployment.rs | 13 --------- server/graphman/src/entities/mod.rs | 4 +-- server/graphman/src/entities/response.rs | 17 ----------- .../graphman/src/entities/warning_response.rs | 13 +++++++++ .../src/resolvers/deployment_mutation.rs | 28 +++++++++++-------- 7 files changed, 35 insertions(+), 49 deletions(-) delete mode 100644 server/graphman/src/entities/response.rs create mode 100644 server/graphman/src/entities/warning_response.rs diff --git a/core/graphman/src/commands/deployment/reassign.rs b/core/graphman/src/commands/deployment/reassign.rs index 0a2fc1468b5..bb24cd3f19c 100644 --- a/core/graphman/src/commands/deployment/reassign.rs +++ b/core/graphman/src/commands/deployment/reassign.rs @@ -82,7 +82,7 @@ pub fn reassign_deployment( if changes.is_empty() { return Err(ReassignDeploymentError::AlreadyAssigned( - deployment.locator.hash.to_string(), + deployment.locator.to_string(), node.to_string(), )); } diff --git a/core/graphman/src/commands/deployment/unassign.rs b/core/graphman/src/commands/deployment/unassign.rs index 366ffaa5a19..5966db8a065 100644 --- a/core/graphman/src/commands/deployment/unassign.rs +++ b/core/graphman/src/commands/deployment/unassign.rs @@ -51,10 +51,9 @@ pub fn load_assigned_deployment( .map_err(GraphmanError::from)? { Some(_) => Ok(AssignedDeployment { site }), - None => Err(UnassignDeploymentError::AlreadyUnassigned(format!( - "{}", - deployment.as_str() - ))), + None => Err(UnassignDeploymentError::AlreadyUnassigned( + locator.to_string(), + )), } } diff --git a/core/graphman/src/deployment.rs b/core/graphman/src/deployment.rs index 966ab8bff73..1d749af54bb 100644 --- a/core/graphman/src/deployment.rs +++ b/core/graphman/src/deployment.rs @@ -31,19 +31,6 @@ pub enum DeploymentSelector { All, } -impl DeploymentSelector { - pub fn as_str(&self) -> String { - match self { - DeploymentSelector::Name(name) => name.clone(), - DeploymentSelector::Subgraph { hash, shard } => { - format!("[Hash:{}, Shard:{:?}]", hash, shard) - } - DeploymentSelector::Schema(schema) => schema.clone(), - DeploymentSelector::All => "All".to_string(), - } - } -} - #[derive(Clone, Debug)] pub enum DeploymentVersionSelector { Current, diff --git a/server/graphman/src/entities/mod.rs b/server/graphman/src/entities/mod.rs index a4424e6a867..c8d3330c9f7 100644 --- a/server/graphman/src/entities/mod.rs +++ b/server/graphman/src/entities/mod.rs @@ -9,8 +9,8 @@ mod deployment_version_selector; mod empty_response; mod execution; mod execution_id; -mod response; mod subgraph_health; +mod warning_response; pub use self::block_hash::BlockHash; pub use self::block_number::BlockNumber; @@ -23,5 +23,5 @@ pub use self::deployment_version_selector::DeploymentVersionSelector; pub use self::empty_response::EmptyResponse; pub use self::execution::Execution; pub use self::execution_id::ExecutionId; -pub use self::response::Response; pub use self::subgraph_health::SubgraphHealth; +pub use self::warning_response::CompletedWithWarnings; diff --git a/server/graphman/src/entities/response.rs b/server/graphman/src/entities/response.rs deleted file mode 100644 index 93bce14d898..00000000000 --- a/server/graphman/src/entities/response.rs +++ /dev/null @@ -1,17 +0,0 @@ -use async_graphql::SimpleObject; - -#[derive(Clone, Debug, SimpleObject)] -pub struct Response { - pub success: bool, - pub message: String, -} - -impl Response { - /// Returns a response with success & message. - pub fn new(success: bool, msg: String) -> Self { - Self { - success, - message: msg, - } - } -} diff --git a/server/graphman/src/entities/warning_response.rs b/server/graphman/src/entities/warning_response.rs new file mode 100644 index 00000000000..e0d2f72fa41 --- /dev/null +++ b/server/graphman/src/entities/warning_response.rs @@ -0,0 +1,13 @@ +use async_graphql::SimpleObject; + +#[derive(Clone, Debug, SimpleObject)] +pub struct CompletedWithWarnings { + pub warnings: Vec, +} + +impl CompletedWithWarnings { + /// Returns a response with success & message. + pub fn new(msg: Vec) -> Self { + Self { warnings: msg } + } +} diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs index 6153639b07b..2bb48300c02 100644 --- a/server/graphman/src/resolvers/deployment_mutation.rs +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -4,14 +4,15 @@ use anyhow::anyhow; use async_graphql::Context; use async_graphql::Object; use async_graphql::Result; +use async_graphql::Union; use graph::prelude::NodeId; use graph_store_postgres::command_support::catalog; use graph_store_postgres::graphman::GraphmanStore; +use crate::entities::CompletedWithWarnings; use crate::entities::DeploymentSelector; use crate::entities::EmptyResponse; use crate::entities::ExecutionId; -use crate::entities::Response; use crate::resolvers::context::GraphmanContext; mod create; @@ -24,6 +25,12 @@ mod unassign; pub struct DeploymentMutation; +#[derive(Clone, Debug, Union)] +pub enum ReassignResponse { + EmptyResponse(EmptyResponse), + CompletedWithWarnings(CompletedWithWarnings), +} + /// Mutations related to one or multiple deployments. #[Object] impl DeploymentMutation { @@ -93,16 +100,13 @@ impl DeploymentMutation { &self, ctx: &Context<'_>, deployment: DeploymentSelector, - ) -> Result { + ) -> Result { let ctx = GraphmanContext::new(ctx)?; let deployment = deployment.try_into()?; unassign::run(&ctx, &deployment)?; - Ok(Response::new( - true, - format!("Unassigned {}", deployment.as_str()), - )) + Ok(EmptyResponse::new()) } /// Assign or reassign a deployment @@ -111,7 +115,7 @@ impl DeploymentMutation { ctx: &Context<'_>, deployment: DeploymentSelector, node: String, - ) -> Result { + ) -> Result { let ctx = GraphmanContext::new(ctx)?; let deployment = deployment.try_into()?; let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?; @@ -120,12 +124,12 @@ impl DeploymentMutation { let mirror = catalog::Mirror::primary_only(ctx.primary_pool); let count = mirror.assignments(&node)?.len(); if count == 1 { - Ok(Response::new(true,format!("warning: this is the only deployment assigned to '{}'. Are you sure it is spelled correctly?",node.as_str()))) - } else { - Ok(Response::new( - true, - format!("Ressigned {} to {}", deployment.as_str(), node.as_str()), + let warning_msg = format!("warning: this is the only deployment assigned to '{}'. Are you sure it is spelled correctly?",node.as_str()); + Ok(ReassignResponse::CompletedWithWarnings( + CompletedWithWarnings::new(vec![warning_msg]), )) + } else { + Ok(ReassignResponse::EmptyResponse(EmptyResponse::new())) } } } From 999825d1593f844ae617a482a22af2fb07143ce8 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Fri, 29 Nov 2024 20:46:14 +0530 Subject: [PATCH 06/10] server: graphman graphql api tests added for unassign/reassign --- server/graphman/tests/deployment_mutation.rs | 194 +++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/server/graphman/tests/deployment_mutation.rs b/server/graphman/tests/deployment_mutation.rs index 927cf5bc87a..f77da4b3916 100644 --- a/server/graphman/tests/deployment_mutation.rs +++ b/server/graphman/tests/deployment_mutation.rs @@ -390,3 +390,197 @@ fn graphql_cannot_remove_subgraph_with_invalid_name() { assert_ne!(resp, success_resp); }); } + +#[test] +fn graphql_can_unassign_deployments() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let unassign_req = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "unassign": { + "success": true, + } + } + } + }); + + let subgraph_node_id = send_graphql_request( + json!({ + "query": r#"query Deployment { + deployment { + info(deployment: { hash: "subgraph_1" }) { + nodeId + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let is_node_null = subgraph_node_id["data"]["deployment"]["info"][0]["nodeId"].is_null(); + + assert_eq!(unassign_req, expected_resp); + assert_eq!(is_node_null, true); + }); +} + +#[test] +fn graphql_cannot_unassign_deployments_twice() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let unassign_again = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "unassign": { + "success": true, + } + } + } + }); + + assert_ne!(unassign_again, expected_resp); + }); +} + +#[test] +fn graphql_can_reassign_deployment() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let deployment_hash = DeploymentHash::new("subgraph_2").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let reassign = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + reassign(deployment: { hash: "subgraph_1" }, node: "test") { + ... on EmptyResponse { + success + } + ... on CompletedWithWarnings { + warnings + } + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "reassign": { + "success": true, + } + } + } + }); + + assert_eq!(reassign, expected_resp); + }); +} + +#[test] +fn graphql_warns_reassign_on_wrong_node_id() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let reassign = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + reassign(deployment: { hash: "subgraph_1" }, node: "invalid_node") { + ... on EmptyResponse { + success + } + ... on CompletedWithWarnings { + warnings + } + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "reassign": { + "warnings": ["warning: this is the only deployment assigned to 'invalid_node'. Are you sure it is spelled correctly?"], + } + } + } + }); + + assert_eq!(reassign, expected_resp); + }); +} From 9a060df23a2484fad4f6622b22d0f2ae7c60cc72 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Thu, 26 Dec 2024 06:28:01 +0530 Subject: [PATCH 07/10] server: improve warning message for graphman graphql api and clean up code --- server/graphman/src/entities/warning_response.rs | 5 ++--- server/graphman/src/resolvers/deployment_mutation.rs | 2 +- server/graphman/tests/deployment_mutation.rs | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server/graphman/src/entities/warning_response.rs b/server/graphman/src/entities/warning_response.rs index e0d2f72fa41..a99c4ce7583 100644 --- a/server/graphman/src/entities/warning_response.rs +++ b/server/graphman/src/entities/warning_response.rs @@ -6,8 +6,7 @@ pub struct CompletedWithWarnings { } impl CompletedWithWarnings { - /// Returns a response with success & message. - pub fn new(msg: Vec) -> Self { - Self { warnings: msg } + pub fn new(warnings: Vec) -> Self { + Self { warnings } } } diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs index 2bb48300c02..4d487e60c32 100644 --- a/server/graphman/src/resolvers/deployment_mutation.rs +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -124,7 +124,7 @@ impl DeploymentMutation { let mirror = catalog::Mirror::primary_only(ctx.primary_pool); let count = mirror.assignments(&node)?.len(); if count == 1 { - let warning_msg = format!("warning: this is the only deployment assigned to '{}'. Are you sure it is spelled correctly?",node.as_str()); + let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str()); Ok(ReassignResponse::CompletedWithWarnings( CompletedWithWarnings::new(vec![warning_msg]), )) diff --git a/server/graphman/tests/deployment_mutation.rs b/server/graphman/tests/deployment_mutation.rs index f77da4b3916..b0afcc528c4 100644 --- a/server/graphman/tests/deployment_mutation.rs +++ b/server/graphman/tests/deployment_mutation.rs @@ -423,7 +423,7 @@ fn graphql_can_unassign_deployments() { let subgraph_node_id = send_graphql_request( json!({ - "query": r#"query Deployment { + "query": r#"{ deployment { info(deployment: { hash: "subgraph_1" }) { nodeId From e67c6aa23636d1d6d13f5eb127d42c5d9aea1141 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Thu, 26 Dec 2024 07:03:27 +0530 Subject: [PATCH 08/10] server: correction in EmptyResponse creation in deployment mutation methods --- server/graphman/src/resolvers/deployment_mutation.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs index 4d487e60c32..1e264111baf 100644 --- a/server/graphman/src/resolvers/deployment_mutation.rs +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -85,14 +85,14 @@ impl DeploymentMutation { pub async fn create(&self, ctx: &Context<'_>, name: String) -> Result { let ctx = GraphmanContext::new(ctx)?; create::run(&ctx, &name)?; - Ok(EmptyResponse::new(None)) + Ok(EmptyResponse::new()) } /// Remove a subgraph pub async fn remove(&self, ctx: &Context<'_>, name: String) -> Result { let ctx = GraphmanContext::new(ctx)?; remove::run(&ctx, &name)?; - Ok(EmptyResponse::new(None)) + Ok(EmptyResponse::new()) } /// Unassign a deployment From d0d7d5c1d54ad07a81942352617d3cd63b10f404 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Thu, 26 Dec 2024 07:22:09 +0530 Subject: [PATCH 09/10] server: update warning message for reassign on invalid node ID in deployment mutation tests --- server/graphman/tests/deployment_mutation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/graphman/tests/deployment_mutation.rs b/server/graphman/tests/deployment_mutation.rs index b0afcc528c4..04049caaf59 100644 --- a/server/graphman/tests/deployment_mutation.rs +++ b/server/graphman/tests/deployment_mutation.rs @@ -575,7 +575,7 @@ fn graphql_warns_reassign_on_wrong_node_id() { "data": { "deployment": { "reassign": { - "warnings": ["warning: this is the only deployment assigned to 'invalid_node'. Are you sure it is spelled correctly?"], + "warnings": ["This is the only deployment assigned to 'invalid_node'. Please make sure that the node ID is spelled correctly."], } } } From 38ceded52208dd8407a4dfa5b59843e518a2c5f8 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Sat, 28 Dec 2024 13:07:34 +0530 Subject: [PATCH 10/10] core,node,server: use common fn for graphman cli & server for unassign and reassign --- .../src/commands/deployment/reassign.rs | 34 ++++++++++++--- .../src/commands/deployment/unassign.rs | 10 ++++- node/src/bin/manager.rs | 20 ++++++--- node/src/manager/commands/deployment/mod.rs | 2 + .../manager/commands/deployment/reassign.rs | 41 +++++++++++++++++++ .../manager/commands/deployment/unassign.rs | 22 ++++++++++ .../src/resolvers/deployment_mutation.rs | 21 ++++------ .../resolvers/deployment_mutation/reassign.rs | 15 ++++--- 8 files changed, 136 insertions(+), 29 deletions(-) create mode 100644 node/src/manager/commands/deployment/reassign.rs create mode 100644 node/src/manager/commands/deployment/unassign.rs diff --git a/core/graphman/src/commands/deployment/reassign.rs b/core/graphman/src/commands/deployment/reassign.rs index bb24cd3f19c..2e5916a7aae 100644 --- a/core/graphman/src/commands/deployment/reassign.rs +++ b/core/graphman/src/commands/deployment/reassign.rs @@ -15,11 +15,17 @@ use crate::deployment::DeploymentSelector; use crate::deployment::DeploymentVersionSelector; use crate::GraphmanError; -pub struct ActiveDeployment { +pub struct Deployment { locator: DeploymentLocator, site: Site, } +impl Deployment { + pub fn locator(&self) -> &DeploymentLocator { + &self.locator + } +} + #[derive(Debug, Error)] pub enum ReassignDeploymentError { #[error("deployment '{0}' is already assigned to '{1}'")] @@ -29,10 +35,16 @@ pub enum ReassignDeploymentError { Common(#[from] GraphmanError), } +#[derive(Clone, Debug)] +pub enum ReassignResult { + EmptyResponse, + CompletedWithWarnings(Vec), +} + pub fn load_deployment( primary_pool: ConnectionPool, deployment: &DeploymentSelector, -) -> Result { +) -> Result { let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; let locator = crate::deployment::load_deployment_locator( @@ -50,15 +62,15 @@ pub fn load_deployment( GraphmanError::Store(anyhow!("deployment site not found for '{locator}'")) })?; - Ok(ActiveDeployment { locator, site }) + Ok(Deployment { locator, site }) } pub fn reassign_deployment( primary_pool: ConnectionPool, notification_sender: Arc, - deployment: ActiveDeployment, + deployment: &Deployment, node: &NodeId, -) -> Result<(), ReassignDeploymentError> { +) -> Result { let primary_conn = primary_pool.get().map_err(GraphmanError::from)?; let mut catalog_conn = catalog::Connection::new(primary_conn); @@ -91,5 +103,15 @@ pub fn reassign_deployment( .send_store_event(¬ification_sender, &StoreEvent::new(changes)) .map_err(GraphmanError::from)?; - Ok(()) + let mirror = catalog::Mirror::primary_only(primary_pool); + let count = mirror + .assignments(&node) + .map_err(GraphmanError::from)? + .len(); + if count == 1 { + let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str()); + Ok(ReassignResult::CompletedWithWarnings(vec![warning_msg])) + } else { + Ok(ReassignResult::EmptyResponse) + } } diff --git a/core/graphman/src/commands/deployment/unassign.rs b/core/graphman/src/commands/deployment/unassign.rs index 5966db8a065..5233e61ada1 100644 --- a/core/graphman/src/commands/deployment/unassign.rs +++ b/core/graphman/src/commands/deployment/unassign.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::anyhow; +use graph::components::store::DeploymentLocator; use graph::components::store::StoreEvent; use graph_store_postgres::command_support::catalog; use graph_store_postgres::command_support::catalog::Site; @@ -13,9 +14,16 @@ use crate::deployment::DeploymentVersionSelector; use crate::GraphmanError; pub struct AssignedDeployment { + locator: DeploymentLocator, site: Site, } +impl AssignedDeployment { + pub fn locator(&self) -> &DeploymentLocator { + &self.locator + } +} + #[derive(Debug, Error)] pub enum UnassignDeploymentError { #[error("deployment '{0}' is already unassigned")] @@ -50,7 +58,7 @@ pub fn load_assigned_deployment( .assigned_node(&site) .map_err(GraphmanError::from)? { - Some(_) => Ok(AssignedDeployment { site }), + Some(_) => Ok(AssignedDeployment { locator, site }), None => Err(UnassignDeploymentError::AlreadyUnassigned( locator.to_string(), )), diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index afc09403357..0b1c176468d 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -12,7 +12,7 @@ use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX}; use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry}; use graph::{ prelude::{ - anyhow::{self, Context as AnyhowContextTrait}, + anyhow::{self, anyhow, Context as AnyhowContextTrait}, info, tokio, Logger, NodeId, }, url::Url, @@ -1198,12 +1198,22 @@ async fn main() -> anyhow::Result<()> { Remove { name } => commands::remove::run(ctx.subgraph_store(), &name), Create { name } => commands::create::run(ctx.subgraph_store(), name), Unassign { deployment } => { - let sender = ctx.notification_sender(); - commands::assign::unassign(ctx.primary_pool(), &sender, &deployment).await + let notifications_sender = ctx.notification_sender(); + let primary_pool = ctx.primary_pool(); + let deployment = make_deployment_selector(deployment); + commands::deployment::unassign::run(primary_pool, notifications_sender, deployment) } Reassign { deployment, node } => { - let sender = ctx.notification_sender(); - commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node) + let notifications_sender = ctx.notification_sender(); + let primary_pool = ctx.primary_pool(); + let deployment = make_deployment_selector(deployment); + let node = NodeId::new(node).map_err(|node| anyhow!("invalid node id {:?}", node))?; + commands::deployment::reassign::run( + primary_pool, + notifications_sender, + deployment, + &node, + ) } Pause { deployment } => { let notifications_sender = ctx.notification_sender(); diff --git a/node/src/manager/commands/deployment/mod.rs b/node/src/manager/commands/deployment/mod.rs index 98910d7b4c4..8fd0237d3a7 100644 --- a/node/src/manager/commands/deployment/mod.rs +++ b/node/src/manager/commands/deployment/mod.rs @@ -1,4 +1,6 @@ pub mod info; pub mod pause; +pub mod reassign; pub mod restart; pub mod resume; +pub mod unassign; diff --git a/node/src/manager/commands/deployment/reassign.rs b/node/src/manager/commands/deployment/reassign.rs new file mode 100644 index 00000000000..60528f16206 --- /dev/null +++ b/node/src/manager/commands/deployment/reassign.rs @@ -0,0 +1,41 @@ +use std::sync::Arc; + +use anyhow::Result; +use graph::prelude::NodeId; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graphman::commands::deployment::reassign::{ + load_deployment, reassign_deployment, ReassignResult, +}; +use graphman::deployment::DeploymentSelector; + +pub fn run( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: DeploymentSelector, + node: &NodeId, +) -> Result<()> { + let deployment = load_deployment(primary_pool.clone(), &deployment)?; + + println!("Reassigning deployment {}", deployment.locator()); + + let reassign_result = + reassign_deployment(primary_pool, notification_sender, &deployment, node)?; + + match reassign_result { + ReassignResult::EmptyResponse => { + println!( + "Deployment {} assigned to node {}", + deployment.locator(), + node + ); + } + ReassignResult::CompletedWithWarnings(warnings) => { + for msg in warnings { + println!("{}", msg); + } + } + } + + Ok(()) +} diff --git a/node/src/manager/commands/deployment/unassign.rs b/node/src/manager/commands/deployment/unassign.rs new file mode 100644 index 00000000000..45567e81f63 --- /dev/null +++ b/node/src/manager/commands/deployment/unassign.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; + +use anyhow::Result; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graphman::commands::deployment::unassign::load_assigned_deployment; +use graphman::commands::deployment::unassign::unassign_deployment; +use graphman::deployment::DeploymentSelector; + +pub fn run( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: DeploymentSelector, +) -> Result<()> { + let assigned_deployment = load_assigned_deployment(primary_pool.clone(), &deployment)?; + + println!("Unassigning deployment {}", assigned_deployment.locator()); + + unassign_deployment(primary_pool, notification_sender, assigned_deployment)?; + + Ok(()) +} diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs index 1e264111baf..8848578ce27 100644 --- a/server/graphman/src/resolvers/deployment_mutation.rs +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -6,8 +6,8 @@ use async_graphql::Object; use async_graphql::Result; use async_graphql::Union; use graph::prelude::NodeId; -use graph_store_postgres::command_support::catalog; use graph_store_postgres::graphman::GraphmanStore; +use graphman::commands::deployment::reassign::ReassignResult; use crate::entities::CompletedWithWarnings; use crate::entities::DeploymentSelector; @@ -119,17 +119,14 @@ impl DeploymentMutation { let ctx = GraphmanContext::new(ctx)?; let deployment = deployment.try_into()?; let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?; - reassign::run(&ctx, &deployment, &node)?; - - let mirror = catalog::Mirror::primary_only(ctx.primary_pool); - let count = mirror.assignments(&node)?.len(); - if count == 1 { - let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str()); - Ok(ReassignResponse::CompletedWithWarnings( - CompletedWithWarnings::new(vec![warning_msg]), - )) - } else { - Ok(ReassignResponse::EmptyResponse(EmptyResponse::new())) + let reassign_result = reassign::run(&ctx, &deployment, &node)?; + match reassign_result { + ReassignResult::CompletedWithWarnings(warnings) => Ok( + ReassignResponse::CompletedWithWarnings(CompletedWithWarnings::new(warnings)), + ), + ReassignResult::EmptyResponse => { + Ok(ReassignResponse::EmptyResponse(EmptyResponse::new())) + } } } } diff --git a/server/graphman/src/resolvers/deployment_mutation/reassign.rs b/server/graphman/src/resolvers/deployment_mutation/reassign.rs index 7bbfb20ba60..3887d67032a 100644 --- a/server/graphman/src/resolvers/deployment_mutation/reassign.rs +++ b/server/graphman/src/resolvers/deployment_mutation/reassign.rs @@ -1,19 +1,24 @@ +use anyhow::Ok; use async_graphql::Result; use graph::prelude::NodeId; use graphman::commands::deployment::reassign::load_deployment; use graphman::commands::deployment::reassign::reassign_deployment; +use graphman::commands::deployment::reassign::ReassignResult; use graphman::deployment::DeploymentSelector; use crate::resolvers::context::GraphmanContext; -pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector, node: &NodeId) -> Result<()> { +pub fn run( + ctx: &GraphmanContext, + deployment: &DeploymentSelector, + node: &NodeId, +) -> Result { let deployment = load_deployment(ctx.primary_pool.clone(), deployment)?; - reassign_deployment( + let reassign_result = reassign_deployment( ctx.primary_pool.clone(), ctx.notification_sender.clone(), - deployment, + &deployment, &node, )?; - - Ok(()) + Ok(reassign_result) }