Skip to content

Commit

Permalink
implement read-only native queries with no arguments (#5)
Browse files Browse the repository at this point in the history
Implements read-only native queries without arguments. I added a test query called `hello` to the fixtures. If you run with `arion up -d` and access graphiql at http://localhost:7100/ you can try it out.

I also added on `objectTypes` field to native query definitions. Object types defined there are merged with object types in `schema.json` so that users can keep hand-written types separate from generated ones.

I made some changes to `directory.rs` to add more context to configuration-parsing errors in a couple of spots. If we just stick to `io::Error` errors the messages end up being not very helpful.

Still to do in follow-up PRs:
- implement arguments
- add mutation capability to the connector
- add a mutation endpoint to handle procedures

Ticket: https://hasurahq.atlassian.net/browse/MDB-85
  • Loading branch information
hallettj authored Mar 20, 2024
1 parent e7c8f79 commit 76a3317
Show file tree
Hide file tree
Showing 15 changed files with 1,015 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> {
introspection::sample_schema_from_db(sample_size, &context.mongo_config).await?
}
};
let configuration = Configuration::from_schema(schema);
let configuration = Configuration::from_schema(schema)?;

configuration::write_directory(&context.path, &configuration).await?;

Expand Down
1 change: 1 addition & 0 deletions crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
futures = "^0.3"
itertools = "^0.12"
mongodb = "2.8"
Expand Down
96 changes: 90 additions & 6 deletions crates/configuration/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{io, path::Path};
use std::path::Path;

use anyhow::ensure;
use itertools::Itertools;
use schemars::JsonSchema;
use serde::Deserialize;

use crate::{native_queries::NativeQuery, read_directory, Schema};
use crate::{native_queries::NativeQuery, read_directory, schema::ObjectType, Schema};

#[derive(Clone, Debug, Default, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
Expand All @@ -18,16 +20,98 @@ pub struct Configuration {
}

