Skip to content

Commit

Permalink
prevent response serialization from assuming objects are relations (#34)
Browse files Browse the repository at this point in the history
The change in #27 fixed serializing query responses with object values, but serializing foreach responses stopped working. This PR switches back to roughly the logic in which foreach responses were working, and fixes the issue with object values in a different way.

This is a quick patch - we still need to rework response serialization to use `bson_to_json` translation. That will require using the query request to direct response serialization which should make everything more robust.

I removed the use of `JsonResponse` in this PR because it serves no purpose for this connector. `JsonResponse` is useful for connectors that get a JSON response from the database, which the connector can pass through without processing. But as far as we can determine MongoDB always sends BSON so to get JSON output we're stuck doing response processing in the connector.

There's also a fix in here to avoid using `$literal` expression escaping in a context where expressions are not evaluated.

Fixes https://hasurahq.atlassian.net/browse/MDB-19

---------

Co-authored-by: Brandon Martin <brandon@codedmart.com>
  • Loading branch information
hallettj and codedmart authored Apr 12, 2024
1 parent 9b43d58 commit 4274be1
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 228 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 22 additions & 6 deletions crates/dc-api-types/src/query_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,29 @@ pub struct ForEachRow {
pub query: RowSet,
}

/// A row set must contain either rows, or aggregates, or possibly both
#[skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RowSet {
/// The results of the aggregates returned by the query
pub aggregates: Option<HashMap<String, serde_json::Value>>,
/// The rows returned by the query, corresponding to the query's fields
pub rows: Option<Vec<HashMap<String, ResponseFieldValue>>>,
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RowSet {
Aggregate {
/// The results of the aggregates returned by the query
aggregates: HashMap<String, serde_json::Value>,
/// The rows returned by the query, corresponding to the query's fields
rows: Option<Vec<HashMap<String, ResponseFieldValue>>>,
},
Rows {
/// Rows returned by a query that did not request aggregates.
rows: Vec<HashMap<String, ResponseFieldValue>>,
},
}

impl Default for RowSet {
fn default() -> Self {
RowSet::Rows {
rows: Default::default(),
}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
120 changes: 0 additions & 120 deletions crates/dc-api/src/interface_types/json_response.rs

This file was deleted.

3 changes: 1 addition & 2 deletions crates/dc-api/src/interface_types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
mod agent_error;
mod json_response;

pub use self::{agent_error::AgentError, json_response::JsonResponse};
pub use self::agent_error::AgentError;
2 changes: 1 addition & 1 deletion crates/dc-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod interface_types;

pub use self::interface_types::{AgentError, JsonResponse};
pub use self::interface_types::AgentError;
13 changes: 5 additions & 8 deletions crates/mongodb-agent-common/src/query/execute_query_request.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use anyhow::anyhow;
use bytes::Bytes;
use dc_api::JsonResponse;
use dc_api_types::{QueryRequest, QueryResponse};
use futures_util::TryStreamExt;
use mongodb::bson::{doc, Document};
use mongodb::bson::{self, doc, Document};

use super::pipeline::{pipeline_for_query_request, ResponseShape};
use crate::{interface_types::MongoAgentError, mongodb::CollectionTrait};

pub async fn execute_query_request(
collection: &impl CollectionTrait<Document>,
query_request: QueryRequest,
) -> Result<JsonResponse<QueryResponse>, MongoAgentError> {
) -> Result<QueryResponse, MongoAgentError> {
let (pipeline, response_shape) = pipeline_for_query_request(&query_request)?;
tracing::debug!(pipeline = %serde_json::to_string(&pipeline).unwrap(), "aggregate pipeline");

Expand All @@ -33,9 +31,8 @@ pub async fn execute_query_request(
))
})?,
};
tracing::debug!(response_document = %serde_json::to_string(&response_document).unwrap(), "response from MongoDB");

