From 1ef17e1850ef9a3c2a855d3de1391a92830c6b32 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 11:57:49 -0700 Subject: [PATCH 01/18] add configuration for native queries; report queries in schema response --- Cargo.lock | 1 + crates/configuration/Cargo.toml | 1 + crates/configuration/src/configuration.rs | 8 ++++- crates/configuration/src/lib.rs | 1 + .../src/interface_types/mongo_config.rs | 3 ++ .../mongodb-connector/src/mongo_connector.rs | 4 +-- crates/mongodb-connector/src/schema.rs | 32 +++++++++++++++++-- crates/mongodb-connector/src/state.rs | 6 +++- rust-toolchain.toml | 2 +- 9 files changed, 50 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbce74e4..df48f696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,6 +411,7 @@ name = "configuration" version = "0.1.0" dependencies = [ "itertools 0.12.1", + "mongodb", "mongodb-support", "schemars", "serde", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 6824690f..ee99cfd0 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] itertools = "^0.12" +mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } schemars = "^0.8.12" serde = { version = "1", features = ["derive"] } diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index c38671b1..20c49a40 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -3,12 +3,18 @@ use std::{io, path::Path}; use schemars::JsonSchema; use serde::Deserialize; -use crate::{read_directory, Metadata}; +use crate::{read_directory, Metadata, native_queries::NativeQuery}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct Configuration { + /// Descriptions of collections and types used in the database pub metadata: Metadata, + + /// Native queries allow arbitrary MongoDB aggregation pipelines where types of results are + /// specified via user configuration. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub native_queries: Vec, } impl Configuration { diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs index ba88399d..88abc394 100644 --- a/crates/configuration/src/lib.rs +++ b/crates/configuration/src/lib.rs @@ -1,5 +1,6 @@ mod configuration; pub mod metadata; +pub mod native_queries; mod read_directory; pub use crate::configuration::Configuration; diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs index 2e6815ed..e5cdae13 100644 --- a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs +++ b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs @@ -1,3 +1,4 @@ +use configuration::native_queries::NativeQuery; use mongodb::Client; #[derive(Clone, Debug)] @@ -6,4 +7,6 @@ pub struct MongoConfig { /// Name of the database to connect to pub database: String, + + pub native_queries: Vec, } diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index 6a15e319..691575cc 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -45,10 +45,10 @@ impl Connector for MongoConnector { /// Reads database connection URI from environment variable async fn try_init_state( - _configuration: &Self::Configuration, + configuration: &Self::Configuration, _metrics: &mut prometheus::Registry, ) -> Result { - let state = crate::state::try_init_state().await?; + let state = crate::state::try_init_state(configuration).await?; Ok(state) } diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 965c3f6d..32c98e92 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use configuration::{metadata, Configuration}; +use configuration::{metadata, native_queries::NativeQuery, Configuration}; use ndc_sdk::{connector, models}; use crate::capabilities; @@ -10,9 +10,11 @@ pub async fn get_schema( ) -> Result { let metadata = &config.metadata; let object_types = map_object_types(&metadata.object_types); - let collections = metadata.collections.iter().map(map_collection).collect(); + let configured_collections = metadata.collections.iter().map(map_collection); + let native_queries = config.native_queries.iter().map(map_native_query); + Ok(models::SchemaResponse { - collections, + collections: configured_collections.chain(native_queries).collect(), object_types, scalar_types: capabilities::scalar_types(), functions: Default::default(), @@ -75,3 +77,27 @@ fn map_collection(collection: &metadata::Collection) -> models::CollectionInfo { uniqueness_constraints: Default::default(), } } + +fn map_native_query(query: &NativeQuery) -> models::CollectionInfo { + let arguments = query + .arguments + .iter() + .map(|field| { + ( + field.name.clone(), + models::ArgumentInfo { + argument_type: map_type(&field.r#type), + description: field.description.clone(), + }, + ) + }) + .collect(); + models::CollectionInfo { + name: query.name.clone(), + collection_type: query.result_type.clone(), + uniqueness_constraints: Default::default(), + foreign_keys: Default::default(), + description: query.description.clone(), + arguments, + } +} diff --git a/crates/mongodb-connector/src/state.rs b/crates/mongodb-connector/src/state.rs index 912bcd96..c7c3938a 100644 --- a/crates/mongodb-connector/src/state.rs +++ b/crates/mongodb-connector/src/state.rs @@ -1,12 +1,15 @@ use std::{env, error::Error}; use anyhow::anyhow; +use configuration::Configuration; use mongodb_agent_common::{interface_types::MongoConfig, mongodb_connection::get_mongodb_client}; pub const DATABASE_URI_ENV_VAR: &str = "MONGODB_DATABASE_URI"; /// Reads database connection URI from environment variable -pub async fn try_init_state() -> Result> { +pub async fn try_init_state( + configuration: &Configuration, +) -> Result> { // Splitting this out of the `Connector` impl makes error translation easier let database_uri = env::var(DATABASE_URI_ENV_VAR)?; let client = get_mongodb_client(&database_uri).await?; @@ -19,5 +22,6 @@ pub async fn try_init_state() -> Result Date: Thu, 14 Mar 2024 13:38:54 -0700 Subject: [PATCH 02/18] read schema and native queries from separate files --- Cargo.lock | 2 + crates/configuration/Cargo.toml | 2 + crates/configuration/src/read_directory.rs | 71 +++++++++++++++++----- 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df48f696..cd14b4fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,6 +410,7 @@ dependencies = [ name = "configuration" version = "0.1.0" dependencies = [ + "futures", "itertools 0.12.1", "mongodb", "mongodb-support", @@ -418,6 +419,7 @@ dependencies = [ "serde_json", "serde_yaml", "tokio", + "tokio-stream", ] [[package]] diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index ee99cfd0..bea1d0e8 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +futures = "^0.3" itertools = "^0.12" mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } @@ -12,3 +13,4 @@ serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } serde_yaml = "^0.9" tokio = "1" +tokio-stream = { version = "^0.1", features = ["fs"] } diff --git a/crates/configuration/src/read_directory.rs b/crates/configuration/src/read_directory.rs index 3a72c5a6..01508bd9 100644 --- a/crates/configuration/src/read_directory.rs +++ b/crates/configuration/src/read_directory.rs @@ -1,3 +1,4 @@ +use futures::stream::TryStreamExt as _; use itertools::Itertools as _; use serde::Deserialize; use std::{ @@ -5,10 +6,13 @@ use std::{ path::{Path, PathBuf}, }; use tokio::fs; +use tokio_stream::wrappers::ReadDirStream; -use crate::Configuration; +use crate::{native_queries::NativeQuery, Configuration}; + +pub const METADATA_FILENAME: &str = "metadata"; +pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries"; -pub const CONFIGURATION_FILENAME: &str = "configuration"; pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] = [("json", JSON), ("yaml", YAML), ("yml", YAML)]; @@ -25,29 +29,68 @@ const YAML: FileFormat = FileFormat::Yaml; pub async fn read_directory( configuration_dir: impl AsRef + Send, ) -> io::Result { - parse_file(configuration_dir, CONFIGURATION_FILENAME).await + let dir = configuration_dir.as_ref(); + + let metadata = parse_json_or_yaml(dir, METADATA_FILENAME).await?; + + let native_queries: Vec = + read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)).await?; + + Ok(Configuration { + metadata, + native_queries, + }) +} + +/// Parse all files in a directory with one of the allowed configuration extensions according to +/// the given type argument. For example if `T` is `NativeQuery` this function assumes that all +/// json and yaml files in the given directory should be parsed as native query configurations. +async fn read_subdir_configs(subdir: &Path) -> io::Result> +where + for<'a> T: Deserialize<'a>, +{ + let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?); + dir_stream + .try_filter_map(|dir_entry| async move { + // Permits regular files and symlinks, does not filter out symlinks to directories. + let is_file = !(dir_entry.file_type().await?.is_dir()); + if !is_file { + return Ok(None); + } + + let path = dir_entry.path(); + let extension = path.extension().and_then(|ext| ext.to_str()); + + let format_option = extension + .and_then(|ext| { + CONFIGURATION_EXTENSIONS + .iter() + .find(|(expected_ext, _)| ext == *expected_ext) + }) + .map(|(_, format)| *format); + + Ok(format_option.map(|format| (path, format))) + }) + .and_then(|(path, format)| async move { parse_config_file::(path, format).await }) + .try_collect::>() + .await } /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc; reads the file; and parses it according to its extension. -async fn parse_file(configuration_dir: impl AsRef, basename: &str) -> io::Result +async fn parse_json_or_yaml(configuration_dir: &Path, basename: &str) -> io::Result where for<'a> T: Deserialize<'a>, { let (path, format) = find_file(configuration_dir, basename).await?; - read_file(path, format).await + parse_config_file(path, format).await } /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc, and returns the found path with its file format. -async fn find_file( - configuration_dir: impl AsRef, - basename: &str, -) -> io::Result<(PathBuf, FileFormat)> { - let dir = configuration_dir.as_ref(); - +async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> { for (extension, format) in CONFIGURATION_EXTENSIONS { - let path = dir.join(format!("{basename}.{extension}")); + let path = configuration_dir.join(format!("{basename}.{extension}")); if fs::try_exists(&path).await? { return Ok((path, format)); } @@ -57,7 +100,7 @@ async fn find_file( io::ErrorKind::NotFound, format!( "could not find file, {:?}", - dir.join(format!( + configuration_dir.join(format!( "{basename}.{{{}}}", CONFIGURATION_EXTENSIONS .into_iter() @@ -68,7 +111,7 @@ async fn find_file( )) } -async fn read_file(path: impl AsRef, format: FileFormat) -> io::Result +async fn parse_config_file(path: impl AsRef, format: FileFormat) -> io::Result where for<'a> T: Deserialize<'a>, { From 9ebfc75260919fcde85607b10dbd3d29b8d70f22 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 13:45:25 -0700 Subject: [PATCH 03/18] rename metadata to schema --- crates/configuration/src/configuration.rs | 4 ++-- crates/configuration/src/lib.rs | 4 ++-- crates/configuration/src/native_queries.rs | 4 ++-- crates/configuration/src/read_directory.rs | 6 ++--- .../src/{metadata => schema}/database.rs | 0 .../src/{metadata => schema}/mod.rs | 2 +- crates/mongodb-connector/src/schema.rs | 24 +++++++++---------- 7 files changed, 22 insertions(+), 22 deletions(-) rename crates/configuration/src/{metadata => schema}/database.rs (100%) rename crates/configuration/src/{metadata => schema}/mod.rs (94%) diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index 20c49a40..a8b76d48 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -3,13 +3,13 @@ use std::{io, path::Path}; use schemars::JsonSchema; use serde::Deserialize; -use crate::{read_directory, Metadata, native_queries::NativeQuery}; +use crate::{read_directory, Schema, native_queries::NativeQuery}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct Configuration { /// Descriptions of collections and types used in the database - pub metadata: Metadata, + pub schema: Schema, /// Native queries allow arbitrary MongoDB aggregation pipelines where types of results are /// specified via user configuration. diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs index 88abc394..0e49b042 100644 --- a/crates/configuration/src/lib.rs +++ b/crates/configuration/src/lib.rs @@ -1,8 +1,8 @@ mod configuration; -pub mod metadata; +pub mod schema; pub mod native_queries; mod read_directory; pub use crate::configuration::Configuration; -pub use crate::metadata::Metadata; +pub use crate::schema::Schema; pub use crate::read_directory::read_directory; diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 954d5abc..ad91c525 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria}; use schemars::JsonSchema; use serde::Deserialize; -use crate::metadata::ObjectField; +use crate::schema::ObjectField; /// An arbitrary database command using MongoDB's runCommand API. /// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ @@ -13,7 +13,7 @@ pub struct NativeQuery { pub name: String, /// The name of an object type that specifies the type of data returned from the query. This - /// must correspond to a configuration definition in `metadata.objectTypes`. + /// must correspond to a configuration definition in `schema.objectTypes`. pub result_type: String, /// Arguments for per-query customization diff --git a/crates/configuration/src/read_directory.rs b/crates/configuration/src/read_directory.rs index 01508bd9..70eda016 100644 --- a/crates/configuration/src/read_directory.rs +++ b/crates/configuration/src/read_directory.rs @@ -10,7 +10,7 @@ use tokio_stream::wrappers::ReadDirStream; use crate::{native_queries::NativeQuery, Configuration}; -pub const METADATA_FILENAME: &str = "metadata"; +pub const SCHEMA_FILENAME: &str = "schema"; pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries"; pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] = @@ -31,13 +31,13 @@ pub async fn read_directory( ) -> io::Result { let dir = configuration_dir.as_ref(); - let metadata = parse_json_or_yaml(dir, METADATA_FILENAME).await?; + let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?; let native_queries: Vec = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)).await?; Ok(Configuration { - metadata, + schema, native_queries, }) } diff --git a/crates/configuration/src/metadata/database.rs b/crates/configuration/src/schema/database.rs similarity index 100% rename from crates/configuration/src/metadata/database.rs rename to crates/configuration/src/schema/database.rs diff --git a/crates/configuration/src/metadata/mod.rs b/crates/configuration/src/schema/mod.rs similarity index 94% rename from crates/configuration/src/metadata/mod.rs rename to crates/configuration/src/schema/mod.rs index 28751944..8418a210 100644 --- a/crates/configuration/src/metadata/mod.rs +++ b/crates/configuration/src/schema/mod.rs @@ -7,7 +7,7 @@ pub use self::database::{Collection, ObjectField, ObjectType, Type}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] -pub struct Metadata { +pub struct Schema { #[serde(default)] pub collections: Vec, #[serde(default)] diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 32c98e92..577bc76f 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use configuration::{metadata, native_queries::NativeQuery, Configuration}; +use configuration::{native_queries::NativeQuery, schema, Configuration}; use ndc_sdk::{connector, models}; use crate::capabilities; @@ -8,9 +8,9 @@ use crate::capabilities; pub async fn get_schema( config: &Configuration, ) -> Result { - let metadata = &config.metadata; - let object_types = map_object_types(&metadata.object_types); - let configured_collections = metadata.collections.iter().map(map_collection); + let schema = &config.schema; + let object_types = map_object_types(&schema.object_types); + let configured_collections = schema.collections.iter().map(map_collection); let native_queries = config.native_queries.iter().map(map_native_query); Ok(models::SchemaResponse { @@ -22,7 +22,7 @@ pub async fn get_schema( }) } -fn map_object_types(object_types: &[metadata::ObjectType]) -> BTreeMap { +fn map_object_types(object_types: &[schema::ObjectType]) -> BTreeMap { object_types .iter() .map(|t| { @@ -37,7 +37,7 @@ fn map_object_types(object_types: &[metadata::ObjectType]) -> BTreeMap BTreeMap { +fn map_field_infos(fields: &[schema::ObjectField]) -> BTreeMap { fields .iter() .map(|f| { @@ -52,22 +52,22 @@ fn map_field_infos(fields: &[metadata::ObjectField]) -> BTreeMap models::Type { +fn map_type(t: &schema::Type) -> models::Type { match t { - metadata::Type::Scalar(t) => models::Type::Named { + schema::Type::Scalar(t) => models::Type::Named { name: t.graphql_name(), }, - metadata::Type::Object(t) => models::Type::Named { name: t.clone() }, - metadata::Type::ArrayOf(t) => models::Type::Array { + schema::Type::Object(t) => models::Type::Named { name: t.clone() }, + schema::Type::ArrayOf(t) => models::Type::Array { element_type: Box::new(map_type(t)), }, - metadata::Type::Nullable(t) => models::Type::Nullable { + schema::Type::Nullable(t) => models::Type::Nullable { underlying_type: Box::new(map_type(t)), }, } } -fn map_collection(collection: &metadata::Collection) -> models::CollectionInfo { +fn map_collection(collection: &schema::Collection) -> models::CollectionInfo { models::CollectionInfo { name: collection.name.clone(), collection_type: collection.r#type.clone(), From b6f625849861e4980869b2411251d8e59ca66f9c Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 13:46:42 -0700 Subject: [PATCH 04/18] update fixture configuration --- fixtures/connector/chinook/configuration.yaml | 16 ---------------- fixtures/connector/chinook/schema.yaml | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 16 deletions(-) delete mode 100644 fixtures/connector/chinook/configuration.yaml create mode 100644 fixtures/connector/chinook/schema.yaml diff --git a/fixtures/connector/chinook/configuration.yaml b/fixtures/connector/chinook/configuration.yaml deleted file mode 100644 index e6aec4d6..00000000 --- a/fixtures/connector/chinook/configuration.yaml +++ /dev/null @@ -1,16 +0,0 @@ -metadata: - collections: - - name: Album - type: Album - - objectTypes: - - name: Album - fields: - - name: _id - type: !scalar objectId - - name: AlbumId - type: !scalar int - - name: ArtistId - type: !scalar int - - name: Title - type: !scalar string diff --git a/fixtures/connector/chinook/schema.yaml b/fixtures/connector/chinook/schema.yaml new file mode 100644 index 00000000..bbb4a52c --- /dev/null +++ b/fixtures/connector/chinook/schema.yaml @@ -0,0 +1,15 @@ +collections: + - name: Album + type: Album + +objectTypes: + - name: Album + fields: + - name: _id + type: !scalar objectId + - name: AlbumId + type: !scalar int + - name: ArtistId + type: !scalar int + - name: Title + type: !scalar string From 039c8408ca11d46926f82aaadeccee0dfd7050d4 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 14:02:20 -0700 Subject: [PATCH 05/18] succeed parsing configuration if no native_queries are present --- crates/configuration/src/read_directory.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/configuration/src/read_directory.rs b/crates/configuration/src/read_directory.rs index 70eda016..4f214ad3 100644 --- a/crates/configuration/src/read_directory.rs +++ b/crates/configuration/src/read_directory.rs @@ -33,8 +33,9 @@ pub async fn read_directory( let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?; - let native_queries: Vec = - read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)).await?; + let native_queries: Vec = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)) + .await? + .unwrap_or_default(); Ok(Configuration { schema, @@ -45,12 +46,16 @@ pub async fn read_directory( /// Parse all files in a directory with one of the allowed configuration extensions according to /// the given type argument. For example if `T` is `NativeQuery` this function assumes that all /// json and yaml files in the given directory should be parsed as native query configurations. -async fn read_subdir_configs(subdir: &Path) -> io::Result> +async fn read_subdir_configs(subdir: &Path) -> io::Result>> where for<'a> T: Deserialize<'a>, { + if !(fs::try_exists(subdir).await?) { + return Ok(None); + } + let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?); - dir_stream + let configs = dir_stream .try_filter_map(|dir_entry| async move { // Permits regular files and symlinks, does not filter out symlinks to directories. let is_file = !(dir_entry.file_type().await?.is_dir()); @@ -73,7 +78,9 @@ where }) .and_then(|(path, format)| async move { parse_config_file::(path, format).await }) .try_collect::>() - .await + .await?; + + Ok(Some(configs)) } /// Given a base name, like "connection", looks for files of the form "connection.json", From 12e7b331cbf864556117314e704aa22a2516c4a9 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 16:49:00 -0700 Subject: [PATCH 06/18] add mode property to NativeQuery --- crates/configuration/src/native_queries.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index ad91c525..93a84ccf 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -31,6 +31,18 @@ pub struct NativeQuery { #[serde(default, skip_serializing_if = "Option::is_none")] pub description: Option, + + /// Set to `readWrite` if this native query might modify data in the database. + #[serde(default)] + pub mode: Mode, +} + +#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum Mode { + #[default] + ReadOnly, + ReadWrite, } type Object = serde_json::Map; From 5962ae5c9886a2d2a0f86180cd17c2e23de2af13 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 17:29:01 -0700 Subject: [PATCH 07/18] represent native queries as functions and procedures in schema --- crates/configuration/src/native_queries.rs | 9 ++-- crates/mongodb-connector/src/schema.rs | 60 ++++++++++++++++++---- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 93a84ccf..633c9ead 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria}; use schemars::JsonSchema; use serde::Deserialize; -use crate::schema::ObjectField; +use crate::schema::{ObjectField, Type}; /// An arbitrary database command using MongoDB's runCommand API. /// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ @@ -12,9 +12,8 @@ pub struct NativeQuery { /// Name that will be used to identify the query in your data graph pub name: String, - /// The name of an object type that specifies the type of data returned from the query. This - /// must correspond to a configuration definition in `schema.objectTypes`. - pub result_type: String, + /// Type of data returned by the query. + pub result_type: Type, /// Arguments for per-query customization pub arguments: Vec, @@ -37,7 +36,7 @@ pub struct NativeQuery { pub mode: Mode, } -#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub enum Mode { #[default] diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 577bc76f..5cb4a7a5 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -1,6 +1,9 @@ use std::collections::BTreeMap; -use configuration::{native_queries::NativeQuery, schema, Configuration}; +use configuration::{ + native_queries::{self, NativeQuery}, + schema, Configuration, +}; use ndc_sdk::{connector, models}; use crate::capabilities; @@ -10,15 +13,28 @@ pub async fn get_schema( ) -> Result { let schema = &config.schema; let object_types = map_object_types(&schema.object_types); - let configured_collections = schema.collections.iter().map(map_collection); - let native_queries = config.native_queries.iter().map(map_native_query); + let collections = schema.collections.iter().map(map_collection).collect(); + + let functions = config + .native_queries + .iter() + .filter(|q| q.mode == native_queries::Mode::ReadOnly) + .map(native_query_to_function) + .collect(); + + let procedures = config + .native_queries + .iter() + .filter(|q| q.mode == native_queries::Mode::ReadWrite) + .map(native_query_to_procedure) + .collect(); Ok(models::SchemaResponse { - collections: configured_collections.chain(native_queries).collect(), + collections, object_types, scalar_types: capabilities::scalar_types(), - functions: Default::default(), - procedures: Default::default(), + functions, + procedures, }) } @@ -78,7 +94,8 @@ fn map_collection(collection: &schema::Collection) -> models::CollectionInfo { } } -fn map_native_query(query: &NativeQuery) -> models::CollectionInfo { +/// For read-only native queries +fn native_query_to_function(query: &NativeQuery) -> models::FunctionInfo { let arguments = query .arguments .iter() @@ -92,12 +109,33 @@ fn map_native_query(query: &NativeQuery) -> models::CollectionInfo { ) }) .collect(); - models::CollectionInfo { + models::FunctionInfo { + name: query.name.clone(), + description: query.description.clone(), + arguments, + result_type: map_type(&query.result_type), + } +} + +/// For read-write native queries +fn native_query_to_procedure(query: &NativeQuery) -> models::ProcedureInfo { + let arguments = query + .arguments + .iter() + .map(|field| { + ( + field.name.clone(), + models::ArgumentInfo { + argument_type: map_type(&field.r#type), + description: field.description.clone(), + }, + ) + }) + .collect(); + models::ProcedureInfo { name: query.name.clone(), - collection_type: query.result_type.clone(), - uniqueness_constraints: Default::default(), - foreign_keys: Default::default(), description: query.description.clone(), arguments, + result_type: map_type(&query.result_type), } } From 178695ad78e606d5f4cfef112ec0b33d6174d357 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 17:40:09 -0700 Subject: [PATCH 08/18] execute native query --- .../src/query/execute_native_query_request.rs | 31 +++++++++++++++++++ crates/mongodb-agent-common/src/query/mod.rs | 21 ++++++++++--- 2 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 crates/mongodb-agent-common/src/query/execute_native_query_request.rs diff --git a/crates/mongodb-agent-common/src/query/execute_native_query_request.rs b/crates/mongodb-agent-common/src/query/execute_native_query_request.rs new file mode 100644 index 00000000..ff603dd0 --- /dev/null +++ b/crates/mongodb-agent-common/src/query/execute_native_query_request.rs @@ -0,0 +1,31 @@ +use configuration::native_queries::NativeQuery; +use dc_api::JsonResponse; +use dc_api_types::{QueryResponse, ResponseFieldValue, RowSet}; +use mongodb::Database; + +use crate::interface_types::MongoAgentError; + +pub async fn handle_native_query_request( + native_query: NativeQuery, + database: Database, +) -> Result, MongoAgentError> { + let result = database + .run_command(native_query.command, native_query.selection_criteria) + .await?; + let result_json = + serde_json::to_value(result).map_err(|err| MongoAgentError::AdHoc(err.into()))?; + + // A function returs a single row with a single column called `__value` + // https://hasura.github.io/ndc-spec/specification/queries/functions.html + let response_row = [( + "__value".to_owned(), + ResponseFieldValue::Column(result_json), + )] + .into_iter() + .collect(); + + Ok(JsonResponse::Value(QueryResponse::Single(RowSet { + aggregates: None, + rows: Some(vec![response_row]), + }))) +} diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index ed0abc68..5f32bee9 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -1,5 +1,6 @@ mod column_ref; mod constants; +mod execute_native_query_request; mod execute_query_request; mod foreach; mod make_selector; @@ -17,7 +18,10 @@ pub use self::{ make_sort::make_sort, pipeline::{is_response_faceted, pipeline_for_non_foreach, pipeline_for_query_request}, }; -use crate::interface_types::{MongoAgentError, MongoConfig}; +use crate::{ + interface_types::{MongoAgentError, MongoConfig}, + query::execute_native_query_request::handle_native_query_request, +}; pub fn collection_name(query_request_target: &Target) -> String { query_request_target.name().join(".") @@ -29,10 +33,17 @@ pub async fn handle_query_request( ) -> Result, MongoAgentError> { tracing::debug!(?config, query_request = %serde_json::to_string(&query_request).unwrap(), "executing query"); - let collection = config - .client - .database(&config.database) - .collection::(&collection_name(&query_request.target)); + let database = config.client.database(&config.database); + + let target = &query_request.target; + if let Some(native_query) = config.native_queries.iter().find(|query| { + let target_name = target.name(); + target_name.len() == 1 && target_name[0] == query.name + }) { + return handle_native_query_request(native_query.clone(), database).await; + } + + let collection = database.collection::(&collection_name(&query_request.target)); execute_query_request(&collection, query_request).await } From c608d1795056dd96af97a05356c7a3080945a234 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 20:26:39 -0700 Subject: [PATCH 09/18] updated fixtures with a very basic native query --- crates/configuration/src/native_queries.rs | 7 +- .../chinook/native_queries/hello.yaml | 4 + fixtures/connector/chinook/schema.json | 742 ++++++++++++++++++ fixtures/connector/chinook/schema.yaml | 15 - .../ddn/subgraphs/chinook/commands/Hello.hml | 23 + .../chinook/dataconnectors/mongodb.hml | 6 +- 6 files changed, 780 insertions(+), 17 deletions(-) create mode 100644 fixtures/connector/chinook/native_queries/hello.yaml create mode 100644 fixtures/connector/chinook/schema.json delete mode 100644 fixtures/connector/chinook/schema.yaml create mode 100644 fixtures/ddn/subgraphs/chinook/commands/Hello.hml diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 633c9ead..3a92d94a 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -16,6 +16,7 @@ pub struct NativeQuery { pub result_type: Type, /// Arguments for per-query customization + #[serde(default)] pub arguments: Vec, /// Command to run expressed as a BSON document @@ -31,7 +32,11 @@ pub struct NativeQuery { #[serde(default, skip_serializing_if = "Option::is_none")] pub description: Option, - /// Set to `readWrite` if this native query might modify data in the database. + /// Set to `readWrite` if this native query might modify data in the database. When refreshing + /// a dataconnector native queries will appear in the corresponding `DataConnectorLink` + /// definition as `functions` if they are read-only, or as `procedures` if they are read-rite. + /// Functions are intended to map to GraphQL Query fields, while procedures map to Mutation + /// fields. #[serde(default)] pub mode: Mode, } diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml new file mode 100644 index 00000000..2f440194 --- /dev/null +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -0,0 +1,4 @@ +name: hello +result_type: !scalar string +command: + hello: 1 diff --git a/fixtures/connector/chinook/schema.json b/fixtures/connector/chinook/schema.json new file mode 100644 index 00000000..4c7ee983 --- /dev/null +++ b/fixtures/connector/chinook/schema.json @@ -0,0 +1,742 @@ +{ + "collections": [ + { + "name": "Invoice", + "type": "Invoice", + "description": null + }, + { + "name": "Track", + "type": "Track", + "description": null + }, + { + "name": "MediaType", + "type": "MediaType", + "description": null + }, + { + "name": "InvoiceLine", + "type": "InvoiceLine", + "description": null + }, + { + "name": "Employee", + "type": "Employee", + "description": null + }, + { + "name": "PlaylistTrack", + "type": "PlaylistTrack", + "description": null + }, + { + "name": "Album", + "type": "Album", + "description": null + }, + { + "name": "Genre", + "type": "Genre", + "description": null + }, + { + "name": "Artist", + "type": "Artist", + "description": null + }, + { + "name": "Playlist", + "type": "Playlist", + "description": null + }, + { + "name": "Customer", + "type": "Customer", + "description": null + } + ], + "objectTypes": [ + { + "name": "Invoice", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "BillingAddress", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingCity", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingCountry", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingPostalCode", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingState", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "CustomerId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "InvoiceDate", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "InvoiceId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Total", + "type": { + "scalar": "double" + }, + "description": null + } + ], + "description": "Object type for collection Invoice" + }, + { + "name": "Track", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "AlbumId", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + }, + { + "name": "Bytes", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + }, + { + "name": "Composer", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "GenreId", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + }, + { + "name": "MediaTypeId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Milliseconds", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "TrackId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "UnitPrice", + "type": { + "scalar": "double" + }, + "description": null + } + ], + "description": "Object type for collection Track" + }, + { + "name": "MediaType", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "MediaTypeId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection MediaType" + }, + { + "name": "InvoiceLine", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "InvoiceId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "InvoiceLineId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Quantity", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "TrackId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "UnitPrice", + "type": { + "scalar": "double" + }, + "description": null + } + ], + "description": "Object type for collection InvoiceLine" + }, + { + "name": "Employee", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "Address", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BirthDate", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "City", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Country", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Email", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "EmployeeId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Fax", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "FirstName", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "HireDate", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "LastName", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "Phone", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "PostalCode", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "ReportsTo", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "State", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Title", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection Employee" + }, + { + "name": "PlaylistTrack", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "PlaylistId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "TrackId", + "type": { + "scalar": "int" + }, + "description": null + } + ], + "description": "Object type for collection PlaylistTrack" + }, + { + "name": "Album", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "AlbumId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "ArtistId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Title", + "type": { + "scalar": "string" + }, + "description": null + } + ], + "description": "Object type for collection Album" + }, + { + "name": "Genre", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "GenreId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection Genre" + }, + { + "name": "Artist", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "ArtistId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection Artist" + }, + { + "name": "Playlist", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "PlaylistId", + "type": { + "scalar": "int" + }, + "description": null + } + ], + "description": "Object type for collection Playlist" + }, + { + "name": "Customer", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "Address", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "City", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Company", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Country", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "CustomerId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Email", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "Fax", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "FirstName", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "LastName", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "Phone", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "PostalCode", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "State", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "SupportRepId", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + } + ], + "description": "Object type for collection Customer" + } + ] +} \ No newline at end of file diff --git a/fixtures/connector/chinook/schema.yaml b/fixtures/connector/chinook/schema.yaml deleted file mode 100644 index bbb4a52c..00000000 --- a/fixtures/connector/chinook/schema.yaml +++ /dev/null @@ -1,15 +0,0 @@ -collections: - - name: Album - type: Album - -objectTypes: - - name: Album - fields: - - name: _id - type: !scalar objectId - - name: AlbumId - type: !scalar int - - name: ArtistId - type: !scalar int - - name: Title - type: !scalar string diff --git a/fixtures/ddn/subgraphs/chinook/commands/Hello.hml b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml new file mode 100644 index 00000000..c194a5ba --- /dev/null +++ b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml @@ -0,0 +1,23 @@ +kind: Command +version: v1 +definition: + name: hello + description: Example of a read-only native query + outputType: String + arguments: [] + source: + dataConnectorName: mongodb + dataConnectorCommand: + function: hello + graphql: + rootFieldName: hello + rootFieldKind: Query + +--- +kind: CommandPermissions +version: v1 +definition: + commandName: hello + permissions: + - role: admin + allowExecution: true diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index 4167b898..7127da8b 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -994,7 +994,11 @@ definition: unique_columns: - _id foreign_keys: {} - functions: [] + functions: + - name: hello + result_type: { type: named, name: String } + arguments: {} + command: { hello: 1 } procedures: [] capabilities: version: ^0.1.0 From e9f042f95c2f437dabe15dbcecc94209723dc526 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 21:35:25 -0700 Subject: [PATCH 10/18] add more context to configuration read errors --- Cargo.lock | 1 + crates/configuration/Cargo.toml | 1 + crates/configuration/src/configuration.rs | 4 +- crates/configuration/src/directory.rs | 54 ++++++++++--------- .../mongodb-connector/src/mongo_connector.rs | 4 +- 5 files changed, 35 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 875e0dda..6401d568 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,6 +410,7 @@ dependencies = [ name = "configuration" version = "0.1.0" dependencies = [ + "anyhow", "futures", "itertools 0.12.1", "mongodb", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index bea1d0e8..8db65e2e 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1" futures = "^0.3" itertools = "^0.12" mongodb = "2.8" diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index 3a3549f9..1081ef26 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -1,4 +1,4 @@ -use std::{io, path::Path}; +use std::path::Path; use schemars::JsonSchema; use serde::Deserialize; @@ -27,7 +27,7 @@ impl Configuration { pub async fn parse_configuration( configuration_dir: impl AsRef + Send, - ) -> io::Result { + ) -> anyhow::Result { read_directory(configuration_dir).await } } diff --git a/crates/configuration/src/directory.rs b/crates/configuration/src/directory.rs index f80d4b23..2ec5618e 100644 --- a/crates/configuration/src/directory.rs +++ b/crates/configuration/src/directory.rs @@ -1,10 +1,8 @@ +use anyhow::{anyhow, Context as _}; use futures::stream::TryStreamExt as _; use itertools::Itertools as _; use serde::{Deserialize, Serialize}; -use std::{ - io, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; use tokio::fs; use tokio_stream::wrappers::ReadDirStream; @@ -29,7 +27,7 @@ const YAML: FileFormat = FileFormat::Yaml; /// Read configuration from a directory pub async fn read_directory( configuration_dir: impl AsRef + Send, -) -> io::Result { +) -> anyhow::Result { let dir = configuration_dir.as_ref(); let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?; @@ -47,7 +45,7 @@ pub async fn read_directory( /// Parse all files in a directory with one of the allowed configuration extensions according to /// the given type argument. For example if `T` is `NativeQuery` this function assumes that all /// json and yaml files in the given directory should be parsed as native query configurations. -async fn read_subdir_configs(subdir: &Path) -> io::Result>> +async fn read_subdir_configs(subdir: &Path) -> anyhow::Result>> where for<'a> T: Deserialize<'a>, { @@ -57,6 +55,7 @@ where let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?); let configs = dir_stream + .map_err(|err| err.into()) .try_filter_map(|dir_entry| async move { // Permits regular files and symlinks, does not filter out symlinks to directories. let is_file = !(dir_entry.file_type().await?.is_dir()); @@ -86,7 +85,7 @@ where /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc; reads the file; and parses it according to its extension. -async fn parse_json_or_yaml(configuration_dir: &Path, basename: &str) -> io::Result +async fn parse_json_or_yaml(configuration_dir: &Path, basename: &str) -> anyhow::Result where for<'a> T: Deserialize<'a>, { @@ -96,7 +95,10 @@ where /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc, and returns the found path with its file format. -async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> { +async fn find_file( + configuration_dir: &Path, + basename: &str, +) -> anyhow::Result<(PathBuf, FileFormat)> { for (extension, format) in CONFIGURATION_EXTENSIONS { let path = configuration_dir.join(format!("{basename}.{extension}")); if fs::try_exists(&path).await? { @@ -104,30 +106,28 @@ async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(Path } } - Err(io::Error::new( - io::ErrorKind::NotFound, - format!( - "could not find file, {:?}", - configuration_dir.join(format!( - "{basename}.{{{}}}", - CONFIGURATION_EXTENSIONS - .into_iter() - .map(|(ext, _)| ext) - .join(",") - )) - ), + Err(anyhow!( + "could not find file, {:?}", + configuration_dir.join(format!( + "{basename}.{{{}}}", + CONFIGURATION_EXTENSIONS + .into_iter() + .map(|(ext, _)| ext) + .join(",") + )) )) } -async fn parse_config_file(path: impl AsRef, format: FileFormat) -> io::Result +async fn parse_config_file(path: impl AsRef, format: FileFormat) -> anyhow::Result where for<'a> T: Deserialize<'a>, { let bytes = fs::read(path.as_ref()).await?; let value = match format { - FileFormat::Json => serde_json::from_slice(&bytes)?, + FileFormat::Json => serde_json::from_slice(&bytes) + .with_context(|| format!("error parsing {:?}", path.as_ref()))?, FileFormat::Yaml => serde_yaml::from_slice(&bytes) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?, + .with_context(|| format!("error parsing {:?}", path.as_ref()))?, }; Ok(value) } @@ -136,7 +136,7 @@ where pub async fn write_directory( configuration_dir: impl AsRef, configuration: &Configuration, -) -> io::Result<()> { +) -> anyhow::Result<()> { write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await } @@ -149,11 +149,13 @@ async fn write_file( configuration_dir: impl AsRef, basename: &str, value: &T, -) -> io::Result<()> +) -> anyhow::Result<()> where T: Serialize, { let path = default_file_path(configuration_dir, basename); let bytes = serde_json::to_vec_pretty(value)?; - fs::write(path, bytes).await + fs::write(path.clone(), bytes) + .await + .with_context(|| format!("error writing {:?}", path)) } diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index e7e18b2d..f23c4338 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -39,7 +39,9 @@ impl Connector for MongoConnector { async fn parse_configuration( configuration_dir: impl AsRef + Send, ) -> Result { - let configuration = Configuration::parse_configuration(configuration_dir).await?; + let configuration = Configuration::parse_configuration(configuration_dir) + .await + .map_err(|err| ParseError::Other(err.into()))?; Ok(configuration) } From f5d4f623e79eddecc9ad3d10f94f3e24ad254e8f Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 21:39:58 -0700 Subject: [PATCH 11/18] add objectTypes field to native queries --- crates/configuration/src/native_queries.rs | 12 +++++- crates/mongodb-connector/src/schema.rs | 37 ++++++++++++------- .../chinook/native_queries/hello.yaml | 10 ++++- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 3a92d94a..19f54f69 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria}; use schemars::JsonSchema; use serde::Deserialize; -use crate::schema::{ObjectField, Type}; +use crate::schema::{ObjectField, Type, ObjectType}; /// An arbitrary database command using MongoDB's runCommand API. /// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ @@ -12,7 +12,15 @@ pub struct NativeQuery { /// Name that will be used to identify the query in your data graph pub name: String, - /// Type of data returned by the query. + /// You may define object types here to reference in `result_type`. Any types defined here will + /// be merged with the definitions in `schema.json`. This allows you to maintain hand-written + /// types for native queries without having to edit a generated `schema.json` file. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub object_types: Vec, + + /// Type of data returned by the query. You may reference object types defined in the + /// `object_types` list in this definition, or you may reference object types from + /// `schema.json`. pub result_type: Type, /// Arguments for per-query customization diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 5cb4a7a5..c06131a2 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -12,9 +12,19 @@ pub async fn get_schema( config: &Configuration, ) -> Result { let schema = &config.schema; - let object_types = map_object_types(&schema.object_types); let collections = schema.collections.iter().map(map_collection).collect(); + let object_types_from_schema = map_object_types(&schema.object_types); + let object_types_from_native_queries = map_object_types( + config + .native_queries + .iter() + .flat_map(|native_query| &native_query.object_types), + ); + let object_types = object_types_from_schema + .chain(object_types_from_native_queries) + .collect(); + let functions = config .native_queries .iter() @@ -38,19 +48,18 @@ pub async fn get_schema( }) } -fn map_object_types(object_types: &[schema::ObjectType]) -> BTreeMap { - object_types - .iter() - .map(|t| { - ( - t.name.clone(), - models::ObjectType { - fields: map_field_infos(&t.fields), - description: t.description.clone(), - }, - ) - }) - .collect() +fn map_object_types<'a>( + object_types: impl IntoIterator + 'a, +) -> impl Iterator + 'a { + object_types.into_iter().map(|t| { + ( + t.name.clone(), + models::ObjectType { + fields: map_field_infos(&t.fields), + description: t.description.clone(), + }, + ) + }) } fn map_field_infos(fields: &[schema::ObjectField]) -> BTreeMap { diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml index 2f440194..c3523342 100644 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -1,4 +1,12 @@ name: hello -result_type: !scalar string +objectTypes: + - name: helloResult + fields: + - name: ok + type: !scalar int + - name: readOnly + type: !scalar bool + # There are more fields but you get the idea +resultType: !object helloResult command: hello: 1 From 5385363b4424e90e17e28f924167e621ae02f144 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 21:53:39 -0700 Subject: [PATCH 12/18] update fixtures with object types --- .../chinook/native_queries/hello.yaml | 4 +-- .../ddn/subgraphs/chinook/commands/Hello.hml | 34 +++++++++++++++++-- .../chinook/dataconnectors/mongodb.hml | 8 ++++- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml index c3523342..36b14855 100644 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -1,12 +1,12 @@ name: hello objectTypes: - - name: helloResult + - name: HelloResult fields: - name: ok type: !scalar int - name: readOnly type: !scalar bool # There are more fields but you get the idea -resultType: !object helloResult +resultType: !object HelloResult command: hello: 1 diff --git a/fixtures/ddn/subgraphs/chinook/commands/Hello.hml b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml index c194a5ba..cfdebd65 100644 --- a/fixtures/ddn/subgraphs/chinook/commands/Hello.hml +++ b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml @@ -3,16 +3,21 @@ version: v1 definition: name: hello description: Example of a read-only native query - outputType: String + outputType: HelloResult arguments: [] source: dataConnectorName: mongodb dataConnectorCommand: function: hello + typeMapping: + HelloResult: + fieldMapping: + ok: { column: ok } + readOnly: { column: readOnly } graphql: rootFieldName: hello rootFieldKind: Query - + --- kind: CommandPermissions version: v1 @@ -21,3 +26,28 @@ definition: permissions: - role: admin allowExecution: true + +--- +kind: ObjectType +version: v1 +definition: + name: HelloResult + graphql: + typeName: HelloResult + fields: + - name: ok + type: Int! + - name: readOnly + type: Boolean! + +--- +kind: TypePermissions +version: v1 +definition: + typeName: HelloResult + permissions: + - role: admin + output: + allowedFields: + - ok + - readOnly diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index 7127da8b..4eb5585b 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -905,6 +905,12 @@ definition: underlying_type: type: named name: ObjectId + HelloResult: + fields: + ok: + type: { type: named, name: Int } + readOnly: + type: { type: named, name: Boolean } collections: - name: Album arguments: {} @@ -996,7 +1002,7 @@ definition: foreign_keys: {} functions: - name: hello - result_type: { type: named, name: String } + result_type: { type: named, name: HelloResult } arguments: {} command: { hello: 1 } procedures: [] From e6b32b12024665318c136eac5c3733ccb3fac83a Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 18 Mar 2024 11:43:07 -0700 Subject: [PATCH 13/18] fix a typo --- crates/configuration/src/native_queries.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 94688cb4..6153a4bf 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -42,7 +42,7 @@ pub struct NativeQuery { /// Set to `readWrite` if this native query might modify data in the database. When refreshing /// a dataconnector native queries will appear in the corresponding `DataConnectorLink` - /// definition as `functions` if they are read-only, or as `procedures` if they are read-rite. + /// definition as `functions` if they are read-only, or as `procedures` if they are read-write. /// Functions are intended to map to GraphQL Query fields, while procedures map to Mutation /// fields. #[serde(default)] From 0d098d3392129256f42d760f4a5df312fb7c75e8 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 18 Mar 2024 12:49:33 -0700 Subject: [PATCH 14/18] check for duplicate names when parsing configuration --- crates/cli/src/lib.rs | 2 +- crates/configuration/src/configuration.rs | 92 ++++++++++++++++++++++- crates/configuration/src/directory.rs | 5 +- crates/mongodb-connector/src/schema.rs | 32 +++----- 4 files changed, 99 insertions(+), 32 deletions(-) diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index b37c4ee2..88eedde5 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -32,7 +32,7 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> { /// Update the configuration in the current directory by introspecting the database. async fn update(context: &Context) -> anyhow::Result<()> { let schema = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?; - let configuration = Configuration::from_schema(schema); + let configuration = Configuration::from_schema(schema)?; configuration::write_directory(&context.path, &configuration).await?; diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index 1081ef26..8a5bd1a6 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -1,9 +1,11 @@ use std::path::Path; +use anyhow::ensure; +use itertools::Itertools; use schemars::JsonSchema; use serde::Deserialize; -use crate::{native_queries::NativeQuery, read_directory, Schema}; +use crate::{native_queries::NativeQuery, read_directory, schema::ObjectType, Schema}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] @@ -18,11 +20,45 @@ pub struct Configuration { } impl Configuration { - pub fn from_schema(schema: Schema) -> Self { - Self { + pub fn validate(schema: Schema, native_queries: Vec) -> anyhow::Result { + let config = Configuration { schema, - ..Default::default() + native_queries, + }; + + { + let duplicate_type_names: Vec<&str> = config + .object_types() + .map(|t| t.name.as_ref()) + .duplicates() + .collect(); + ensure!( + duplicate_type_names.is_empty(), + "configuration contains multiple definitions for these object type names: {}", + duplicate_type_names.join(", ") + ); + } + + { + let duplicate_collection_names: Vec<&str> = config + .schema + .collections + .iter() + .map(|c| c.name.as_ref()) + .duplicates() + .collect(); + ensure!( + duplicate_collection_names.is_empty(), + "configuration contains multiple definitions for these collection names: {}", + duplicate_collection_names.join(", ") + ); } + + Ok(config) + } + + pub fn from_schema(schema: Schema) -> anyhow::Result { + Self::validate(schema, Default::default()) } pub async fn parse_configuration( @@ -30,4 +66,52 @@ impl Configuration { ) -> anyhow::Result { read_directory(configuration_dir).await } + + /// Returns object types collected from schema and native queries + pub fn object_types(&self) -> impl Iterator { + let object_types_from_schema = self.schema.object_types.iter(); + let object_types_from_native_queries = self + .native_queries + .iter() + .flat_map(|native_query| &native_query.object_types); + object_types_from_schema.chain(object_types_from_native_queries) + } +} + +#[cfg(test)] +mod tests { + use mongodb::bson::doc; + + use super::*; + use crate::{schema::Type, Schema}; + + #[test] + fn fails_with_duplicate_object_types() { + let schema = Schema { + collections: Default::default(), + object_types: vec![ObjectType { + name: "Album".to_owned(), + fields: Default::default(), + description: Default::default(), + }], + }; + let native_queries = vec![NativeQuery { + name: "hello".to_owned(), + object_types: vec![ObjectType { + name: "Album".to_owned(), + fields: Default::default(), + description: Default::default(), + }], + result_type: Type::Object("Album".to_owned()), + command: doc! { "command": 1 }, + arguments: Default::default(), + selection_criteria: Default::default(), + description: Default::default(), + mode: Default::default(), + }]; + let result = Configuration::validate(schema, native_queries); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("multiple definitions")); + assert!(error_msg.contains("Album")); + } } diff --git a/crates/configuration/src/directory.rs b/crates/configuration/src/directory.rs index 2ec5618e..fd616e71 100644 --- a/crates/configuration/src/directory.rs +++ b/crates/configuration/src/directory.rs @@ -36,10 +36,7 @@ pub async fn read_directory( .await? .unwrap_or_default(); - Ok(Configuration { - schema, - native_queries, - }) + Configuration::validate(schema, native_queries) } /// Parse all files in a directory with one of the allowed configuration extensions according to diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index c06131a2..d5f265e8 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -13,17 +13,7 @@ pub async fn get_schema( ) -> Result { let schema = &config.schema; let collections = schema.collections.iter().map(map_collection).collect(); - - let object_types_from_schema = map_object_types(&schema.object_types); - let object_types_from_native_queries = map_object_types( - config - .native_queries - .iter() - .flat_map(|native_query| &native_query.object_types), - ); - let object_types = object_types_from_schema - .chain(object_types_from_native_queries) - .collect(); + let object_types = config.object_types().map(map_object_type).collect(); let functions = config .native_queries @@ -48,18 +38,14 @@ pub async fn get_schema( }) } -fn map_object_types<'a>( - object_types: impl IntoIterator + 'a, -) -> impl Iterator + 'a { - object_types.into_iter().map(|t| { - ( - t.name.clone(), - models::ObjectType { - fields: map_field_infos(&t.fields), - description: t.description.clone(), - }, - ) - }) +fn map_object_type(object_type: &schema::ObjectType) -> (String, models::ObjectType) { + ( + object_type.name.clone(), + models::ObjectType { + fields: map_field_infos(&object_type.fields), + description: object_type.description.clone(), + }, + ) } fn map_field_infos(fields: &[schema::ObjectField]) -> BTreeMap { From 4100fd77cad7b01b66542366310db34fe8ff5bc9 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 18 Mar 2024 17:49:16 -0700 Subject: [PATCH 15/18] implementation for read-write native queries --- Cargo.lock | 1 + crates/mongodb-connector/Cargo.toml | 1 + crates/mongodb-connector/src/main.rs | 1 + .../mongodb-connector/src/mongo_connector.rs | 10 +- crates/mongodb-connector/src/mutation.rs | 110 ++++++++++++++++++ 5 files changed, 117 insertions(+), 6 deletions(-) create mode 100644 crates/mongodb-connector/src/mutation.rs diff --git a/Cargo.lock b/Cargo.lock index 6401d568..6b300ed8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1557,6 +1557,7 @@ dependencies = [ "dc-api-test-helpers", "dc-api-types", "enum-iterator", + "futures", "http", "indexmap 2.2.5", "itertools 0.10.5", diff --git a/crates/mongodb-connector/Cargo.toml b/crates/mongodb-connector/Cargo.toml index 0632b67c..36a21468 100644 --- a/crates/mongodb-connector/Cargo.toml +++ b/crates/mongodb-connector/Cargo.toml @@ -10,6 +10,7 @@ configuration = { path = "../configuration" } dc-api = { path = "../dc-api" } dc-api-types = { path = "../dc-api-types" } enum-iterator = "1.4.1" +futures = "^0.3" http = "^0.2" indexmap = { version = "2.1.0", features = ["serde"] } itertools = "^0.10" diff --git a/crates/mongodb-connector/src/main.rs b/crates/mongodb-connector/src/main.rs index 26c46d0b..aadcefad 100644 --- a/crates/mongodb-connector/src/main.rs +++ b/crates/mongodb-connector/src/main.rs @@ -2,6 +2,7 @@ mod api_type_conversions; mod capabilities; mod error_mapping; mod mongo_connector; +mod mutation; mod schema; use std::error::Error; diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index f23c4338..77f16cc5 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -19,7 +19,6 @@ use ndc_sdk::{ }, }; -use crate::capabilities::mongo_capabilities_response; use crate::{ api_type_conversions::{ v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, QueryContext, @@ -27,6 +26,7 @@ use crate::{ capabilities::scalar_types, error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error}, }; +use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request}; #[derive(Clone, Default)] pub struct MongoConnector; @@ -115,12 +115,10 @@ impl Connector for MongoConnector { async fn mutation( _configuration: &Self::Configuration, - _state: &Self::State, - _request: MutationRequest, + state: &Self::State, + request: MutationRequest, ) -> Result, MutationError> { - Err(MutationError::UnsupportedOperation( - "The MongoDB agent does not yet support mutations".to_owned(), - )) + handle_mutation_request(state, request).await } async fn query( diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs new file mode 100644 index 00000000..737842fd --- /dev/null +++ b/crates/mongodb-connector/src/mutation.rs @@ -0,0 +1,110 @@ +use std::collections::BTreeMap; + +use configuration::native_queries::NativeQuery; +use futures::future::try_join_all; +use itertools::Itertools; +use mongodb::Database; +use mongodb_agent_common::interface_types::MongoConfig; +use ndc_sdk::{ + connector::MutationError, + json_response::JsonResponse, + models::{ + MutationOperation, MutationOperationResults, MutationRequest, MutationResponse, NestedField, + }, +}; +use serde_json::Value; + +/// A procedure combined with inputs +#[derive(Clone, Debug)] +#[allow(dead_code)] +struct Job<'a> { + // For the time being all procedures are native queries. + native_query: &'a NativeQuery, + arguments: BTreeMap, + fields: Option, +} + +impl<'a> Job<'a> { + pub fn new( + native_query: &'a NativeQuery, + arguments: BTreeMap, + fields: Option, + ) -> Self { + Job { + native_query, + arguments, + fields, + } + } +} + +pub async fn handle_mutation_request( + config: &MongoConfig, + mutation_request: MutationRequest, +) -> Result, MutationError> { + tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation"); + let database = config.client.database(&config.database); + let jobs = look_up_procedures(config, mutation_request)?; + let operation_results = try_join_all( + jobs.into_iter() + .map(|job| execute_job(database.clone(), job)), + ) + .await?; + Ok(JsonResponse::Value(MutationResponse { operation_results })) +} + +/// Looks up procedures according to the names given in the mutation request, and pairs them with +/// arguments and requested fields. Returns an error if any procedures cannot be found. +fn look_up_procedures( + config: &MongoConfig, + mutation_request: MutationRequest, +) -> Result>, MutationError> { + let (jobs, not_found): (Vec, Vec) = mutation_request + .operations + .into_iter() + .map(|operation| match operation { + MutationOperation::Procedure { + name, + arguments, + fields, + } => { + let native_query = config + .native_queries + .iter() + .find(|native_query| native_query.name == name); + native_query + .ok_or(name) + .map(|nq| Job::new(nq, arguments, fields)) + } + }) + .partition_result(); + + if !not_found.is_empty() { + return Err(MutationError::UnprocessableContent(format!( + "request includes unknown procedures: {}", + not_found.join(", ") + ))); + } + + Ok(jobs) +} + +async fn execute_job( + database: Database, + job: Job<'_>, +) -> Result { + let result = database + .run_command(job.native_query.command.clone(), None) + .await + .map_err(|err| match *err.kind { + mongodb::error::ErrorKind::InvalidArgument { message, .. } => { + MutationError::UnprocessableContent(message) + } + err => MutationError::Other(Box::new(err)), + })?; + let json_result = + serde_json::to_value(result).map_err(|err| MutationError::Other(Box::new(err)))?; + Ok(MutationOperationResults::Procedure { + result: json_result, + }) +} From 6af42b60d6a60ca1e29387e73be6cd7afc350764 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 19 Mar 2024 11:59:21 -0700 Subject: [PATCH 16/18] remove fields from Job if we're not going to use it --- crates/mongodb-connector/src/mutation.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs index 737842fd..2388d952 100644 --- a/crates/mongodb-connector/src/mutation.rs +++ b/crates/mongodb-connector/src/mutation.rs @@ -8,9 +8,7 @@ use mongodb_agent_common::interface_types::MongoConfig; use ndc_sdk::{ connector::MutationError, json_response::JsonResponse, - models::{ - MutationOperation, MutationOperationResults, MutationRequest, MutationResponse, NestedField, - }, + models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse}, }; use serde_json::Value; @@ -21,19 +19,13 @@ struct Job<'a> { // For the time being all procedures are native queries. native_query: &'a NativeQuery, arguments: BTreeMap, - fields: Option, } impl<'a> Job<'a> { - pub fn new( - native_query: &'a NativeQuery, - arguments: BTreeMap, - fields: Option, - ) -> Self { + pub fn new(native_query: &'a NativeQuery, arguments: BTreeMap) -> Self { Job { native_query, arguments, - fields, } } } @@ -64,17 +56,13 @@ fn look_up_procedures( .into_iter() .map(|operation| match operation { MutationOperation::Procedure { - name, - arguments, - fields, + name, arguments, .. } => { let native_query = config .native_queries .iter() .find(|native_query| native_query.name == name); - native_query - .ok_or(name) - .map(|nq| Job::new(nq, arguments, fields)) + native_query.ok_or(name).map(|nq| Job::new(nq, arguments)) } }) .partition_result(); From a092d72233f67dea1f883064034dd10a6b6f7932 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 19 Mar 2024 12:41:53 -0700 Subject: [PATCH 17/18] add example mutation, insertArtist, in fixtures --- .../chinook/native_queries/hello.yaml | 1 + .../chinook/native_queries/insert_artist.yaml | 16 ++++++ .../chinook/commands/InsertArtist.hml | 54 +++++++++++++++++++ .../chinook/dataconnectors/mongodb.hml | 14 ++++- 4 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 fixtures/connector/chinook/native_queries/insert_artist.yaml create mode 100644 fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml index 36b14855..e7b7a575 100644 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -1,4 +1,5 @@ name: hello +description: Example of a read-only native query objectTypes: - name: HelloResult fields: diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml new file mode 100644 index 00000000..3dd16c70 --- /dev/null +++ b/fixtures/connector/chinook/native_queries/insert_artist.yaml @@ -0,0 +1,16 @@ +name: insertArtist +description: Example of a database update using a native query +objectTypes: + - name: InsertArtist + fields: + - name: ok + type: !scalar int + - name: n + type: !scalar int +resultType: !object InsertArtist +# TODO: implement arguments instead of hard-coding inputs +command: + insert: "Artist" + documents: + - ArtistId: 1001 + - Name: Regina Spektor diff --git a/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml new file mode 100644 index 00000000..7b1d3fff --- /dev/null +++ b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml @@ -0,0 +1,54 @@ +kind: Command +version: v1 +definition: + name: insertArtist + description: Example of a database update using a native query + outputType: InsertArtist + arguments: [] + source: + dataConnectorName: mongodb + dataConnectorCommand: + procedure: insertArtist + typeMapping: + InsertArtist: + fieldMapping: + ok: { column: ok } + n: { column: n } + graphql: + rootFieldName: insertArtist + rootFieldKind: Mutation + +--- +kind: CommandPermissions +version: v1 +definition: + commandName: insertArtist + permissions: + - role: admin + allowExecution: true + +--- +kind: ObjectType +version: v1 +definition: + name: InsertArtist + graphql: + typeName: InsertArtist + fields: + - name: ok + type: Int! + - name: n + type: Int! + +--- +kind: TypePermissions +version: v1 +definition: + typeName: InsertArtist + permissions: + - role: admin + output: + allowedFields: + - ok + - n + diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index 4eb5585b..d94ec308 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -911,6 +911,12 @@ definition: type: { type: named, name: Int } readOnly: type: { type: named, name: Boolean } + InsertArtist: + fields: + ok: + type: { type: named, name: Int } + n: + type: { type: named, name: Int } collections: - name: Album arguments: {} @@ -1002,10 +1008,16 @@ definition: foreign_keys: {} functions: - name: hello + description: Example of a read-only native query result_type: { type: named, name: HelloResult } arguments: {} command: { hello: 1 } - procedures: [] + procedures: + - name: insertArtist + description: Example of a database update using a native query + result_type: { type: named, name: InsertArtist } + arguments: {} + command: { insert: Artist, documents: [{ ArtistId: 1001, Name: Regina Spektor }] } capabilities: version: ^0.1.0 capabilities: From 3c7e9dfbc356cefb04a4e7d39701872283caf368 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 20 Mar 2024 09:56:32 -0700 Subject: [PATCH 18/18] fixture was inserting two documents --- fixtures/connector/chinook/native_queries/insert_artist.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml index 3dd16c70..d6803340 100644 --- a/fixtures/connector/chinook/native_queries/insert_artist.yaml +++ b/fixtures/connector/chinook/native_queries/insert_artist.yaml @@ -13,4 +13,4 @@ command: insert: "Artist" documents: - ArtistId: 1001 - - Name: Regina Spektor + Name: Regina Spektor