diff --git a/Cargo.lock b/Cargo.lock
index 875e0dda..cabb1cad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1541,6 +1541,7 @@ dependencies = [
  "mongodb-support",
  "serde",
  "serde_json",
+ "these",
  "thiserror",
  "tokio",
 ]
@@ -1580,6 +1581,7 @@ dependencies = [
  "anyhow",
  "dc-api-types",
  "enum-iterator",
+ "indexmap 1.9.3",
  "schemars",
  "serde",
  "serde_json",
@@ -1771,7 +1773,7 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "75e56d5c441965b6425165b7e3223cc933ca469834f4a8b4786817a1f9dc4f13"
 dependencies = [
- "indexmap 1.9.3",
+ "indexmap 2.2.5",
  "serde",
  "serde_json",
 ]
@@ -2959,6 +2961,12 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
 
+[[package]]
+name = "these"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7162adbff4f8c44e938e0e51f6d3d829818c2ffefd793702a3a6f6ef0551de43"
+
 [[package]]
 name = "thiserror"
 version = "1.0.58"
diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml
index 092ba2db..caafce44 100644
--- a/crates/cli/Cargo.toml
+++ b/crates/cli/Cargo.toml
@@ -17,3 +17,4 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0.113", features = ["raw_value"] }
 thiserror = "1.0.57"
 tokio = { version = "1.36.0", features = ["full"] }
+these = "2.0.0"
diff --git a/crates/cli/src/introspection/mod.rs b/crates/cli/src/introspection/mod.rs
new file mode 100644
index 00000000..057303c2
--- /dev/null
+++ b/crates/cli/src/introspection/mod.rs
@@ -0,0 +1,6 @@
+pub mod sampling;
+pub mod validation_schema;
+pub mod type_unification;
+
+pub use validation_schema::get_metadata_from_validation_schema;
+pub use sampling::sample_schema_from_db;
\ No newline at end of file
diff --git a/crates/cli/src/introspection/sampling.rs b/crates/cli/src/introspection/sampling.rs
new file mode 100644
index 00000000..8e86fb77
--- /dev/null
+++ b/crates/cli/src/introspection/sampling.rs
@@ -0,0 +1,163 @@
+use super::type_unification::{
+    unify_object_types, unify_schema, unify_type, TypeUnificationContext, TypeUnificationResult,
+};
+use configuration::{
+    schema::{Collection, ObjectField, ObjectType, Type},
+    Schema,
+};
+use futures_util::TryStreamExt;
+use mongodb::bson::{doc, Bson, Document};
+use mongodb_agent_common::interface_types::MongoConfig;
+use mongodb_support::BsonScalarType::{self, *};
+
+/// Sample from all collections in the database and return a Schema.
+/// Return an error if there are any errors accessing the database
+/// or if the types derived from the sample documents for a collection
+/// are not unifiable.
+pub async fn sample_schema_from_db(
+    sample_size: u32,
+    config: &MongoConfig,
+) -> anyhow::Result<Schema> {
+    let mut schema = Schema {
+        collections: vec![],
+        object_types: vec![],
+    };
+    let db = config.client.database(&config.database);
+    let mut collections_cursor = db.list_collections(None, None).await?;
+
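+    // Each collection is sampled independently below; the per-collection schemas
+    // are then merged into a single schema for the database via `unify_schema`.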
+    while let Some(collection_spec) = collections_cursor.try_next().await? {
+        let collection_name = collection_spec.name;
+        let collection_schema =
+            sample_schema_from_collection(&collection_name, sample_size, config).await?;
+        schema = unify_schema(schema, collection_schema)?;
+    }
+    Ok(schema)
+}
+
+async fn sample_schema_from_collection(
+    collection_name: &str,
+    sample_size: u32,
+    config: &MongoConfig,
+) -> anyhow::Result<Schema> {
+    let db = config.client.database(&config.database);
+    let options = None;
+    let mut cursor = db
+        .collection::<Document>(collection_name)
+        .aggregate(vec![doc! {"$sample": { "size": sample_size }}], options)
+        .await?;
+    let mut collected_object_types = vec![];
+    while let Some(document) = cursor.try_next().await? {
+        let object_types = make_object_type(collection_name, &document)?;
+        collected_object_types = if collected_object_types.is_empty() {
+            object_types
+        } else {
+            unify_object_types(collected_object_types, object_types)?
+        };
+    }
+    let collection_info = Collection {
+        name: collection_name.to_string(),
+        description: None,
+        r#type: collection_name.to_string(),
+    };
+
+    Ok(Schema {
+        collections: vec![collection_info],
+        object_types: collected_object_types,
+    })
+}
+
+fn make_object_type(
+    object_type_name: &str,
+    document: &Document,
+) -> TypeUnificationResult<Vec<ObjectType>> {
+    let (mut object_type_defs, object_fields) = {
+        let type_prefix = format!("{object_type_name}_");
+        let (object_type_defs, object_fields): (Vec<Vec<ObjectType>>, Vec<ObjectField>) = document
+            .iter()
+            .map(|(field_name, field_value)| {
+                make_object_field(&type_prefix, field_name, field_value)
+            })
+            .collect::<TypeUnificationResult<Vec<(Vec<ObjectType>, ObjectField)>>>()?
+            .into_iter()
+            .unzip();
+        (object_type_defs.concat(), object_fields)
+    };
+
+    let object_type = ObjectType {
+        name: object_type_name.to_string(),
+        description: None,
+        fields: object_fields,
+    };
+
+    object_type_defs.push(object_type);
+    Ok(object_type_defs)
+}
+
+fn make_object_field(
+    type_prefix: &str,
+    field_name: &str,
+    field_value: &Bson,
+) -> TypeUnificationResult<(Vec<ObjectType>, ObjectField)> {
+    let object_type_name = format!("{type_prefix}{field_name}");
+    let (collected_otds, field_type) = make_field_type(&object_type_name, field_name, field_value)?;
+
+    let object_field = ObjectField {
+        name: field_name.to_owned(),
+        description: None,
+        r#type: field_type,
+    };
+
+    Ok((collected_otds, object_field))
+}
+
+fn make_field_type(
+    object_type_name: &str,
+    field_name: &str,
+    field_value: &Bson,
+) -> TypeUnificationResult<(Vec<ObjectType>, Type)> {
+    fn scalar(t: BsonScalarType) -> TypeUnificationResult<(Vec<ObjectType>, Type)> {
+        Ok((vec![], Type::Scalar(t)))
+    }
+    match field_value {
+        Bson::Double(_) => scalar(Double),
+        Bson::String(_) => scalar(String),
+        Bson::Array(arr) => {
+            // Examine all elements of the array and take the union of the resulting types.
+            let mut collected_otds = vec![];
+            let mut result_type = Type::Scalar(Undefined);
+            for elem in arr {
+                let (elem_collected_otds, elem_type) =
+                    make_field_type(object_type_name, field_name, elem)?;
+                collected_otds = if collected_otds.is_empty() {
+                    elem_collected_otds
+                } else {
+                    unify_object_types(collected_otds, elem_collected_otds)?
+                };
+                let context = TypeUnificationContext::new(object_type_name, field_name);
+                result_type = unify_type(context, result_type, elem_type)?;
+            }
+            Ok((collected_otds, Type::ArrayOf(Box::new(result_type))))
+        }
+        Bson::Document(document) => {
+            let collected_otds = make_object_type(object_type_name, document)?;
+            Ok((collected_otds, Type::Object(object_type_name.to_owned())))
+        }
+        Bson::Boolean(_) => scalar(Bool),
+        Bson::Null => scalar(Null),
+        Bson::RegularExpression(_) => scalar(Regex),
+        Bson::JavaScriptCode(_) => scalar(Javascript),
+        Bson::JavaScriptCodeWithScope(_) => scalar(JavascriptWithScope),
+        Bson::Int32(_) => scalar(Int),
+        Bson::Int64(_) => scalar(Long),
+        Bson::Timestamp(_) => scalar(Timestamp),
+        Bson::Binary(_) => scalar(BinData),
+        Bson::ObjectId(_) => scalar(ObjectId),
+        Bson::DateTime(_) => scalar(Date),
+        Bson::Symbol(_) => scalar(Symbol),
+        Bson::Decimal128(_) => scalar(Decimal),
+        Bson::Undefined => scalar(Undefined),
+        Bson::MaxKey => scalar(MaxKey),
+        Bson::MinKey => scalar(MinKey),
+        Bson::DbPointer(_) => scalar(DbPointer),
+    }
+}
diff --git a/crates/cli/src/introspection/type_unification.rs b/crates/cli/src/introspection/type_unification.rs
new file mode 100644
index 00000000..b435e54e
--- /dev/null
+++ b/crates/cli/src/introspection/type_unification.rs
@@ -0,0 +1,232 @@
+/// This module contains functions for unifying types.
+/// This is useful when deriving a schema from a set of sample documents.
+/// It allows the information in the schemas derived from several documents to be combined into one schema.
+///
+use configuration::{
+    schema::{ObjectField, ObjectType, Type},
+    Schema,
+};
+use indexmap::IndexMap;
+use mongodb_support::{
+    align::align_with_result,
+    BsonScalarType::{self, *},
+};
+use std::{
+    fmt::{self, Display},
+    string::String,
+};
+use thiserror::Error;
+
+#[derive(Debug)]
+pub struct TypeUnificationContext {
+    object_type_name: String,
+    field_name: String,
+}
+
+impl TypeUnificationContext {
+    pub fn new(object_type_name: &str, field_name: &str) -> Self {
+        TypeUnificationContext {
+            object_type_name: object_type_name.to_owned(),
+            field_name: field_name.to_owned(),
+        }
+    }
+}
+
+impl Display for TypeUnificationContext {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "object type: {}, field: {}",
+            self.object_type_name, self.field_name
+        )
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum TypeUnificationError {
+    ScalarType(TypeUnificationContext, BsonScalarType, BsonScalarType),
+    ObjectType(String, String),
+    TypeKind(Type, Type),
+}
+
+impl Display for TypeUnificationError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::ScalarType(context, scalar_a, scalar_b) => write!(
+                f,
+                "Scalar type mismatch {} {} at {}",
+                scalar_a.bson_name(),
+                scalar_b.bson_name(),
+                context
+            ),
+            Self::ObjectType(object_a, object_b) => {
+                write!(f, "Object type mismatch {} {}", object_a, object_b)
+            }
+            Self::TypeKind(type_a, type_b) => {
+                write!(f, "Type mismatch {:?} {:?}", type_a, type_b)
+            }
+        }
+    }
+}
+
+pub type TypeUnificationResult<T> = Result<T, TypeUnificationError>;
+
+/// Unify two types.
+/// Return an error if the types are not unifiable.
+pub fn unify_type(
+    context: TypeUnificationContext,
+    type_a: Type,
+    type_b: Type,
+) -> TypeUnificationResult<Type> {
+    match (type_a, type_b) {
+        // If one type is undefined, the union is the other type.
+        // This is used as the base case when inferring array types from documents.
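+        // For example, the element types inferred from the array [1, null] unify as
+        // Undefined + Int -> Int, then Int + Null -> Nullable(Int).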
+        (Type::Scalar(Undefined), type_b) => Ok(type_b),
+        (type_a, Type::Scalar(Undefined)) => Ok(type_a),
+
+        // Union of any type with Null is the Nullable version of that type.
+        (Type::Scalar(Null), type_b) => Ok(make_nullable(type_b)),
+        (type_a, Type::Scalar(Null)) => Ok(make_nullable(type_a)),
+
+        // Scalar types only unify if they are the same type.
+        (Type::Scalar(scalar_a), Type::Scalar(scalar_b)) => {
+            if scalar_a == scalar_b {
+                Ok(Type::Scalar(scalar_a))
+            } else {
+                Err(TypeUnificationError::ScalarType(
+                    context, scalar_a, scalar_b,
+                ))
+            }
+        }
+
+        // Object types only unify if they have the same name.
+        (Type::Object(object_a), Type::Object(object_b)) => {
+            if object_a == object_b {
+                Ok(Type::Object(object_a))
+            } else {
+                Err(TypeUnificationError::ObjectType(object_a, object_b))
+            }
+        }
+
+        // Array types unify iff their element types unify.
+        (Type::ArrayOf(elem_type_a), Type::ArrayOf(elem_type_b)) => {
+            let elem_type = unify_type(context, *elem_type_a, *elem_type_b)?;
+            Ok(Type::ArrayOf(Box::new(elem_type)))
+        }
+
+        // A Nullable type will unify with another type iff the underlying type is unifiable.
+        // The resulting type will be Nullable.
+        (Type::Nullable(nullable_type_a), type_b) => {
+            let result_type = unify_type(context, *nullable_type_a, type_b)?;
+            Ok(make_nullable(result_type))
+        }
+        (type_a, Type::Nullable(nullable_type_b)) => {
+            let result_type = unify_type(context, type_a, *nullable_type_b)?;
+            Ok(make_nullable(result_type))
+        }
+
+        // Anything else is a unification error.
+        (type_a, type_b) => Err(TypeUnificationError::TypeKind(type_a, type_b)),
+    }
+}
+
+fn make_nullable(t: Type) -> Type {
+    match t {
+        Type::Nullable(t) => Type::Nullable(t),
+        t => Type::Nullable(Box::new(t)),
+    }
+}
+
+fn make_nullable_field(field: ObjectField) -> Result<ObjectField, TypeUnificationError> {
+    Ok(ObjectField {
+        name: field.name,
+        r#type: make_nullable(field.r#type),
+        description: field.description,
+    })
+}
+
+/// Unify two `ObjectType`s.
+/// Any field that appears in only one of the `ObjectType`s will be made nullable.
+fn unify_object_type(
+    object_type_a: ObjectType,
+    object_type_b: ObjectType,
+) -> TypeUnificationResult<ObjectType> {
+    let field_map_a: IndexMap<String, ObjectField> = object_type_a
+        .fields
+        .into_iter()
+        .map(|o| (o.name.to_owned(), o))
+        .collect();
+    let field_map_b: IndexMap<String, ObjectField> = object_type_b
+        .fields
+        .into_iter()
+        .map(|o| (o.name.to_owned(), o))
+        .collect();
+
+    let merged_field_map = align_with_result(
+        field_map_a,
+        field_map_b,
+        make_nullable_field,
+        make_nullable_field,
+        |field_a, field_b| unify_object_field(&object_type_a.name, field_a, field_b),
+    )?;
+
+    Ok(ObjectType {
+        name: object_type_a.name,
+        fields: merged_field_map.into_values().collect(),
+        description: object_type_a.description.or(object_type_b.description),
+    })
+}
+
+/// Unify the types of two `ObjectField`s.
+/// If the types are not unifiable then return an error.
+fn unify_object_field(
+    object_type_name: &str,
+    object_field_a: ObjectField,
+    object_field_b: ObjectField,
+) -> TypeUnificationResult<ObjectField> {
+    let context = TypeUnificationContext::new(object_type_name, &object_field_a.name);
+    Ok(ObjectField {
+        name: object_field_a.name,
+        r#type: unify_type(context, object_field_a.r#type, object_field_b.r#type)?,
+        description: object_field_a.description.or(object_field_b.description),
+    })
+}
+
+/// Unify two sets of `ObjectType`s.
+/// Any `ObjectType` that appears in only one set will be unchanged in the output.
+/// Any type that appears in both sets will be unified using `unify_object_type`.
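+///
+/// Illustrative sketch (hypothetical inputs, not compiled as a doc test):
+///
+/// ```ignore
+/// // Suppose both sets contain a "movies" object type: one sampled document
+/// // produced { title: String } and another produced { title: String, year: Int }.
+/// // The unified "movies" type is { title: String, year: Nullable(Int) }, because
+/// // `year` appears on only one side and is therefore made nullable.
+/// let unified = unify_object_types(types_from_first_doc, types_from_second_doc)?;
+/// ```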
+pub fn unify_object_types(
+    object_types_a: Vec<ObjectType>,
+    object_types_b: Vec<ObjectType>,
+) -> TypeUnificationResult<Vec<ObjectType>> {
+    let type_map_a: IndexMap<String, ObjectType> = object_types_a
+        .into_iter()
+        .map(|t| (t.name.to_owned(), t))
+        .collect();
+    let type_map_b: IndexMap<String, ObjectType> = object_types_b
+        .into_iter()
+        .map(|t| (t.name.to_owned(), t))
+        .collect();
+
+    let merged_type_map = align_with_result(type_map_a, type_map_b, Ok, Ok, unify_object_type)?;
+
+    Ok(merged_type_map.into_values().collect())
+}
+
+/// Unify two schemas. Assumes that the schemas describe mutually exclusive sets of collections.
+pub fn unify_schema(schema_a: Schema, schema_b: Schema) -> TypeUnificationResult<Schema> {
+    let collections = schema_a
+        .collections
+        .into_iter()
+        .chain(schema_b.collections)
+        .collect();
+    let object_types = schema_a
+        .object_types
+        .into_iter()
+        .chain(schema_b.object_types)
+        .collect();
+    Ok(Schema {
+        collections,
+        object_types,
+    })
+}
diff --git a/crates/cli/src/introspection.rs b/crates/cli/src/introspection/validation_schema.rs
similarity index 100%
rename from crates/cli/src/introspection.rs
rename to crates/cli/src/introspection/validation_schema.rs
diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs
index b37c4ee2..e06babca 100644
--- a/crates/cli/src/lib.rs
+++ b/crates/cli/src/lib.rs
@@ -4,16 +4,22 @@ mod introspection;
 
 use std::path::PathBuf;
 
-use clap::Subcommand;
+use clap::{Parser, Subcommand};
 use configuration::Configuration;
 use mongodb_agent_common::interface_types::MongoConfig;
 
+#[derive(Debug, Clone, Parser)]
+pub struct UpdateArgs {
+    #[arg(long = "sample-size", value_name = "N")]
+    sample_size: Option<u32>,
+}
+
 /// The command invoked by the user.
 #[derive(Debug, Clone, Subcommand)]
 pub enum Command {
     /// Update the configuration by introspecting the database, using the configuration options.
-    Update,
+    Update(UpdateArgs),
 }
 
 pub struct Context {
@@ -24,14 +30,19 @@ pub struct Context {
 /// Run a command in a given directory.
 pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {
     match command {
-        Command::Update => update(context).await?,
+        Command::Update(args) => update(context, &args).await?,
     };
     Ok(())
 }
 
 /// Update the configuration in the current directory by introspecting the database.
-async fn update(context: &Context) -> anyhow::Result<()> {
-    let schema = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
+async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> {
+    let schema = match args.sample_size {
+        None => introspection::get_metadata_from_validation_schema(&context.mongo_config).await?,
+        Some(sample_size) => {
+            introspection::sample_schema_from_db(sample_size, &context.mongo_config).await?
+        }
+    };
     let configuration = Configuration::from_schema(schema);
     configuration::write_directory(&context.path, &configuration).await?;
diff --git a/crates/mongodb-support/Cargo.toml b/crates/mongodb-support/Cargo.toml
index b6893a8b..dbd6cb2e 100644
--- a/crates/mongodb-support/Cargo.toml
+++ b/crates/mongodb-support/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
 [dependencies]
 dc-api-types = { path = "../dc-api-types" }
 enum-iterator = "1.4.1"
+indexmap = { version = "1", features = ["serde"] } # must match the version that ndc-client uses
 schemars = "^0.8.12"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
diff --git a/crates/mongodb-support/src/align.rs b/crates/mongodb-support/src/align.rs
new file mode 100644
index 00000000..25553f0f
--- /dev/null
+++ b/crates/mongodb-support/src/align.rs
@@ -0,0 +1,24 @@
+use indexmap::IndexMap;
+use std::hash::Hash;
+
+pub fn align_with_result<K, T, U, V, E, FT, FU, FTU>(ts: IndexMap<K, T>, mut us: IndexMap<K, U>, ft: FT, fu: FU, ftu: FTU) -> Result<IndexMap<K, V>, E>
+where
+    K: Hash + Eq,
+    FT: Fn(T) -> Result<V, E>,
+    FU: Fn(U) -> Result<V, E>,
+    FTU: Fn(T, U) -> Result<V, E>,
+{
+    let mut result: IndexMap<K, V> = IndexMap::new();
+
+    for (k, t) in ts {
+        match us.swap_remove(&k) {
+            None => result.insert(k, ft(t)?),
+            Some(u) => result.insert(k, ftu(t, u)?),
+        };
+    }
+
+    for (k, u) in us {
+        result.insert(k, fu(u)?);
+    }
+    Ok(result)
+}
diff --git a/crates/mongodb-support/src/lib.rs b/crates/mongodb-support/src/lib.rs
index ed3f1734..a2c6fc08 100644
--- a/crates/mongodb-support/src/lib.rs
+++ b/crates/mongodb-support/src/lib.rs
@@ -1,4 +1,5 @@
 mod bson_type;
 pub mod error;
+pub mod align;
 
 pub use self::bson_type::{BsonScalarType, BsonType};