Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mutations via native queries #7

Merged
merged 21 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1ef17e1
add configuration for native queries; report queries in schema response
hallettj Mar 14, 2024
029ff0f
read schema and native queries from separate files
hallettj Mar 14, 2024
9ebfc75
rename metadata to schema
hallettj Mar 14, 2024
b6f6258
update fixture configuration
hallettj Mar 14, 2024
039c840
succeed parsing configuration if no native_queries are present
hallettj Mar 14, 2024
12e7b33
add mode property to NativeQuery
hallettj Mar 14, 2024
5962ae5
represent native queries as functions and procedures in schema
hallettj Mar 15, 2024
b995bee
Merge branch 'main' into jesse/native-queries
hallettj Mar 15, 2024
178695a
execute native query
hallettj Mar 18, 2024
c608d17
updated fixtures with a very basic native query
hallettj Mar 18, 2024
e9f042f
add more context to configuration read errors
hallettj Mar 18, 2024
f5d4f62
add objectTypes field to native queries
hallettj Mar 18, 2024
5385363
update fixtures with object types
hallettj Mar 18, 2024
1238cc6
Merge branch 'main' into jesse/native-queries-in-query-handler
hallettj Mar 18, 2024
e6b32b1
fix a typo
hallettj Mar 18, 2024
0d098d3
check for duplicate names when parsing configuration
hallettj Mar 18, 2024
4100fd7
implementation for read-write native queries
hallettj Mar 19, 2024
6af42b6
remove fields from Job if we're not going to use it
hallettj Mar 19, 2024
a092d72
add example mutation, insertArtist, in fixtures
hallettj Mar 19, 2024
9189bdf
Merge branch 'main' into jesse/mutations-via-native-queries
hallettj Mar 20, 2024
3c7e9df
fixture was inserting two documents
hallettj Mar 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/mongodb-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ configuration = { path = "../configuration" }
dc-api = { path = "../dc-api" }
dc-api-types = { path = "../dc-api-types" }
enum-iterator = "1.4.1"
futures = "^0.3"
http = "^0.2"
indexmap = { version = "2.1.0", features = ["serde"] }
itertools = "^0.10"
Expand Down
1 change: 1 addition & 0 deletions crates/mongodb-connector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod api_type_conversions;
mod capabilities;
mod error_mapping;
mod mongo_connector;
mod mutation;
mod schema;

use std::error::Error;
Expand Down
10 changes: 4 additions & 6 deletions crates/mongodb-connector/src/mongo_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use ndc_sdk::{
},
};

use crate::capabilities::mongo_capabilities_response;
use crate::{
api_type_conversions::{
v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, QueryContext,
},
capabilities::scalar_types,
error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error},
};
use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request};

#[derive(Clone, Default)]
pub struct MongoConnector;
Expand Down Expand Up @@ -115,12 +115,10 @@ impl Connector for MongoConnector {

async fn mutation(
_configuration: &Self::Configuration,
_state: &Self::State,
_request: MutationRequest,
state: &Self::State,
request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
Err(MutationError::UnsupportedOperation(
"The MongoDB agent does not yet support mutations".to_owned(),
))
handle_mutation_request(state, request).await
}

async fn query(
Expand Down
98 changes: 98 additions & 0 deletions crates/mongodb-connector/src/mutation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::collections::BTreeMap;

use configuration::native_queries::NativeQuery;
use futures::future::try_join_all;
use itertools::Itertools;
use mongodb::Database;
use mongodb_agent_common::interface_types::MongoConfig;
use ndc_sdk::{
connector::MutationError,
json_response::JsonResponse,
models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse},
};
use serde_json::Value;

/// A procedure combined with inputs
#[derive(Clone, Debug)]
#[allow(dead_code)]
struct Job<'a> {
// For the time being all procedures are native queries.
native_query: &'a NativeQuery,
arguments: BTreeMap<String, Value>,
}

impl<'a> Job<'a> {
pub fn new(native_query: &'a NativeQuery, arguments: BTreeMap<String, Value>) -> Self {
Job {
native_query,
arguments,
}
}
}

pub async fn handle_mutation_request(
config: &MongoConfig,
mutation_request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation");
let database = config.client.database(&config.database);
let jobs = look_up_procedures(config, mutation_request)?;
let operation_results = try_join_all(
jobs.into_iter()
.map(|job| execute_job(database.clone(), job)),
)
.await?;
Ok(JsonResponse::Value(MutationResponse { operation_results }))
}

