Separate schema files for each collection (#14)
* write schema dir

* Read and write schemas from schema/ subdirectory

* Don't sample from collections that already have a schema

* Add changelog

* Remove commented out code
dmoverton authored Mar 27, 2024
1 parent ebcf9d0 commit 4c9af83
Showing 8 changed files with 116 additions and 94 deletions.
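
For orientation, a minimal sketch (not part of the commit) of the layout this change introduces and how it is read back; the collection names below are hypothetical:

use std::path::Path;

use configuration::{read_directory, Configuration};

// Assumed on-disk layout after this change:
//
//   <configuration dir>/
//     schema/
//       movies.json   // schema for a hypothetical `movies` collection
//       users.json    // schema for a hypothetical `users` collection
//     native_queries/
//       ...
//
// On the read side the per-collection schemas are folded back into one via
// `Schema::merge` (see `read_directory` below), so consumers of `Configuration`
// still see a single merged schema.
async fn load(configuration_dir: &Path) -> anyhow::Result<Configuration> {
    read_directory(configuration_dir).await
}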
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -2,7 +2,8 @@
This changelog documents the changes between release versions.

## [Unreleased]
Changes to be included in the next upcoming release
- Use separate schema files for each collection
- Don't sample from collections that already have a schema

## [0.0.2] - 2024-03-26
- Rename CLI plugin to ndc-mongodb ([PR #13](https://github.com/hasura/ndc-mongodb/pull/13))
22 changes: 11 additions & 11 deletions crates/cli/src/introspection/sampling.rs
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};

use super::type_unification::{
unify_object_types, unify_schema, unify_type, TypeUnificationContext, TypeUnificationResult,
unify_object_types, unify_type, TypeUnificationContext, TypeUnificationResult,
};
use configuration::{
schema::{self, Type},
@@ -22,21 +22,21 @@ type ObjectType = WithName<schema::ObjectType>;
pub async fn sample_schema_from_db(
sample_size: u32,
config: &MongoConfig,
) -> anyhow::Result<Schema> {
let mut schema = Schema {
collections: BTreeMap::new(),
object_types: BTreeMap::new(),
};
existing_schemas: &HashSet<std::string::String>,
) -> anyhow::Result<BTreeMap<std::string::String, Schema>> {
let mut schemas = BTreeMap::new();
let db = config.client.database(&config.database);
let mut collections_cursor = db.list_collections(None, None).await?;

while let Some(collection_spec) = collections_cursor.try_next().await? {
let collection_name = collection_spec.name;
let collection_schema =
sample_schema_from_collection(&collection_name, sample_size, config).await?;
schema = unify_schema(schema, collection_schema);
if !existing_schemas.contains(&collection_name) {
let collection_schema =
sample_schema_from_collection(&collection_name, sample_size, config).await?;
schemas.insert(collection_name, collection_schema);
}
}
Ok(schema)
Ok(schemas)
}

async fn sample_schema_from_collection(
20 changes: 1 addition & 19 deletions crates/cli/src/introspection/type_unification.rs
@@ -4,7 +4,7 @@
///
use configuration::{
schema::{self, Type},
Schema, WithName,
WithName,
};
use indexmap::IndexMap;
use itertools::Itertools as _;
@@ -255,24 +255,6 @@ pub fn unify_object_types(
Ok(merged_type_map.into_values().collect())
}

/// Unify two schemas. Assumes that the schemas describe mutually exclusive sets of collections.
pub fn unify_schema(schema_a: Schema, schema_b: Schema) -> Schema {
let collections = schema_a
.collections
.into_iter()
.chain(schema_b.collections)
.collect();
let object_types = schema_a
.object_types
.into_iter()
.chain(schema_b.object_types)
.collect();
Schema {
collections,
object_types,
}
}

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
42 changes: 29 additions & 13 deletions crates/cli/src/introspection/validation_schema.rs
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use configuration::{
schema::{self, Type},
Schema, WithName,
@@ -16,14 +18,14 @@ type ObjectField = WithName<schema::ObjectField>;

pub async fn get_metadata_from_validation_schema(
config: &MongoConfig,
) -> Result<Schema, MongoAgentError> {
) -> Result<BTreeMap<String, Schema>, MongoAgentError> {
let db = config.client.database(&config.database);
let collections_cursor = db.list_collections(None, None).await?;

let (object_types, collections) = collections_cursor
let schemas: Vec<WithName<Schema>> = collections_cursor
.into_stream()
.map(
|collection_spec| -> Result<(Vec<ObjectType>, Collection), MongoAgentError> {
|collection_spec| -> Result<WithName<Schema>, MongoAgentError> {
let collection_spec_value = collection_spec?;
let name = &collection_spec_value.name;
let schema_bson_option = collection_spec_value
@@ -49,16 +51,27 @@ pub async fn get_metadata_from_validation_schema
properties: IndexMap::new(),
}),
}
.map(|validator_schema| make_collection(name, &validator_schema))
.map(|validator_schema| make_collection_schema(name, &validator_schema))
},
)
.try_collect::<(Vec<Vec<ObjectType>>, Vec<Collection>)>()
.try_collect::<Vec<WithName<Schema>>>()
.await?;

Ok(Schema {
collections: WithName::into_map(collections),
object_types: WithName::into_map(object_types.concat()),
})
Ok(WithName::into_map(schemas))
}

fn make_collection_schema(
collection_name: &str,
validator_schema: &ValidatorSchema,
) -> WithName<Schema> {
let (object_types, collection) = make_collection(collection_name, validator_schema);
WithName::named(
collection.name.clone(),
Schema {
collections: WithName::into_map(vec![collection]),
object_types: WithName::into_map(object_types),
},
)
}

fn make_collection(
@@ -100,10 +113,13 @@ fn make_collection(

object_type_defs.push(collection_type);

let collection_info = WithName::named(collection_name, schema::Collection {
description: validator_schema.description.clone(),
r#type: collection_name.to_string(),
});
let collection_info = WithName::named(
collection_name,
schema::Collection {
description: validator_schema.description.clone(),
r#type: collection_name.to_string(),
},
);

(object_type_defs, collection_info)
}
15 changes: 9 additions & 6 deletions crates/cli/src/lib.rs
@@ -6,7 +6,6 @@ use std::path::PathBuf;

use clap::{Parser, Subcommand};

use configuration::Configuration;
use mongodb_agent_common::interface_types::MongoConfig;

#[derive(Debug, Clone, Parser)]
@@ -37,15 +36,19 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {

/// Update the configuration in the current directory by introspecting the database.
async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> {
let schema = match args.sample_size {
let schemas = match args.sample_size {
None => introspection::get_metadata_from_validation_schema(&context.mongo_config).await?,
Some(sample_size) => {
introspection::sample_schema_from_db(sample_size, &context.mongo_config).await?
let existing_schemas = configuration::list_existing_schemas(&context.path).await?;
introspection::sample_schema_from_db(
sample_size,
&context.mongo_config,
&existing_schemas,
)
.await?
}
};
let configuration = Configuration::from_schema(schema)?;

configuration::write_directory(&context.path, &configuration).await?;
configuration::write_schema_directory(&context.path, schemas).await?;

Ok(())
}
86 changes: 43 additions & 43 deletions crates/configuration/src/directory.rs
@@ -3,15 +3,15 @@ use futures::stream::TryStreamExt as _;
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashSet},
path::{Path, PathBuf},
};
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;

use crate::{with_name::WithName, Configuration};
use crate::{with_name::WithName, Configuration, Schema};

pub const SCHEMA_FILENAME: &str = "schema";
pub const SCHEMA_DIRNAME: &str = "schema";
pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries";

pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] =
@@ -33,7 +33,10 @@
) -> anyhow::Result<Configuration> {
let dir = configuration_dir.as_ref();

let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?;
let schemas = read_subdir_configs(&dir.join(SCHEMA_DIRNAME))
.await?
.unwrap_or_default();
let schema = schemas.into_values().fold(Schema::default(), Schema::merge);

let native_queries = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME))
.await?
@@ -100,41 +103,6 @@ where
}
}

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc; reads the file; and parses it according to its extension.
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
{
let (path, format) = find_file(configuration_dir, basename).await?;
parse_config_file(path, format).await
}

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc, and returns the found path with its file format.
async fn find_file(
configuration_dir: &Path,
basename: &str,
) -> anyhow::Result<(PathBuf, FileFormat)> {
for (extension, format) in CONFIGURATION_EXTENSIONS {
let path = configuration_dir.join(format!("{basename}.{extension}"));
if fs::try_exists(&path).await? {
return Ok((path, format));
}
}

Err(anyhow!(
"could not find file, {:?}",
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
.map(|(ext, _)| ext)
.join(",")
))
))
}

async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
@@ -149,12 +117,31 @@ where
Ok(value)
}

/// Currently only writes `schema.json`
pub async fn write_directory(
async fn write_subdir_configs<T>(
subdir: &Path,
configs: impl IntoIterator<Item = (String, T)>,
) -> anyhow::Result<()>
where
T: Serialize,
{
if !(fs::try_exists(subdir).await?) {
fs::create_dir(subdir).await?;
}

for (name, config) in configs {
let with_name: WithName<T> = (name.clone(), config).into();
write_file(subdir, &name, &with_name).await?;
}

Ok(())
}

pub async fn write_schema_directory(
configuration_dir: impl AsRef<Path>,
configuration: &Configuration,
schemas: impl IntoIterator<Item = (String, Schema)>,
) -> anyhow::Result<()> {
write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
let subdir = configuration_dir.as_ref().join(SCHEMA_DIRNAME);
write_subdir_configs(&subdir, schemas).await
}

fn default_file_path(configuration_dir: impl AsRef<Path>, basename: &str) -> PathBuf {
@@ -176,3 +163,16 @@
.await
.with_context(|| format!("error writing {:?}", path))
}

pub async fn list_existing_schemas(
configuration_dir: impl AsRef<Path>,
) -> anyhow::Result<HashSet<String>> {
let dir = configuration_dir.as_ref();

// TODO: we don't really need to read and parse all the schema files here, just get their names.
let schemas = read_subdir_configs::<Schema>(&dir.join(SCHEMA_DIRNAME))
.await?
.unwrap_or_default();

Ok(schemas.into_keys().collect())
}
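
As a usage note (a sketch under assumptions, not code from this commit): each entry passed to write_schema_directory ends up as its own file under schema/. The collection names here are hypothetical, and Schema::default() is used only to keep the sketch self-contained.

use std::collections::BTreeMap;
use std::path::Path;

use configuration::{write_schema_directory, Schema};

// Writes one file per collection (e.g. schema/movies.json, schema/users.json),
// creating the schema/ subdirectory if it does not already exist.
async fn write_example(configuration_dir: &Path) -> anyhow::Result<()> {
    let mut schemas: BTreeMap<String, Schema> = BTreeMap::new();
    schemas.insert("movies".to_string(), Schema::default());
    schemas.insert("users".to_string(), Schema::default());
    write_schema_directory(configuration_dir, schemas).await
}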
3 changes: 2 additions & 1 deletion crates/configuration/src/lib.rs
@@ -5,7 +5,8 @@ pub mod schema;
mod with_name;

pub use crate::configuration::Configuration;
pub use crate::directory::list_existing_schemas;
pub use crate::directory::read_directory;
pub use crate::directory::write_directory;
pub use crate::directory::write_schema_directory;
pub use crate::schema::Schema;
pub use crate::with_name::{WithName, WithNameRef};
19 changes: 19 additions & 0 deletions crates/configuration/src/schema/mod.rs
@@ -42,4 +42,23 @@ impl Schema {
.iter()
.map(|(name, field)| WithNameRef::named(name, field))
}

/// Unify two schemas. Assumes that the schemas describe mutually exclusive sets of collections.
pub fn merge(schema_a: Schema, schema_b: Schema) -> Schema {
let collections = schema_a
.collections
.into_iter()
.chain(schema_b.collections)
.collect();
let object_types = schema_a
.object_types
.into_iter()
.chain(schema_b.object_types)
.collect();
Schema {
collections,
object_types,
}
}

}
