Skip to content

Commit

Permalink
translate mutation response according to requested fields (#59)
Browse files Browse the repository at this point in the history
Returns only the requested fields from mutation responses, and applies field aliases.
  • Loading branch information
hallettj authored Apr 27, 2024
1 parent ea3cba7 commit ac982c8
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This changelog documents the changes between release versions.
- In the CLI update command, if the database URI is not provided the error message now mentions the correct environment variable to use (`MONGODB_DATABASE_URI`) ([#50](https://github.com/hasura/ndc-mongodb/pull/50))
- Update to latest NDC SDK ([#51](https://github.com/hasura/ndc-mongodb/pull/51))
- Update `rustls` dependency to fix https://github.com/hasura/ndc-mongodb/security/dependabot/1 ([#51](https://github.com/hasura/ndc-mongodb/pull/51))
- Serialize query and mutation response fields with known types using simple JSON instead of Extended JSON (#53) (#59)

## [0.0.4] - 2024-04-12
- Queries that attempt to compare a column to a column in the query root table, or a related table, will now fail instead of giving the incorrect result ([#22](https://github.com/hasura/ndc-mongodb/pull/22))
Expand Down
2 changes: 1 addition & 1 deletion crates/integration-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl From<&str> for GraphQLRequest {
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct GraphQLResponse {
data: Value,
errors: Option<Vec<Value>>,
Expand Down
19 changes: 16 additions & 3 deletions crates/integration-tests/src/tests/native_procedure.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::query;
use crate::{query, GraphQLResponse};
use insta::assert_yaml_snapshot;
use serde_json::json;

Expand All @@ -9,13 +9,13 @@ async fn updates_with_native_procedure() -> anyhow::Result<()> {
let mutation = r#"
mutation InsertArtist($id: Int!, $name: String!) {
insertArtist(id: $id, name: $name) {
n
number_of_docs_inserted: n
ok
}
}
"#;

query(mutation)
let res1 = query(mutation)
.variables(json!({ "id": id_1, "name": "Regina Spektor" }))
.run()
.await?;
Expand All @@ -24,6 +24,19 @@ async fn updates_with_native_procedure() -> anyhow::Result<()> {
.run()
.await?;

assert_eq!(
res1,
GraphQLResponse {
data: json!({
"insertArtist": {
"number_of_docs_inserted": 1,
"ok": 1.0,
}
}),
errors: None,
}
);

assert_yaml_snapshot!(
query(
r#"
Expand Down
5 changes: 3 additions & 2 deletions crates/mongodb-connector/src/mongo_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Connector for MongoConnector {
_request: MutationRequest,
) -> Result<JsonResponse<ExplainResponse>, ExplainError> {
Err(ExplainError::UnsupportedOperation(
"The MongoDB agent does not yet support mutations".to_owned(),
"Explain for mutations is not implemented yet".to_owned(),
))
}

Expand All @@ -132,7 +132,8 @@ impl Connector for MongoConnector {
state: &Self::State,
request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
handle_mutation_request(configuration, state, request).await
let query_context = get_query_context(configuration);
handle_mutation_request(configuration, query_context, state, request).await
}

#[instrument(err, skip_all)]
Expand Down
145 changes: 121 additions & 24 deletions crates/mongodb-connector/src/mutation.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,71 @@
use std::collections::BTreeMap;

use configuration::{schema::ObjectType, Configuration};
use configuration::Configuration;
use futures::future::try_join_all;
use itertools::Itertools;
use mongodb::Database;
use mongodb::{
bson::{self, Bson},
Database,
};
use mongodb_agent_common::{
procedure::Procedure, query::serialization::bson_to_json, state::ConnectorState,
};
use ndc_sdk::{
connector::MutationError,
json_response::JsonResponse,
models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse},
models::{
Field, MutationOperation, MutationOperationResults, MutationRequest, MutationResponse,
NestedArray, NestedField, NestedObject, Relationship,
},
};

use crate::{
api_type_conversions::QueryContext,
query_response::{extend_configured_object_types, prune_type_to_field_selection},
};

pub async fn handle_mutation_request(
config: &Configuration,
query_context: QueryContext<'_>,
state: &ConnectorState,
mutation_request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation");
let database = state.database();
let jobs = look_up_procedures(config, mutation_request)?;
let operation_results = try_join_all(
jobs.into_iter()
.map(|procedure| execute_procedure(&config.object_types, database.clone(), procedure)),
)
let jobs = look_up_procedures(config, &mutation_request)?;
let operation_results = try_join_all(jobs.into_iter().map(|(procedure, requested_fields)| {
execute_procedure(
&query_context,
database.clone(),
&mutation_request.collection_relationships,
procedure,
requested_fields,
)
}))
.await?;
Ok(JsonResponse::Value(MutationResponse { operation_results }))
}

/// Looks up procedures according to the names given in the mutation request, and pairs them with
/// arguments and requested fields. Returns an error if any procedures cannot be found.
fn look_up_procedures(
config: &Configuration,
mutation_request: MutationRequest,
) -> Result<Vec<Procedure<'_>>, MutationError> {
let (procedures, not_found): (Vec<Procedure>, Vec<String>) = mutation_request
fn look_up_procedures<'a, 'b>(
config: &'a Configuration,
mutation_request: &'b MutationRequest,
) -> Result<Vec<(Procedure<'a>, Option<&'b NestedField>)>, MutationError> {
let (procedures, not_found): (Vec<_>, Vec<String>) = mutation_request
.operations
.into_iter()
.iter()
.map(|operation| match operation {
MutationOperation::Procedure {
name, arguments, ..
name,
arguments,
fields,
} => {
let native_procedure = config.native_procedures.get(&name);
native_procedure.ok_or(name).map(|native_procedure| {
Procedure::from_native_procedure(native_procedure, arguments)
})
let native_procedure = config.native_procedures.get(name);
let procedure = native_procedure.ok_or(name).map(|native_procedure| {
Procedure::from_native_procedure(native_procedure, arguments.clone())
})?;
Ok((procedure, fields.as_ref()))
}
})
.partition_result();
Expand All @@ -61,17 +81,94 @@ fn look_up_procedures(
}

async fn execute_procedure(
object_types: &BTreeMap<String, ObjectType>,
query_context: &QueryContext<'_>,
database: Database,
relationships: &BTreeMap<String, Relationship>,
procedure: Procedure<'_>,
requested_fields: Option<&NestedField>,
) -> Result<MutationOperationResults, MutationError> {
let (result, result_type) = procedure
.execute(object_types, database.clone())
.execute(&query_context.object_types, database.clone())
.await
.map_err(|err| MutationError::InvalidRequest(err.to_string()))?;
let json_result = bson_to_json(&result_type, object_types, result.into())
.map_err(|err| MutationError::Other(Box::new(err)))?;
.map_err(|err| MutationError::UnprocessableContent(err.to_string()))?;

let rewritten_result = rewrite_response(requested_fields, result.into())?;

let (requested_result_type, temp_object_types) = prune_type_to_field_selection(
query_context,
relationships,
&[],
&result_type,
requested_fields,
)
.map_err(|err| MutationError::Other(Box::new(err)))?;
let object_types = extend_configured_object_types(query_context, temp_object_types);

let json_result = bson_to_json(&requested_result_type, &object_types, rewritten_result)
.map_err(|err| MutationError::UnprocessableContent(err.to_string()))?;

Ok(MutationOperationResults::Procedure {
result: json_result,
})
}

/// We need to traverse requested fields to rename any fields that are aliased in the GraphQL
/// request
fn rewrite_response(
requested_fields: Option<&NestedField>,
value: Bson,
) -> Result<Bson, MutationError> {
match (requested_fields, value) {
(None, value) => Ok(value),

(Some(NestedField::Object(fields)), Bson::Document(doc)) => {
Ok(rewrite_doc(fields, doc)?.into())
}
(Some(NestedField::Array(fields)), Bson::Array(values)) => {
Ok(rewrite_array(fields, values)?.into())
}

(Some(NestedField::Object(_)), _) => Err(MutationError::UnprocessableContent(
"expected an object".to_owned(),
)),
(Some(NestedField::Array(_)), _) => Err(MutationError::UnprocessableContent(
"expected an array".to_owned(),
)),
}
}

fn rewrite_doc(
fields: &NestedObject,
mut doc: bson::Document,
) -> Result<bson::Document, MutationError> {
fields
.fields
.iter()
.map(|(name, field)| {
let field_value = match field {
Field::Column { column, fields } => {
let orig_value = doc.remove(column).ok_or_else(|| {
MutationError::UnprocessableContent(format!(
"missing expected field from response: {name}"
))
})?;
rewrite_response(fields.as_ref(), orig_value)
}
Field::Relationship { .. } => Err(MutationError::UnsupportedOperation(
"The MongoDB connector does not support relationship references in mutations"
.to_owned(),
)),
}?;

Ok((name.clone(), field_value))
})
.try_collect()
}

fn rewrite_array(fields: &NestedArray, values: Vec<Bson>) -> Result<Vec<Bson>, MutationError> {
let nested = &fields.fields;
values
.into_iter()
.map(|value| rewrite_response(Some(nested), value))
.try_collect()
}
Loading

0 comments on commit ac982c8

Please sign in to comment.