/// Looks up procedures according to the names given in the mutation request, and pairs them with
/// arguments and requested fields. Returns an error if any procedures cannot be found.
fn look_up_procedures(
config: &MongoConfig,
mutation_request: MutationRequest,
) -> Result<Vec<Job<'_>>, MutationError> {
let (jobs, not_found): (Vec<Job>, Vec<String>) = mutation_request
.operations
.into_iter()
.map(|operation| match operation {
MutationOperation::Procedure {
name, arguments, ..
} => {
let native_query = config
.native_queries
.iter()
.find(|native_query| native_query.name == name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment on PR #5. We will want a more performant way to look up native queries than linear search. This is within a map now so potentially O(m * n) where m is the number of operations and n is the number of native queries.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I filed https://hasurahq.atlassian.net/browse/MDB-91 to follow up

native_query.ok_or(name).map(|nq| Job::new(nq, arguments))
}
})
.partition_result();

if !not_found.is_empty() {
return Err(MutationError::UnprocessableContent(format!(
"request includes unknown procedures: {}",
not_found.join(", ")
)));
}

Ok(jobs)
}

async fn execute_job(
database: Database,
job: Job<'_>,
) -> Result<MutationOperationResults, MutationError> {
let result = database
.run_command(job.native_query.command.clone(), None)
.await
.map_err(|err| match *err.kind {
mongodb::error::ErrorKind::InvalidArgument { message, .. } => {
MutationError::UnprocessableContent(message)
}
err => MutationError::Other(Box::new(err)),
})?;
let json_result =
serde_json::to_value(result).map_err(|err| MutationError::Other(Box::new(err)))?;
Ok(MutationOperationResults::Procedure {
result: json_result,
})
}
1 change: 1 addition & 0 deletions fixtures/connector/chinook/native_queries/hello.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
name: hello
description: Example of a read-only native query
objectTypes:
- name: HelloResult
fields:
Expand Down
16 changes: 16 additions & 0 deletions fixtures/connector/chinook/native_queries/insert_artist.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: insertArtist
description: Example of a database update using a native query
objectTypes:
- name: InsertArtist
fields:
- name: ok
type: !scalar int
- name: n
type: !scalar int
resultType: !object InsertArtist
# TODO: implement arguments instead of hard-coding inputs
command:
insert: "Artist"
documents:
- ArtistId: 1001
Name: Regina Spektor
54 changes: 54 additions & 0 deletions fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
kind: Command
version: v1
definition:
name: insertArtist
description: Example of a database update using a native query
outputType: InsertArtist
arguments: []
source:
dataConnectorName: mongodb
dataConnectorCommand:
procedure: insertArtist
typeMapping:
InsertArtist:
fieldMapping:
ok: { column: ok }
n: { column: n }
graphql:
rootFieldName: insertArtist
rootFieldKind: Mutation

---
kind: CommandPermissions
version: v1
definition:
commandName: insertArtist
permissions:
- role: admin
allowExecution: true

---
kind: ObjectType
version: v1
definition:
name: InsertArtist
graphql:
typeName: InsertArtist
fields:
- name: ok
type: Int!
- name: n
type: Int!

---
kind: TypePermissions
version: v1
definition:
typeName: InsertArtist
permissions:
- role: admin
output:
allowedFields:
- ok
- n

14 changes: 13 additions & 1 deletion fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,12 @@ definition:
type: { type: named, name: Int }
readOnly:
type: { type: named, name: Boolean }
InsertArtist:
fields:
ok:
type: { type: named, name: Int }
n:
type: { type: named, name: Int }
collections:
- name: Album
arguments: {}
Expand Down Expand Up @@ -1002,10 +1008,16 @@ definition:
foreign_keys: {}
functions:
- name: hello
description: Example of a read-only native query
result_type: { type: named, name: HelloResult }
arguments: {}
command: { hello: 1 }
procedures: []
procedures:
- name: insertArtist
description: Example of a database update using a native query
result_type: { type: named, name: InsertArtist }
arguments: {}
command: { insert: Artist, documents: [{ ArtistId: 1001, Name: Regina Spektor }] }
capabilities:
version: ^0.1.0
capabilities:
Expand Down
Loading