diff --git a/Cargo.lock b/Cargo.lock index 04ad9b9e..eb7731cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1773,7 +1773,7 @@ dependencies = [ [[package]] name = "ndc-sdk" version = "0.1.0" -source = "git+https://github.com/hasura/ndc-sdk-rs.git#972dba6e270ad54f4748487f75018c24229c1e5e" +source = "git+https://github.com/hasura/ndc-sdk-rs.git#a273a01efccfc71ef3341cf5f357b2c9ae2d109f" dependencies = [ "async-trait", "axum", diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index 71c92a54..43eaff9a 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -3,6 +3,7 @@ use dc_api_types::QueryRequest; use futures::Stream; use futures_util::TryStreamExt as _; use mongodb::bson; +use tracing::Instrument; use super::pipeline::pipeline_for_query_request; use crate::{ @@ -21,24 +22,39 @@ pub async fn execute_query_request( query_request: QueryRequest, ) -> Result, MongoAgentError> { let target = QueryTarget::for_request(config, &query_request); - let pipeline = pipeline_for_query_request(config, &query_request)?; + let pipeline = tracing::info_span!("Build Query Pipeline").in_scope(|| { + pipeline_for_query_request(config, &query_request) + })?; tracing::debug!( ?query_request, ?target, pipeline = %serde_json::to_string(&pipeline).unwrap(), "executing query" ); - // The target of a query request might be a collection, or it might be a native query. In the // latter case there is no collection to perform the aggregation against. So instead of sending // the MongoDB API call `db..aggregate` we instead call `db.aggregate`. - let documents = match target.input_collection() { - Some(collection_name) => { - let collection = database.collection(collection_name); - collect_from_cursor(collection.aggregate(pipeline, None).await?).await + let documents = async move { + match target.input_collection() { + Some(collection_name) => { + let collection = database.collection(collection_name); + collect_from_cursor( + collection.aggregate(pipeline, None) + .instrument(tracing::info_span!("Process Pipeline", internal.visibility = "user")) + .await? + ) + .await + } + None => collect_from_cursor( + database.aggregate(pipeline, None) + .instrument(tracing::info_span!("Process Pipeline", internal.visibility = "user")) + .await? + ) + .await, } - None => collect_from_cursor(database.aggregate(pipeline, None).await?).await, - }?; + } + .instrument(tracing::info_span!("Execute Query Pipeline", internal.visibility = "user")) + .await?; tracing::debug!(response_documents = %serde_json::to_string(&documents).unwrap(), "response from MongoDB"); Ok(documents) @@ -51,5 +67,6 @@ async fn collect_from_cursor( .into_stream() .map_err(MongoAgentError::MongoDB) .try_collect::>() + .instrument(tracing::info_span!("Collect Pipeline", internal.visibility = "user")) .await } diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index 892c8741..37be212e 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -18,7 +18,7 @@ use ndc_sdk::{ QueryResponse, SchemaResponse, }, }; -use tracing::instrument; +use tracing::{instrument, Instrument}; use crate::{ api_type_conversions::{v2_to_v3_explain_response, v3_to_v2_query_request}, @@ -141,18 +141,27 @@ impl Connector for MongoConnector { state: &Self::State, request: QueryRequest, ) -> Result, QueryError> { - tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request"); - let query_context = get_query_context(configuration); - let v2_request = v3_to_v2_query_request(&query_context, request.clone())?; - let response_documents = handle_query_request(configuration, state, v2_request) - .await - .map_err(mongo_agent_error_to_query_error)?; - let response = serialize_query_response(&query_context, &request, response_documents) - .map_err(|err| { - QueryError::UnprocessableContent(format!( - "error converting MongoDB response to JSON: {err}" - )) + let response = async move { + tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request"); + let query_context = get_query_context(configuration); + let v2_request = tracing::info_span!("Prepare Query Request").in_scope(|| { + v3_to_v2_query_request(&query_context, request.clone()) })?; + let response_documents = handle_query_request(configuration, state, v2_request) + .instrument(tracing::info_span!("Process Query Request", internal.visibility = "user")) + .await + .map_err(mongo_agent_error_to_query_error)?; + tracing::info_span!("Serialize Query Response", internal.visibility = "user").in_scope(|| { + serialize_query_response(&query_context, &request, response_documents) + .map_err(|err| { + QueryError::UnprocessableContent(format!( + "error converting MongoDB response to JSON: {err}" + )) + }) + }) + } + .instrument(tracing::info_span!("/query", internal.visibility = "user")) + .await?; Ok(response.into()) } }