impl Configuration {
pub fn from_schema(schema: Schema) -> Self {
Self {
pub fn validate(schema: Schema, native_queries: Vec<NativeQuery>) -> anyhow::Result<Self> {
let config = Configuration {
schema,
..Default::default()
native_queries,
};

{
let duplicate_type_names: Vec<&str> = config
.object_types()
.map(|t| t.name.as_ref())
.duplicates()
.collect();
ensure!(
duplicate_type_names.is_empty(),
"configuration contains multiple definitions for these object type names: {}",
duplicate_type_names.join(", ")
);
}

{
let duplicate_collection_names: Vec<&str> = config
.schema
.collections
.iter()
.map(|c| c.name.as_ref())
.duplicates()
.collect();
ensure!(
duplicate_collection_names.is_empty(),
"configuration contains multiple definitions for these collection names: {}",
duplicate_collection_names.join(", ")
);
}

Ok(config)
}

pub fn from_schema(schema: Schema) -> anyhow::Result<Self> {
Self::validate(schema, Default::default())
}

pub async fn parse_configuration(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Self> {
) -> anyhow::Result<Self> {
read_directory(configuration_dir).await
}

/// Returns object types collected from schema and native queries
pub fn object_types(&self) -> impl Iterator<Item = &ObjectType> {
let object_types_from_schema = self.schema.object_types.iter();
let object_types_from_native_queries = self
.native_queries
.iter()
.flat_map(|native_query| &native_query.object_types);
object_types_from_schema.chain(object_types_from_native_queries)
}
}

#[cfg(test)]
mod tests {
use mongodb::bson::doc;

use super::*;
use crate::{schema::Type, Schema};

#[test]
fn fails_with_duplicate_object_types() {
let schema = Schema {
collections: Default::default(),
object_types: vec![ObjectType {
name: "Album".to_owned(),
fields: Default::default(),
description: Default::default(),
}],
};
let native_queries = vec![NativeQuery {
name: "hello".to_owned(),
object_types: vec![ObjectType {
name: "Album".to_owned(),
fields: Default::default(),
description: Default::default(),
}],
result_type: Type::Object("Album".to_owned()),
command: doc! { "command": 1 },
arguments: Default::default(),
selection_criteria: Default::default(),
description: Default::default(),
mode: Default::default(),
}];
let result = Configuration::validate(schema, native_queries);
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("multiple definitions"));
assert!(error_msg.contains("Album"));
}
}
59 changes: 29 additions & 30 deletions crates/configuration/src/directory.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use anyhow::{anyhow, Context as _};
use futures::stream::TryStreamExt as _;
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use std::{
io,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;

Expand All @@ -29,7 +27,7 @@ const YAML: FileFormat = FileFormat::Yaml;
/// Read configuration from a directory
pub async fn read_directory(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Configuration> {
) -> anyhow::Result<Configuration> {
let dir = configuration_dir.as_ref();

let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?;
Expand All @@ -38,16 +36,13 @@ pub async fn read_directory(
.await?
.unwrap_or_default();

Ok(Configuration {
schema,
native_queries,
})
Configuration::validate(schema, native_queries)
}

/// Parse all files in a directory with one of the allowed configuration extensions according to
/// the given type argument. For example if `T` is `NativeQuery` this function assumes that all
/// json and yaml files in the given directory should be parsed as native query configurations.
async fn read_subdir_configs<T>(subdir: &Path) -> io::Result<Option<Vec<T>>>
async fn read_subdir_configs<T>(subdir: &Path) -> anyhow::Result<Option<Vec<T>>>
where
for<'a> T: Deserialize<'a>,
{
Expand All @@ -57,6 +52,7 @@ where

let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?);
let configs = dir_stream
.map_err(|err| err.into())
.try_filter_map(|dir_entry| async move {
// Permits regular files and symlinks, does not filter out symlinks to directories.
let is_file = !(dir_entry.file_type().await?.is_dir());
Expand Down Expand Up @@ -86,7 +82,7 @@ where

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc; reads the file; and parses it according to its extension.
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> io::Result<T>
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
{
Expand All @@ -96,38 +92,39 @@ where

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc, and returns the found path with its file format.
async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> {
async fn find_file(
configuration_dir: &Path,
basename: &str,
) -> anyhow::Result<(PathBuf, FileFormat)> {
for (extension, format) in CONFIGURATION_EXTENSIONS {
let path = configuration_dir.join(format!("{basename}.{extension}"));
if fs::try_exists(&path).await? {
return Ok((path, format));
}
}

Err(io::Error::new(
io::ErrorKind::NotFound,
format!(
"could not find file, {:?}",
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
.map(|(ext, _)| ext)
.join(",")
))
),
Err(anyhow!(
"could not find file, {:?}",
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
.map(|(ext, _)| ext)
.join(",")
))
))
}

async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> io::Result<T>
async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
{
let bytes = fs::read(path.as_ref()).await?;
let value = match format {
FileFormat::Json => serde_json::from_slice(&bytes)?,
FileFormat::Json => serde_json::from_slice(&bytes)
.with_context(|| format!("error parsing {:?}", path.as_ref()))?,
FileFormat::Yaml => serde_yaml::from_slice(&bytes)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
.with_context(|| format!("error parsing {:?}", path.as_ref()))?,
};
Ok(value)
}
Expand All @@ -136,7 +133,7 @@ where
pub async fn write_directory(
configuration_dir: impl AsRef<Path>,
configuration: &Configuration,
) -> io::Result<()> {
) -> anyhow::Result<()> {
write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
}

Expand All @@ -149,11 +146,13 @@ async fn write_file<T>(
configuration_dir: impl AsRef<Path>,
basename: &str,
value: &T,
) -> io::Result<()>
) -> anyhow::Result<()>
where
T: Serialize,
{
let path = default_file_path(configuration_dir, basename);
let bytes = serde_json::to_vec_pretty(value)?;
fs::write(path, bytes).await
fs::write(path.clone(), bytes)
.await
.with_context(|| format!("error writing {:?}", path))
}
19 changes: 16 additions & 3 deletions crates/configuration/src/native_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria};
use schemars::JsonSchema;
use serde::Deserialize;

use crate::schema::{ObjectField, Type};
use crate::schema::{ObjectField, ObjectType, Type};

/// An arbitrary database command using MongoDB's runCommand API.
/// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
Expand All @@ -12,10 +12,19 @@ pub struct NativeQuery {
/// Name that will be used to identify the query in your data graph
pub name: String,

/// Type of data returned by the query.
/// You may define object types here to reference in `result_type`. Any types defined here will
/// be merged with the definitions in `schema.json`. This allows you to maintain hand-written
/// types for native queries without having to edit a generated `schema.json` file.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub object_types: Vec<ObjectType>,

/// Type of data returned by the query. You may reference object types defined in the
/// `object_types` list in this definition, or you may reference object types from
/// `schema.json`.
pub result_type: Type,

/// Arguments for per-query customization
#[serde(default)]
pub arguments: Vec<ObjectField>,

/// Command to run expressed as a BSON document
Expand All @@ -31,7 +40,11 @@ pub struct NativeQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,

/// Set to `readWrite` if this native query might modify data in the database.
/// Set to `readWrite` if this native query might modify data in the database. When refreshing
/// a dataconnector native queries will appear in the corresponding `DataConnectorLink`
/// definition as `functions` if they are read-only, or as `procedures` if they are read-write.
/// Functions are intended to map to GraphQL Query fields, while procedures map to Mutation
/// fields.
#[serde(default)]
pub mode: Mode,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use configuration::native_queries::NativeQuery;
use dc_api::JsonResponse;
use dc_api_types::{QueryResponse, ResponseFieldValue, RowSet};
use mongodb::Database;

use crate::interface_types::MongoAgentError;

pub async fn handle_native_query_request(
native_query: NativeQuery,
database: Database,
) -> Result<JsonResponse<QueryResponse>, MongoAgentError> {
let result = database
.run_command(native_query.command, native_query.selection_criteria)
.await?;
let result_json =
serde_json::to_value(result).map_err(|err| MongoAgentError::AdHoc(err.into()))?;

// A function returs a single row with a single column called `__value`
// https://hasura.github.io/ndc-spec/specification/queries/functions.html
let response_row = [(
"__value".to_owned(),
ResponseFieldValue::Column(result_json),
)]
.into_iter()
.collect();

Ok(JsonResponse::Value(QueryResponse::Single(RowSet {
aggregates: None,
rows: Some(vec![response_row]),
})))
}
21 changes: 16 additions & 5 deletions crates/mongodb-agent-common/src/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod column_ref;
mod constants;
mod execute_native_query_request;
mod execute_query_request;
mod foreach;
mod make_selector;
Expand All @@ -17,7 +18,10 @@ pub use self::{
make_sort::make_sort,
pipeline::{is_response_faceted, pipeline_for_non_foreach, pipeline_for_query_request},
};
use crate::interface_types::{MongoAgentError, MongoConfig};
use crate::{
interface_types::{MongoAgentError, MongoConfig},
query::execute_native_query_request::handle_native_query_request,
};

pub fn collection_name(query_request_target: &Target) -> String {
query_request_target.name().join(".")
Expand All @@ -29,10 +33,17 @@ pub async fn handle_query_request(
) -> Result<JsonResponse<QueryResponse>, MongoAgentError> {
tracing::debug!(?config, query_request = %serde_json::to_string(&query_request).unwrap(), "executing query");

let collection = config
.client
.database(&config.database)
.collection::<Document>(&collection_name(&query_request.target));
let database = config.client.database(&config.database);

let target = &query_request.target;
if let Some(native_query) = config.native_queries.iter().find(|query| {
let target_name = target.name();
target_name.len() == 1 && target_name[0] == query.name
}) {
return handle_native_query_request(native_query.clone(), database).await;
}

let collection = database.collection::<Document>(&collection_name(&query_request.target));

execute_query_request(&collection, query_request).await
}
Expand Down
Loading

0 comments on commit 76a3317

Please sign in to comment.