Skip to content

Commit

Permalink
Add tracing spans and update SDK (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
codedmart committed Apr 27, 2024
1 parent c654ec2 commit ea3cba7
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 25 additions & 8 deletions crates/mongodb-agent-common/src/query/execute_query_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use dc_api_types::QueryRequest;
use futures::Stream;
use futures_util::TryStreamExt as _;
use mongodb::bson;
use tracing::Instrument;

use super::pipeline::pipeline_for_query_request;
use crate::{
Expand All @@ -21,24 +22,39 @@ pub async fn execute_query_request(
query_request: QueryRequest,
) -> Result<Vec<bson::Document>, MongoAgentError> {
let target = QueryTarget::for_request(config, &query_request);
let pipeline = pipeline_for_query_request(config, &query_request)?;
let pipeline = tracing::info_span!("Build Query Pipeline").in_scope(|| {
pipeline_for_query_request(config, &query_request)
})?;
tracing::debug!(
?query_request,
?target,
pipeline = %serde_json::to_string(&pipeline).unwrap(),
"executing query"
);

// The target of a query request might be a collection, or it might be a native query. In the
// latter case there is no collection to perform the aggregation against. So instead of sending
// the MongoDB API call `db.<collection>.aggregate` we instead call `db.aggregate`.
let documents = match target.input_collection() {
Some(collection_name) => {
let collection = database.collection(collection_name);
collect_from_cursor(collection.aggregate(pipeline, None).await?).await
let documents = async move {
match target.input_collection() {
Some(collection_name) => {
let collection = database.collection(collection_name);
collect_from_cursor(
collection.aggregate(pipeline, None)
.instrument(tracing::info_span!("Process Pipeline", internal.visibility = "user"))
.await?
)
.await
}
None => collect_from_cursor(
database.aggregate(pipeline, None)
.instrument(tracing::info_span!("Process Pipeline", internal.visibility = "user"))
.await?
)
.await,
}
None => collect_from_cursor(database.aggregate(pipeline, None).await?).await,
}?;
}
.instrument(tracing::info_span!("Execute Query Pipeline", internal.visibility = "user"))
.await?;
tracing::debug!(response_documents = %serde_json::to_string(&documents).unwrap(), "response from MongoDB");

Ok(documents)
Expand All @@ -51,5 +67,6 @@ async fn collect_from_cursor(
.into_stream()
.map_err(MongoAgentError::MongoDB)
.try_collect::<Vec<_>>()
.instrument(tracing::info_span!("Collect Pipeline", internal.visibility = "user"))
.await
}
33 changes: 21 additions & 12 deletions crates/mongodb-connector/src/mongo_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use ndc_sdk::{
QueryResponse, SchemaResponse,
},
};
use tracing::instrument;
use tracing::{instrument, Instrument};

use crate::{
api_type_conversions::{v2_to_v3_explain_response, v3_to_v2_query_request},
Expand Down Expand Up @@ -141,18 +141,27 @@ impl Connector for MongoConnector {
state: &Self::State,
request: QueryRequest,
) -> Result<JsonResponse<QueryResponse>, QueryError> {
tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request");
let query_context = get_query_context(configuration);
let v2_request = v3_to_v2_query_request(&query_context, request.clone())?;
let response_documents = handle_query_request(configuration, state, v2_request)
.await
.map_err(mongo_agent_error_to_query_error)?;
let response = serialize_query_response(&query_context, &request, response_documents)
.map_err(|err| {
QueryError::UnprocessableContent(format!(
"error converting MongoDB response to JSON: {err}"
))
let response = async move {
tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request");
let query_context = get_query_context(configuration);
let v2_request = tracing::info_span!("Prepare Query Request").in_scope(|| {
v3_to_v2_query_request(&query_context, request.clone())
})?;
let response_documents = handle_query_request(configuration, state, v2_request)
.instrument(tracing::info_span!("Process Query Request", internal.visibility = "user"))
.await
.map_err(mongo_agent_error_to_query_error)?;
tracing::info_span!("Serialize Query Response", internal.visibility = "user").in_scope(|| {
serialize_query_response(&query_context, &request, response_documents)
.map_err(|err| {
QueryError::UnprocessableContent(format!(
"error converting MongoDB response to JSON: {err}"
))
})
})
}
.instrument(tracing::info_span!("/query", internal.visibility = "user"))
.await?;
Ok(response.into())
}
}

0 comments on commit ea3cba7

Please sign in to comment.