let bytes: Bytes = serde_json::to_vec(&response_document)
.map_err(MongoAgentError::Serialization)?
.into();
Ok(JsonResponse::Serialized(bytes))
let response = bson::from_document(response_document)?;
Ok(response)
}
13 changes: 4 additions & 9 deletions crates/mongodb-agent-common/src/query/foreach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,14 @@ mod tests {
"$facet": {
"__FACET___0": [
{ "$match": { "$and": [{ "artistId": {"$eq":1 }}]}},
{ "$replaceWith": {
{ "$replaceWith": {
"albumId": { "$ifNull": ["$albumId", null] },
"title": { "$ifNull": ["$title", null] }
} },
],
"__FACET___1": [
{ "$match": { "$and": [{ "artistId": {"$eq":2}}]}},
{ "$replaceWith": {
{ "$replaceWith": {
"albumId": { "$ifNull": ["$albumId", null] },
"title": { "$ifNull": ["$title", null] }
} },
Expand Down Expand Up @@ -248,9 +248,7 @@ mod tests {
})?)]))
});

let result = execute_query_request(&collection, query_request)
.await?
.into_value()?;
let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);

Ok(())
Expand Down Expand Up @@ -364,7 +362,6 @@ mod tests {
]
}))?;


let mut collection = MockCollectionTrait::new();
collection
.expect_aggregate()
Expand Down Expand Up @@ -398,9 +395,7 @@ mod tests {
})?)]))
});

let result = execute_query_request(&collection, query_request)
.await?
.into_value()?;
let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);

Ok(())
Expand Down
11 changes: 5 additions & 6 deletions crates/mongodb-agent-common/src/query/make_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use mongodb_support::BsonScalarType;

use crate::{
comparison_function::ComparisonFunction, interface_types::MongoAgentError,
query::serialization::json_to_bson_scalar, query::column_ref::column_ref,
query::column_ref::column_ref, query::serialization::json_to_bson_scalar,
};

use BinaryArrayComparisonOperator as ArrOp;
Expand Down Expand Up @@ -112,7 +112,7 @@ fn make_selector_helper(
"comparisons between columns",
)),
ArrayComparisonValue::Variable(name) => {
Ok(variable_to_mongo_expression(variables, name, value_type)?.into())
variable_to_mongo_expression(variables, name, value_type)
}
})
.collect::<Result<_, MongoAgentError>>()?;
Expand Down Expand Up @@ -149,11 +149,10 @@ fn variable_to_mongo_expression(
variables: Option<&BTreeMap<String, serde_json::Value>>,
variable: &str,
value_type: &str,
) -> Result<bson::Document, MongoAgentError> {
) -> Result<bson::Bson, MongoAgentError> {
let value = variables
.and_then(|vars| vars.get(variable))
.ok_or_else(|| MongoAgentError::VariableNotDefined(variable.to_owned()))?;
Ok(doc! {
"$literal": bson_from_scalar_value(value, value_type)?
})

bson_from_scalar_value(value, value_type)
}
19 changes: 5 additions & 14 deletions crates/mongodb-agent-common/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod pipeline;
mod relations;
pub mod serialization;

use dc_api::JsonResponse;
use dc_api_types::{QueryRequest, QueryResponse, Target};
use mongodb::bson::Document;

Expand All @@ -28,7 +27,7 @@ pub fn collection_name(query_request_target: &Target) -> String {
pub async fn handle_query_request(
config: &MongoConfig,
query_request: QueryRequest,
) -> Result<JsonResponse<QueryResponse>, MongoAgentError> {
) -> Result<QueryResponse, MongoAgentError> {
tracing::debug!(?config, query_request = %serde_json::to_string(&query_request).unwrap(), "executing query");

let database = config.client.database(&config.database);
Expand Down Expand Up @@ -91,9 +90,7 @@ mod tests {
]))
});

let result = execute_query_request(&collection, query_request)
.await?
.into_value()?;
let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);
Ok(())
}
Expand Down Expand Up @@ -170,9 +167,7 @@ mod tests {
})?)]))
});

let result = execute_query_request(&collection, query_request)
.await?
.into_value()?;
let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);
Ok(())
}
Expand Down Expand Up @@ -255,9 +250,7 @@ mod tests {
})?)]))
});

let result = execute_query_request(&collection, query_request)
.await?
.into_value()?;
let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);
Ok(())
}
Expand Down Expand Up @@ -317,9 +310,7 @@ mod tests {
})?)]))
});

let result = execute_query_request(&collection, query_request)
.await?
.into_value()?;
let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);
Ok(())
}
Expand Down
Loading

0 comments on commit 4274be1

Please sign in to comment.