From 43841fc9492b2d98de0d08800063ecc2a4eb96df Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 20 Mar 2024 10:08:40 -0700 Subject: [PATCH] mutations via native queries (#7) Implements a mutation endpoint to run procedures. I added an example native query to fixtures, `insertArtist`, to demonstrate. I'll follow up with another PR that implements arguments to native queries where I'll replace the hard-coded inputs. I'm having a problem querying the `Artist` collection which I think is unrelated, but I'm going to check that out and likely submit a bug fix PR. I confirmed that `insertArtist` does update the database correctly, and I wanted to get this PR out. Ticket: https://hasurahq.atlassian.net/browse/MDB-86 --- Cargo.lock | 1 + crates/mongodb-connector/Cargo.toml | 1 + crates/mongodb-connector/src/main.rs | 1 + .../mongodb-connector/src/mongo_connector.rs | 10 +- crates/mongodb-connector/src/mutation.rs | 98 +++++++++++++++++++ .../chinook/native_queries/hello.yaml | 1 + .../chinook/native_queries/insert_artist.yaml | 16 +++ .../chinook/commands/InsertArtist.hml | 54 ++++++++++ .../chinook/dataconnectors/mongodb.hml | 14 ++- 9 files changed, 189 insertions(+), 7 deletions(-) create mode 100644 crates/mongodb-connector/src/mutation.rs create mode 100644 fixtures/connector/chinook/native_queries/insert_artist.yaml create mode 100644 fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml diff --git a/Cargo.lock b/Cargo.lock index b752b267..3c784f24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1558,6 +1558,7 @@ dependencies = [ "dc-api-test-helpers", "dc-api-types", "enum-iterator", + "futures", "http", "indexmap 2.2.5", "itertools 0.10.5", diff --git a/crates/mongodb-connector/Cargo.toml b/crates/mongodb-connector/Cargo.toml index 0632b67c..36a21468 100644 --- a/crates/mongodb-connector/Cargo.toml +++ b/crates/mongodb-connector/Cargo.toml @@ -10,6 +10,7 @@ configuration = { path = "../configuration" } dc-api = { path = "../dc-api" } dc-api-types = { path = "../dc-api-types" } enum-iterator = "1.4.1" +futures = "^0.3" http = "^0.2" indexmap = { version = "2.1.0", features = ["serde"] } itertools = "^0.10" diff --git a/crates/mongodb-connector/src/main.rs b/crates/mongodb-connector/src/main.rs index 26c46d0b..aadcefad 100644 --- a/crates/mongodb-connector/src/main.rs +++ b/crates/mongodb-connector/src/main.rs @@ -2,6 +2,7 @@ mod api_type_conversions; mod capabilities; mod error_mapping; mod mongo_connector; +mod mutation; mod schema; use std::error::Error; diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index f23c4338..77f16cc5 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -19,7 +19,6 @@ use ndc_sdk::{ }, }; -use crate::capabilities::mongo_capabilities_response; use crate::{ api_type_conversions::{ v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, QueryContext, @@ -27,6 +26,7 @@ use crate::{ capabilities::scalar_types, error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error}, }; +use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request}; #[derive(Clone, Default)] pub struct MongoConnector; @@ -115,12 +115,10 @@ impl Connector for MongoConnector { async fn mutation( _configuration: &Self::Configuration, - _state: &Self::State, - _request: MutationRequest, + state: &Self::State, + request: MutationRequest, ) -> Result, MutationError> { - Err(MutationError::UnsupportedOperation( - "The MongoDB agent does not yet support mutations".to_owned(), - )) + handle_mutation_request(state, request).await } async fn query( diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs new file mode 100644 index 00000000..2388d952 --- /dev/null +++ b/crates/mongodb-connector/src/mutation.rs @@ -0,0 +1,98 @@ +use std::collections::BTreeMap; + +use configuration::native_queries::NativeQuery; +use futures::future::try_join_all; +use itertools::Itertools; +use mongodb::Database; +use mongodb_agent_common::interface_types::MongoConfig; +use ndc_sdk::{ + connector::MutationError, + json_response::JsonResponse, + models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse}, +}; +use serde_json::Value; + +/// A procedure combined with inputs +#[derive(Clone, Debug)] +#[allow(dead_code)] +struct Job<'a> { + // For the time being all procedures are native queries. + native_query: &'a NativeQuery, + arguments: BTreeMap, +} + +impl<'a> Job<'a> { + pub fn new(native_query: &'a NativeQuery, arguments: BTreeMap) -> Self { + Job { + native_query, + arguments, + } + } +} + +pub async fn handle_mutation_request( + config: &MongoConfig, + mutation_request: MutationRequest, +) -> Result, MutationError> { + tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation"); + let database = config.client.database(&config.database); + let jobs = look_up_procedures(config, mutation_request)?; + let operation_results = try_join_all( + jobs.into_iter() + .map(|job| execute_job(database.clone(), job)), + ) + .await?; + Ok(JsonResponse::Value(MutationResponse { operation_results })) +} + +/// Looks up procedures according to the names given in the mutation request, and pairs them with +/// arguments and requested fields. Returns an error if any procedures cannot be found. +fn look_up_procedures( + config: &MongoConfig, + mutation_request: MutationRequest, +) -> Result>, MutationError> { + let (jobs, not_found): (Vec, Vec) = mutation_request + .operations + .into_iter() + .map(|operation| match operation { + MutationOperation::Procedure { + name, arguments, .. + } => { + let native_query = config + .native_queries + .iter() + .find(|native_query| native_query.name == name); + native_query.ok_or(name).map(|nq| Job::new(nq, arguments)) + } + }) + .partition_result(); + + if !not_found.is_empty() { + return Err(MutationError::UnprocessableContent(format!( + "request includes unknown procedures: {}", + not_found.join(", ") + ))); + } + + Ok(jobs) +} + +async fn execute_job( + database: Database, + job: Job<'_>, +) -> Result { + let result = database + .run_command(job.native_query.command.clone(), None) + .await + .map_err(|err| match *err.kind { + mongodb::error::ErrorKind::InvalidArgument { message, .. } => { + MutationError::UnprocessableContent(message) + } + err => MutationError::Other(Box::new(err)), + })?; + let json_result = + serde_json::to_value(result).map_err(|err| MutationError::Other(Box::new(err)))?; + Ok(MutationOperationResults::Procedure { + result: json_result, + }) +} diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml index 36b14855..e7b7a575 100644 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -1,4 +1,5 @@ name: hello +description: Example of a read-only native query objectTypes: - name: HelloResult fields: diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml new file mode 100644 index 00000000..d6803340 --- /dev/null +++ b/fixtures/connector/chinook/native_queries/insert_artist.yaml @@ -0,0 +1,16 @@ +name: insertArtist +description: Example of a database update using a native query +objectTypes: + - name: InsertArtist + fields: + - name: ok + type: !scalar int + - name: n + type: !scalar int +resultType: !object InsertArtist +# TODO: implement arguments instead of hard-coding inputs +command: + insert: "Artist" + documents: + - ArtistId: 1001 + Name: Regina Spektor diff --git a/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml new file mode 100644 index 00000000..7b1d3fff --- /dev/null +++ b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml @@ -0,0 +1,54 @@ +kind: Command +version: v1 +definition: + name: insertArtist + description: Example of a database update using a native query + outputType: InsertArtist + arguments: [] + source: + dataConnectorName: mongodb + dataConnectorCommand: + procedure: insertArtist + typeMapping: + InsertArtist: + fieldMapping: + ok: { column: ok } + n: { column: n } + graphql: + rootFieldName: insertArtist + rootFieldKind: Mutation + +--- +kind: CommandPermissions +version: v1 +definition: + commandName: insertArtist + permissions: + - role: admin + allowExecution: true + +--- +kind: ObjectType +version: v1 +definition: + name: InsertArtist + graphql: + typeName: InsertArtist + fields: + - name: ok + type: Int! + - name: n + type: Int! + +--- +kind: TypePermissions +version: v1 +definition: + typeName: InsertArtist + permissions: + - role: admin + output: + allowedFields: + - ok + - n + diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index 4eb5585b..d94ec308 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -911,6 +911,12 @@ definition: type: { type: named, name: Int } readOnly: type: { type: named, name: Boolean } + InsertArtist: + fields: + ok: + type: { type: named, name: Int } + n: + type: { type: named, name: Int } collections: - name: Album arguments: {} @@ -1002,10 +1008,16 @@ definition: foreign_keys: {} functions: - name: hello + description: Example of a read-only native query result_type: { type: named, name: HelloResult } arguments: {} command: { hello: 1 } - procedures: [] + procedures: + - name: insertArtist + description: Example of a database update using a native query + result_type: { type: named, name: InsertArtist } + arguments: {} + command: { insert: Artist, documents: [{ ArtistId: 1001, Name: Regina Spektor }] } capabilities: version: ^0.1.0 capabilities: