diff --git a/crates/configuration/src/native_query.rs b/crates/configuration/src/native_query.rs index ef6291e9..00e85169 100644 --- a/crates/configuration/src/native_query.rs +++ b/crates/configuration/src/native_query.rs @@ -15,6 +15,7 @@ use crate::{schema::ObjectField, serialized}; #[derive(Clone, Debug)] pub struct NativeQuery { pub representation: NativeQueryRepresentation, + pub input_collection: Option, pub arguments: BTreeMap, pub result_document_type: String, pub pipeline: Vec, @@ -25,6 +26,7 @@ impl From for NativeQuery { fn from(value: serialized::NativeQuery) -> Self { NativeQuery { representation: value.representation, + input_collection: value.input_collection, arguments: value.arguments, result_document_type: value.result_document_type, pipeline: value.pipeline, diff --git a/crates/configuration/src/serialized/native_query.rs b/crates/configuration/src/serialized/native_query.rs index 623fa4fe..2147f030 100644 --- a/crates/configuration/src/serialized/native_query.rs +++ b/crates/configuration/src/serialized/native_query.rs @@ -33,6 +33,10 @@ pub struct NativeQuery { /// a "function" in your ddn configuration. pub representation: NativeQueryRepresentation, + /// Use `input_collection` when you want to start an aggregation pipeline off of the specified + /// `input_collection` db..aggregate. + pub input_collection: Option, + /// Arguments to be supplied for each query invocation. These will be available to the given /// pipeline as variables. For information about variables in MongoDB aggregation expressions /// see https://www.mongodb.com/docs/manual/reference/aggregation-variables/ diff --git a/crates/mongodb-agent-common/src/explain.rs b/crates/mongodb-agent-common/src/explain.rs index 40d5185d..259629c3 100644 --- a/crates/mongodb-agent-common/src/explain.rs +++ b/crates/mongodb-agent-common/src/explain.rs @@ -22,9 +22,14 @@ pub async fn explain_query( let aggregate_target = match QueryTarget::for_request(config, &query_request) { QueryTarget::Collection(collection_name) => Bson::String(collection_name), - // 1 means aggregation without a collection target - as in `db.aggregate()` instead of - // `db..aggregate()` - QueryTarget::NativeQuery { .. } => Bson::Int32(1), + QueryTarget::NativeQuery { native_query, .. } => { + match &native_query.input_collection { + Some(collection_name) => Bson::String(collection_name.to_string()), + // 1 means aggregation without a collection target - as in `db.aggregate()` instead of + // `db..aggregate()` + None => Bson::Int32(1) + } + } }; let query_command = doc! { diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index d56bb03c..b49cb58d 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -39,8 +39,14 @@ pub async fn execute_query_request( let collection = database.collection(&collection_name); collect_from_cursor(collection.aggregate(pipeline, None).await?).await } - QueryTarget::NativeQuery { .. } => { - collect_from_cursor(database.aggregate(pipeline, None).await?).await + QueryTarget::NativeQuery { native_query, .. } => { + match &native_query.input_collection { + Some(collection_name) => { + let collection = database.collection(collection_name); + collect_from_cursor(collection.aggregate(pipeline, None).await?).await + }, + None => collect_from_cursor(database.aggregate(pipeline, None).await?).await + } } }?; diff --git a/crates/mongodb-agent-common/src/query/native_query.rs b/crates/mongodb-agent-common/src/query/native_query.rs index ca2cc84d..d2b4b1c8 100644 --- a/crates/mongodb-agent-common/src/query/native_query.rs +++ b/crates/mongodb-agent-common/src/query/native_query.rs @@ -101,6 +101,7 @@ mod tests { async fn executes_native_query() -> Result<(), anyhow::Error> { let native_query = NativeQuery { representation: NativeQueryRepresentation::Collection, + input_collection: None, arguments: [ ( "filter".to_string(),