Skip to content

Commit

Permalink
add native queries, functions or virtual collections defined by pipel…
Browse files Browse the repository at this point in the history
…ines (#45)

Implements native queries. They are defined as aggregation pipelines. If the root target of a query request is a native query then the given pipeline will form the start of the overall query plan pipeline. In this case the query will be executed as a MongoDB `aggregate` command with no target collection - as opposed to our other queries which are an `aggregate` command that *does* have a collection target.

Native queries currently cannot be the target of a relation.

There is a really basic native query in fixtures to test with. If you run services with `arion up -d` you can see it as a query field called `hello`.

The changes were going to result in a large-ish amount of very similar, but technically incompatible code involving converting configuration to ndc types for schema responses, and for processing query requests. To avoid that I pushed configuration processing into the `configuration` crate. This makes it easier to share that logic, pushes a bunch of errors from connector runtime to configuration parsing time, and pushes computation to connector startup time instead of response-handling time. This resulted in a bunch of changes:

- The `MongoConfig` type is gone. Instead configuration-related data is kept in `Configuration` which is now passed directly to `mongodb-agent-common` functions. Database-connection data is put in a new type, `ConnectorState`. So now we have properly separated configuration and state types, which is what the ndc-sdk API expects.
- The `configuration` crate has a new dependency on `ndc-models`. We need to keep the `ndc-models` version matched with `ndc-sdk`. To make that easier I moved configuration for those dependencies to the workspace `Cargo.toml` and added a note.
  • Loading branch information
hallettj committed Apr 19, 2024
1 parent 93e0b09 commit 2067659
Show file tree
Hide file tree
Showing 60 changed files with 2,136 additions and 1,206 deletions.
43 changes: 10 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ members = [
]
resolver = "2"

# The tag or rev of ndc-models must match the locked tag or rev of the
# ndc-models dependency of ndc-sdk
[workspace.dependencies]
ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git" }
ndc-models = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.2" }

# We have a fork of the mongodb driver with a fix for reading metadata from time
# series collections.
# See the upstream PR: https://github.com/mongodb/mongo-rust-driver/pull/1003
Expand Down
12 changes: 6 additions & 6 deletions crates/cli/src/introspection/sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use configuration::{
};
use futures_util::TryStreamExt;
use mongodb::bson::{doc, Bson, Document};
use mongodb_agent_common::interface_types::MongoConfig;
use mongodb_agent_common::state::ConnectorState;
use mongodb_support::BsonScalarType::{self, *};

type ObjectField = WithName<schema::ObjectField>;
Expand All @@ -19,18 +19,18 @@ type ObjectType = WithName<schema::ObjectType>;
/// are not unifiable.
pub async fn sample_schema_from_db(
sample_size: u32,
config: &MongoConfig,
state: &ConnectorState,
existing_schemas: &HashSet<std::string::String>,
) -> anyhow::Result<BTreeMap<std::string::String, Schema>> {
let mut schemas = BTreeMap::new();
let db = config.client.database(&config.database);
let db = state.database();
let mut collections_cursor = db.list_collections(None, None).await?;

while let Some(collection_spec) = collections_cursor.try_next().await? {
let collection_name = collection_spec.name;
if !existing_schemas.contains(&collection_name) {
let collection_schema =
sample_schema_from_collection(&collection_name, sample_size, config).await?;
sample_schema_from_collection(&collection_name, sample_size, state).await?;
schemas.insert(collection_name, collection_schema);
}
}
Expand All @@ -40,9 +40,9 @@ pub async fn sample_schema_from_db(
async fn sample_schema_from_collection(
collection_name: &str,
sample_size: u32,
config: &MongoConfig,
state: &ConnectorState,
) -> anyhow::Result<Schema> {
let db = config.client.database(&config.database);
let db = state.database();
let options = None;
let mut cursor = db
.collection::<Document>(collection_name)
Expand Down
11 changes: 7 additions & 4 deletions crates/cli/src/introspection/validation_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ use configuration::{
};
use futures_util::TryStreamExt;
use mongodb::bson::from_bson;
use mongodb_agent_common::schema::{get_property_description, Property, ValidatorSchema};
use mongodb_agent_common::{
schema::{get_property_description, Property, ValidatorSchema},
state::ConnectorState,
};
use mongodb_support::BsonScalarType;

use mongodb_agent_common::interface_types::{MongoAgentError, MongoConfig};
use mongodb_agent_common::interface_types::MongoAgentError;

type Collection = WithName<schema::Collection>;
type ObjectType = WithName<schema::ObjectType>;
type ObjectField = WithName<schema::ObjectField>;

pub async fn get_metadata_from_validation_schema(
config: &MongoConfig,
state: &ConnectorState,
) -> Result<BTreeMap<String, Schema>, MongoAgentError> {
let db = config.client.database(&config.database);
let db = state.database();
let mut collections_cursor = db.list_collections(None, None).await?;

let mut schemas: Vec<WithName<Schema>> = vec![];
Expand Down
9 changes: 4 additions & 5 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use std::path::PathBuf;

use clap::{Parser, Subcommand};

use mongodb_agent_common::interface_types::MongoConfig;

// Exported for use in tests
pub use introspection::type_from_bson;
use mongodb_agent_common::state::ConnectorState;

#[derive(Debug, Clone, Parser)]
pub struct UpdateArgs {
Expand All @@ -29,7 +28,7 @@ pub enum Command {

pub struct Context {
pub path: PathBuf,
pub mongo_config: MongoConfig,
pub connector_state: ConnectorState,
}

/// Run a command in a given directory.
Expand All @@ -44,14 +43,14 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {
async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> {
if !args.no_validator_schema {
let schemas_from_json_validation =
introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
introspection::get_metadata_from_validation_schema(&context.connector_state).await?;
configuration::write_schema_directory(&context.path, schemas_from_json_validation).await?;
}

let existing_schemas = configuration::list_existing_schemas(&context.path).await?;
let schemas_from_sampling = introspection::sample_schema_from_db(
args.sample_size,
&context.mongo_config,
&context.connector_state,
&existing_schemas,
)
.await?;
Expand Down
7 changes: 5 additions & 2 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ pub async fn main() -> anyhow::Result<()> {
Some(path) => path,
None => env::current_dir()?,
};
let mongo_config = try_init_state_from_uri(&args.connection_uri, &Default::default())
let connector_state = try_init_state_from_uri(&args.connection_uri)
.await
.map_err(|e| anyhow!("Error initializing MongoDB state {}", e))?;
let context = Context { path, mongo_config };
let context = Context {
path,
connector_state,
};
run(args.subcommand, &context).await?;
Ok(())
}
1 change: 1 addition & 0 deletions crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ futures = "^0.3"
itertools = "^0.12"
mongodb = "2.8"
mongodb-support = { path = "../mongodb-support" }
ndc-models = { workspace = true }
schemars = "^0.8.12"
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
Expand Down
Loading

0 comments on commit 2067659

Please sign in to comment.