Skip to content

Commit

Permalink
Sample documents from the database
Browse files Browse the repository at this point in the history
  • Loading branch information
dmoverton committed Mar 19, 2024
1 parent 698707a commit 9582635
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 47 deletions.
187 changes: 146 additions & 41 deletions crates/cli/src/introspection/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,81 @@ use configuration::{
schema::{Collection, ObjectField, ObjectType, Type},
Schema,
};
use futures_util::TryStreamExt;
use indexmap::IndexMap;
use mongodb::bson::{Bson, Document};
use mongodb_agent_common::interface_types::{MongoAgentError, MongoConfig};
use mongodb::bson::{doc, Bson, Document};
use mongodb_agent_common::interface_types::MongoConfig;
use mongodb_support::{
align::align_with_result,
BsonScalarType::{self, *},
BsonType,
};
use std::string::String;
use std::{
fmt::{self, Display},
string::String,
};
use thiserror::Error;

pub fn schema_from_document(
collection_name: &str,
document: &Document,
) -> Result<Schema, TypeUnificationError> {
let (object_types, collection) = make_collection(collection_name, document)?;
Ok(Schema {
collections: vec![collection],
object_types,
})
/// Builds a single [`Schema`] for the configured database by sampling every
/// collection it lists.
///
/// Each collection returned by `list_collections` is sampled with
/// `sample_schema_from_collection` (passing `sample_size` through unchanged),
/// and the per-collection schemas are merged one at a time with `unify_schema`.
///
/// # Errors
///
/// Returns the first error raised while listing collections, sampling a
/// collection, or unifying the accumulated schema with a collection's schema.
pub async fn sample_schema_from_db(
    sample_size: u32,
    config: &MongoConfig,
) -> anyhow::Result<Schema> {
    let database = config.client.database(&config.database);
    let mut specs = database.list_collections(None, None).await?;

    // Start from an empty schema and fold each collection's sampled schema into it.
    let mut merged = Schema {
        collections: Vec::new(),
        object_types: Vec::new(),
    };
    while let Some(spec) = specs.try_next().await? {
        let sampled = sample_schema_from_collection(&spec.name, sample_size, config).await?;
        merged = unify_schema(merged, sampled)?;
    }

    Ok(merged)
}

fn make_collection(
pub async fn sample_schema_from_collection(
collection_name: &str,
document: &Document,
) -> Result<(Vec<ObjectType>, Collection), TypeUnificationError> {
let object_type_defs = make_object_type(collection_name, document)?;
sample_size: u32,
config: &MongoConfig,
) -> anyhow::Result<Schema> {
let db = config.client.database(&config.database);
let options = None;
let mut cursor = db
.collection::<Document>(collection_name)
.aggregate(vec![doc! {"$sample": { "size": sample_size }}], options)
.await?;
let mut collected_object_types = vec![];
while let Some(document) = cursor.try_next().await? {
let object_types = make_object_type(collection_name, &document)?;
collected_object_types = unify_object_types(collected_object_types, object_types)?;
}
let collection_info = Collection {
name: collection_name.to_string(),
description: None,
r#type: collection_name.to_string(),
};

Ok((object_type_defs, collection_info))
Ok(Schema {
collections: vec![collection_info],
object_types: collected_object_types,
})
}

fn make_object_type(
object_type_name: &str,
document: &Document,
) -> Result<Vec<ObjectType>, TypeUnificationError> {
) -> TypeUnificationResult<Vec<ObjectType>> {
let (mut object_type_defs, object_fields) = {
let type_prefix = format!("{object_type_name}_");
let (object_type_defs, object_fields): (Vec<Vec<ObjectType>>, Vec<ObjectField>) = document
.iter()
.map(|(field_name, field_value)| {
make_object_fields(&type_prefix, field_name, field_value)
make_object_field(&type_prefix, field_name, field_value)
})
.collect::<Result<Vec<(Vec<ObjectType>, ObjectField)>, TypeUnificationError>>()?
.collect::<TypeUnificationResult<Vec<(Vec<ObjectType>, ObjectField)>>>()?
.into_iter()
.unzip();
(object_type_defs.concat(), object_fields)
Expand All @@ -64,28 +92,29 @@ fn make_object_type(
Ok(object_type_defs)
}

fn make_object_fields(
fn make_object_field(
type_prefix: &str,
field_name: &str,
field_value: &Bson,
) -> Result<(Vec<ObjectType>, ObjectField), TypeUnificationError> {
) -> TypeUnificationResult<(Vec<ObjectType>, ObjectField)> {
let object_type_name = format!("{type_prefix}{field_name}");
let (collected_otds, field_type) = make_field_type(&object_type_name, field_value)?;
let (collected_otds, field_type) = make_field_type(&object_type_name, field_name, field_value)?;

let object_field = ObjectField {
name: field_name.to_owned(),
description: None,
r#type: Type::Nullable(Box::new(field_type)),
r#type: field_type,
};

Ok((collected_otds, object_field))
}

fn make_field_type(
object_type_name: &str,
field_name: &str,
field_value: &Bson,
) -> Result<(Vec<ObjectType>, Type), TypeUnificationError> {
fn scalar(t: BsonScalarType) -> Result<(Vec<ObjectType>, Type), TypeUnificationError> {
) -> TypeUnificationResult<(Vec<ObjectType>, Type)> {
fn scalar(t: BsonScalarType) -> TypeUnificationResult<(Vec<ObjectType>, Type)> {
Ok((vec![], Type::Scalar(t)))
}
match field_value {
Expand All @@ -96,9 +125,11 @@ fn make_field_type(
let mut collected_otds = vec![];
let mut result_type = Type::Scalar(Undefined);
for elem in arr {
let (elem_collected_otds, elem_type) = make_field_type(object_type_name, elem)?;
collected_otds = unify_object_types(collected_otds, elem_collected_otds)?;
result_type = unify_type(result_type, elem_type)?;
let (elem_collected_otds, elem_type) =
make_field_type(object_type_name, field_name, elem)?;
collected_otds = unify_object_types(collected_otds, elem_collected_otds)?;
let context = TypeUnificationContext::new(object_type_name, field_name);
result_type = unify_type(context, result_type, elem_type)?;
}
Ok((collected_otds, Type::ArrayOf(Box::new(result_type))))
}
Expand Down Expand Up @@ -126,13 +157,65 @@ fn make_field_type(
}
}

/// Identifies where a type unification is taking place: an object type and
/// one of its fields. Used to give unification error messages a location.
#[derive(Debug)]
pub struct TypeUnificationContext {
    /// Name of the object type being unified.
    object_type_name: String,
    /// Name of the field being unified.
    field_name: String,
}

impl TypeUnificationContext {
    /// Creates a context from borrowed names, storing an owned copy of each.
    fn new(object_type_name: &str, field_name: &str) -> Self {
        Self {
            object_type_name: String::from(object_type_name),
            field_name: String::from(field_name),
        }
    }
}

impl Display for TypeUnificationContext {
    /// Renders the context as `object type: <object type name>, field: <field name>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            object_type_name,
            field_name,
        } = self;
        write!(f, "object type: {}, field: {}", object_type_name, field_name)
    }
}

#[derive(Debug, Error)]
pub enum TypeUnificationError {
ScalarTypeMismatch(BsonScalarType, BsonScalarType),
ScalarTypeMismatch(TypeUnificationContext, BsonScalarType, BsonScalarType),
ObjectTypeMismatch(String, String),
TypeKindMismatch(Type, Type),
}

fn unify_type(type_a: Type, type_b: Type) -> Result<Type, TypeUnificationError> {
/// Human-readable messages for each kind of unification failure.
///
/// Written by hand because the enum declares no per-variant message
/// attributes, so `Display` must be supplied explicitly.
impl Display for TypeUnificationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Two different scalar types met at the location described by `context`.
            Self::ScalarTypeMismatch(context, scalar_a, scalar_b) => write!(
                f,
                "Scalar type mismatch {} {} at {}",
                scalar_a.bson_name(),
                scalar_b.bson_name(),
                context
            ),
            // Two object types, referred to by name, could not be unified.
            Self::ObjectTypeMismatch(object_a, object_b) => {
                write!(f, "Object type mismatch {} {}", object_a, object_b)
            }
            // The two types are of different kinds altogether. This needs its own
            // wording: it previously reused "Object type mismatch", which made it
            // indistinguishable from `ObjectTypeMismatch` in error output.
            Self::TypeKindMismatch(type_a, type_b) => {
                write!(f, "Type kind mismatch {:?} {:?}", type_a, type_b)
            }
        }
    }
}

type TypeUnificationResult<T> = Result<T, TypeUnificationError>;

fn unify_type(
context: TypeUnificationContext,
type_a: Type,
type_b: Type,
) -> TypeUnificationResult<Type> {
match (type_a, type_b) {
// If one type is undefined, the union is the other type.
// This is used as the base case when inferring array types from documents.
Expand All @@ -147,7 +230,9 @@ fn unify_type(type_a: Type, type_b: Type) -> Result<Type, TypeUnificationError>
if scalar_a == scalar_b {
Ok(Type::Scalar(scalar_a))
} else {
Err(TypeUnificationError::ScalarTypeMismatch(scalar_a, scalar_b))
Err(TypeUnificationError::ScalarTypeMismatch(
context, scalar_a, scalar_b,
))
}
}
(Type::Object(object_a), Type::Object(object_b)) => {
Expand All @@ -158,15 +243,15 @@ fn unify_type(type_a: Type, type_b: Type) -> Result<Type, TypeUnificationError>
}
}
(Type::ArrayOf(elem_type_a), Type::ArrayOf(elem_type_b)) => {
let elem_type = unify_type(*elem_type_a, *elem_type_b)?;
let elem_type = unify_type(context, *elem_type_a, *elem_type_b)?;
Ok(Type::ArrayOf(Box::new(elem_type)))
}
(Type::Nullable(nullable_type_a), type_b) => {
let result_type = unify_type(*nullable_type_a, type_b)?;
let result_type = unify_type(context, *nullable_type_a, type_b)?;
Ok(make_nullable(result_type))
}
(type_a, Type::Nullable(nullable_type_b)) => {
let result_type = unify_type(type_a, *nullable_type_b)?;
let result_type = unify_type(context, type_a, *nullable_type_b)?;
Ok(make_nullable(result_type))
}
(type_a, type_b) => Err(TypeUnificationError::TypeKindMismatch(type_a, type_b)),
Expand All @@ -191,7 +276,7 @@ fn make_nullable_field<E>(field: ObjectField) -> Result<ObjectField, E> {
fn unify_object_type(
object_type_a: ObjectType,
object_type_b: ObjectType,
) -> Result<ObjectType, TypeUnificationError> {
) -> TypeUnificationResult<ObjectType> {
let field_map_a: IndexMap<String, ObjectField> = object_type_a
.fields
.into_iter()
Expand All @@ -208,7 +293,7 @@ fn unify_object_type(
field_map_b,
make_nullable_field,
make_nullable_field,
unify_object_field,
|field_a, field_b| unify_object_field(&object_type_a.name, field_a, field_b),
)?;

Ok(ObjectType {
Expand All @@ -219,20 +304,22 @@ fn unify_object_type(
}

fn unify_object_field(
object_type_name: &str,
object_field_a: ObjectField,
object_field_b: ObjectField,
) -> Result<ObjectField, TypeUnificationError> {
) -> TypeUnificationResult<ObjectField> {
let context = TypeUnificationContext::new(object_type_name, &object_field_a.name);
Ok(ObjectField {
name: object_field_a.name,
r#type: unify_type(object_field_a.r#type, object_field_b.r#type)?,
r#type: unify_type(context, object_field_a.r#type, object_field_b.r#type)?,
description: object_field_a.description.or(object_field_b.description),
})
}

fn unify_object_types(
object_types_a: Vec<ObjectType>,
object_types_b: Vec<ObjectType>,
) -> Result<Vec<ObjectType>, TypeUnificationError> {
) -> TypeUnificationResult<Vec<ObjectType>> {
let type_map_a: IndexMap<String, ObjectType> = object_types_a
.into_iter()
.map(|t| (t.name.to_owned(), t))
Expand All @@ -246,3 +333,21 @@ fn unify_object_types(

Ok(merged_type_map.into_values().collect())
}

/// Unifies two schemas by concatenating their collections and their object
/// types, with the entries of `schema_a` first.
///
/// Assumes that the schemas describe mutually exclusive sets of collections:
/// nothing is deduplicated or cross-checked here, so this function currently
/// always returns `Ok`.
fn unify_schema(schema_a: Schema, schema_b: Schema) -> TypeUnificationResult<Schema> {
    let Schema {
        mut collections,
        mut object_types,
    } = schema_a;

    // Append in place; the resulting order matches chaining a's items before b's.
    collections.extend(schema_b.collections);
    object_types.extend(schema_b.object_types);

    Ok(Schema {
        collections,
        object_types,
    })
}
2 changes: 1 addition & 1 deletion crates/cli/src/introspection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ pub mod document;
pub mod validation_schema;

pub use validation_schema::get_metadata_from_validation_schema;
pub use document::schema_from_document;
pub use document::sample_schema_from_db;
21 changes: 16 additions & 5 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ mod introspection;

use std::path::PathBuf;

use clap::Subcommand;
use clap::{Parser, Subcommand};

use configuration::Configuration;
use mongodb_agent_common::interface_types::MongoConfig;

// Arguments for the `Update` command.
// NOTE(review): kept as a plain `//` comment so it cannot influence any
// command-level description that clap derives from doc comments.
#[derive(Debug, Clone, Parser)]
pub struct UpdateArgs {
    /// Number of documents to sample from each collection when inferring the schema.
    /// If omitted, the schema is derived from validation schemas instead of sampling.
    #[arg(long = "sample-size", value_name = "N")]
    sample_size: Option<u32>,
}

/// The command invoked by the user.
#[derive(Debug, Clone, Subcommand)]
pub enum Command {
/// Update the configuration by introspecting the database, using the configuration options.
Update,
Update(UpdateArgs),
}

pub struct Context {
Expand All @@ -24,14 +30,19 @@ pub struct Context {
/// Run a command in a given directory.
pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {
match command {
Command::Update => update(context).await?,
Command::Update(args) => update(context, &args).await?,
};
Ok(())
}

/// Update the configuration in the current directory by introspecting the database.
async fn update(context: &Context) -> anyhow::Result<()> {
let schema = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> {
let schema = match args.sample_size {
None => introspection::get_metadata_from_validation_schema(&context.mongo_config).await?,
Some(sample_size) => {
introspection::sample_schema_from_db(sample_size, &context.mongo_config).await?
}
};
let configuration = Configuration::from_schema(schema);

configuration::write_directory(&context.path, &configuration).await?;
Expand Down

0 comments on commit 9582635

Please sign in to comment.