Skip to content

Commit

Permalink
check query request to determine whether response is foreach (#40)
Browse files Browse the repository at this point in the history
We were getting errors in query responses with no matching rows because the response parsing interpreted the empty set of response rows as a foreach response. This change references the query request to determine whether or not to parse the response as a foreach.

[MDB-19](https://hasurahq.atlassian.net/browse/MDB-19)
  • Loading branch information
hallettj authored Apr 15, 2024
1 parent b86c233 commit 58b90ca
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 17 deletions.
45 changes: 32 additions & 13 deletions crates/mongodb-agent-common/src/query/execute_query_request.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use anyhow::anyhow;
use dc_api_types::{QueryRequest, QueryResponse};
use dc_api_types::{QueryRequest, QueryResponse, RowSet};
use futures_util::TryStreamExt;
use mongodb::bson::{self, doc, Document};
use itertools::Itertools as _;
use mongodb::bson::{self, Document};

use super::pipeline::{pipeline_for_query_request, ResponseShape};
use crate::{interface_types::MongoAgentError, mongodb::CollectionTrait};
use crate::{
interface_types::MongoAgentError, mongodb::CollectionTrait, query::foreach::foreach_variants,
};

pub async fn execute_query_request(
collection: &impl CollectionTrait<Document>,
Expand All @@ -25,18 +28,34 @@ pub async fn execute_query_request(
.try_collect::<Vec<_>>()
.await?;

let response_document: Document = match response_shape {
ResponseShape::RowStream => {
doc! { "rows": documents }
tracing::debug!(response_documents = %serde_json::to_string(&documents).unwrap(), "response from MongoDB");

let response = match (foreach_variants(&query_request), response_shape) {
(Some(_), _) => parse_single_document(documents)?,
(None, ResponseShape::ListOfRows) => QueryResponse::Single(RowSet::Rows {
rows: documents
.into_iter()
.map(bson::from_document)
.try_collect()?,
}),
(None, ResponseShape::SingleObject) => {
QueryResponse::Single(parse_single_document(documents)?)
}
ResponseShape::SingleObject => documents.into_iter().next().ok_or_else(|| {
MongoAgentError::AdHoc(anyhow!(
"Expected a response document from MongoDB, but did not get one"
))
})?,
};
tracing::debug!(response_document = %serde_json::to_string(&response_document).unwrap(), "response from MongoDB");
tracing::debug!(response = %serde_json::to_string(&response).unwrap(), "query response");

let response = bson::from_document(response_document)?;
Ok(response)
}

fn parse_single_document<T>(documents: Vec<Document>) -> Result<T, MongoAgentError>
where
T: for<'de> serde::Deserialize<'de>,
{
let document = documents.into_iter().next().ok_or_else(|| {
MongoAgentError::AdHoc(anyhow!(
"Expected a response document from MongoDB, but did not get one"
))
})?;
let value = bson::from_document(document)?;
Ok(value)
}
2 changes: 1 addition & 1 deletion crates/mongodb-agent-common/src/query/foreach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub fn pipeline_for_foreach(
let selection = Selection(doc! {
"rows": pipelines_with_response_shapes.iter().map(|(key, (_, response_shape))| doc! {
"query": match response_shape {
ResponseShape::RowStream => doc! { "rows": format!("${key}") }.into(),
ResponseShape::ListOfRows => doc! { "rows": format!("${key}") }.into(),
ResponseShape::SingleObject => Bson::String(format!("${key}")),
}
}).collect::<Vec<_>>()
Expand Down
26 changes: 25 additions & 1 deletion crates/mongodb-agent-common/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn handle_query_request(

#[cfg(test)]
mod tests {
use dc_api_types::{QueryRequest, QueryResponse};
use dc_api_types::{QueryRequest, QueryResponse, RowSet};
use mongodb::{
bson::{self, bson, doc, from_document, to_bson},
options::AggregateOptions,
Expand Down Expand Up @@ -312,4 +312,28 @@ mod tests {
assert_eq!(expected_response, result);
Ok(())
}

#[tokio::test]
async fn parses_empty_response() -> Result<(), anyhow::Error> {
let query_request: QueryRequest = from_value(json!({
"query": {
"fields": {
"date": { "type": "column", "column": "date", "column_type": "date", },
},
},
"target": { "type": "table", "name": [ "comments" ] },
"relationships": [],
}))?;

let expected_response = QueryResponse::Single(RowSet::Rows { rows: vec![] });

let mut collection = MockCollectionTrait::new();
collection
.expect_aggregate()
.returning(move |_pipeline, _: Option<AggregateOptions>| Ok(mock_stream(vec![])));

let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);
Ok(())
}
}
4 changes: 2 additions & 2 deletions crates/mongodb-agent-common/src/query/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::{
pub enum ResponseShape {
/// Indicates that the response will be a stream of records that must be wrapped in an object
/// with a `rows` field to produce a valid `QueryResponse` for HGE.
RowStream,
ListOfRows,

/// Indicates that the response has already been wrapped in a single object with `rows` and/or
/// `aggregates` fields.
Expand Down Expand Up @@ -103,7 +103,7 @@ pub fn pipeline_for_non_foreach(
(stages, ResponseShape::SingleObject)
} else {
let stages = pipeline_for_fields_facet(query_request)?;
(stages, ResponseShape::RowStream)
(stages, ResponseShape::ListOfRows)
};

pipeline.append(diverging_stages);
Expand Down

0 comments on commit 58b90ca

Please sign in to comment.