diff --git a/crates/sdk/src/connector.rs b/crates/sdk/src/connector.rs index 2dbaf95..72e3093 100644 --- a/crates/sdk/src/connector.rs +++ b/crates/sdk/src/connector.rs @@ -1,225 +1,10 @@ -use std::error::Error; -use std::fmt::Display; -use std::path::{Path, PathBuf}; - +use crate::json_response::JsonResponse; use async_trait::async_trait; use ndc_models as models; -use serde::Serialize; -use thiserror::Error; - -use crate::json_response::JsonResponse; - +use std::path::Path; +pub mod error; pub mod example; - -/// Errors which occur when trying to validate connector -/// configuration. -/// -/// See [`Connector::parse_configuration`]. -#[derive(Debug, Error)] -pub enum ParseError { - #[error("error parsing configuration: {0}")] - ParseError(LocatedError), - #[error("error validating configuration: {0}")] - ValidateError(InvalidNodes), - #[error("could not find configuration file: {0}")] - CouldNotFindConfiguration(PathBuf), - #[error("error processing configuration: {0}")] - IoError(#[from] std::io::Error), - #[error("error processing configuration: {0}")] - Other(#[from] Box), -} - -/// An error associated with the position of a single character in a text file. -#[derive(Debug, Clone)] -pub struct LocatedError { - pub file_path: PathBuf, - pub line: usize, - pub column: usize, - pub message: String, -} - -impl Display for LocatedError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{0}:{1}:{2}: {3}", - self.file_path.display(), - self.line, - self.column, - self.message - ) - } -} - -/// An error associated with a node in a graph structure. -#[derive(Debug, Clone)] -pub struct InvalidNode { - pub file_path: PathBuf, - pub node_path: Vec, - pub message: String, -} - -impl Display for InvalidNode { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}, at ", self.file_path.display())?; - for segment in &self.node_path { - write!(f, ".{segment}")?; - } - write!(f, ": {}", self.message)?; - Ok(()) - } -} - -/// A set of invalid nodes. -#[derive(Debug, Clone)] -pub struct InvalidNodes(pub Vec); - -impl Display for InvalidNodes { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut iterator = self.0.iter(); - if let Some(first) = iterator.next() { - first.fmt(f)?; - } - for next in iterator { - write!(f, ", {next}")?; - } - Ok(()) - } -} - -/// A segment in a node path, used with [InvalidNode]. -#[derive(Debug, Clone, Serialize)] -#[serde(untagged)] -pub enum KeyOrIndex { - Key(String), - Index(u32), -} - -impl Display for KeyOrIndex { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Key(key) => write!(f, "[{key:?}]"), - Self::Index(index) => write!(f, "[{index}]"), - } - } -} - -/// Errors which occur when trying to initialize connector -/// state. -/// -/// See [`Connector::try_init_state`]. -#[derive(Debug, Error)] -pub enum InitializationError { - #[error("error initializing connector state: {0}")] - Other(#[from] Box), -} - -/// Errors which occur when trying to update metrics. -/// -/// See [`Connector::fetch_metrics`]. -#[derive(Debug, Error)] -pub enum FetchMetricsError { - #[error("error fetching metrics: {0}")] - Other(#[from] Box), -} - -/// Errors which occur when checking connector health. -/// -/// See [`Connector::health_check`]. -#[derive(Debug, Error)] -pub enum HealthError { - #[error("error checking health status: {0}")] - Other(#[from] Box), -} - -/// Errors which occur when retrieving the connector schema. -/// -/// See [`Connector::get_schema`]. -#[derive(Debug, Error)] -pub enum SchemaError { - #[error("error retrieving the schema: {0}")] - Other(#[from] Box), -} - -/// Errors which occur when executing a query. -/// -/// See [`Connector::query`]. -#[derive(Debug, Error)] -pub enum QueryError { - /// The request was invalid or did not match the - /// requirements of the specification. This indicates - /// an error with the client. - #[error("invalid request: {0}")] - InvalidRequest(String), - /// The request was well formed but was unable to be - /// followed due to semantic errors. This indicates - /// an error with the client. - #[error("unprocessable content: {0}")] - UnprocessableContent(String), - /// The request relies on an unsupported feature or - /// capability. This may indicate an error with the client, - /// or just an unimplemented feature. - #[error("unsupported operation: {0}")] - UnsupportedOperation(String), - #[error("error executing query: {0}")] - Other(#[from] Box), -} - -/// Errors which occur when explaining a query. -/// -/// See [`Connector::query_explain`, `Connector::mutation_explain`]. -#[derive(Debug, Error)] -pub enum ExplainError { - /// The request was invalid or did not match the - /// requirements of the specification. This indicates - /// an error with the client. - #[error("invalid request: {0}")] - InvalidRequest(String), - /// The request was well formed but was unable to be - /// followed due to semantic errors. This indicates - /// an error with the client. - #[error("unprocessable content: {0}")] - UnprocessableContent(String), - /// The request relies on an unsupported feature or - /// capability. This may indicate an error with the client, - /// or just an unimplemented feature. - #[error("unsupported operation: {0}")] - UnsupportedOperation(String), - #[error("explain error: {0}")] - Other(#[from] Box), -} - -/// Errors which occur when executing a mutation. -/// -/// See [`Connector::mutation`]. -#[derive(Debug, Error)] -pub enum MutationError { - /// The request was invalid or did not match the - /// requirements of the specification. This indicates - /// an error with the client. - #[error("invalid request: {0}")] - InvalidRequest(String), - /// The request was well formed but was unable to be - /// followed due to semantic errors. This indicates - /// an error with the client. - #[error("unprocessable content: {0}")] - UnprocessableContent(String), - /// The request relies on an unsupported feature or - /// capability. This may indicate an error with the client, - /// or just an unimplemented feature. - #[error("unsupported operation: {0}")] - UnsupportedOperation(String), - /// The request would result in a conflicting state - /// in the underlying data store. - #[error("mutation would result in a conflicting state: {0}")] - Conflict(String), - /// The request would violate a constraint in the - /// underlying data store. - #[error("mutation violates constraint: {0}")] - ConstraintNotMet(String), - #[error("error executing mutation: {0}")] - Other(#[from] Box), -} +pub use error::*; /// Connectors using this library should implement this trait. /// diff --git a/crates/sdk/src/connector/error.rs b/crates/sdk/src/connector/error.rs new file mode 100644 index 0000000..7d4944d --- /dev/null +++ b/crates/sdk/src/connector/error.rs @@ -0,0 +1,516 @@ +use std::error::Error; +use std::fmt::Display; +use std::path::PathBuf; + +use axum::response::{IntoResponse, Response}; +use axum::Json; +use http::StatusCode; +use ndc_models as models; +use serde::Serialize; +use thiserror::Error; + +/// Errors which occur when trying to validate connector +/// configuration. +/// +/// See [`Connector::parse_configuration`]. +#[derive(Debug, Error)] +pub enum ParseError { + #[error("error parsing configuration: {0}")] + ParseError(LocatedError), + #[error("error validating configuration: {0}")] + ValidateError(InvalidNodes), + #[error("could not find configuration file: {0}")] + CouldNotFindConfiguration(PathBuf), + #[error("error processing configuration: {0}")] + IoError(#[from] std::io::Error), + #[error("error processing configuration: {0}")] + Other(#[from] Box), +} + +/// An error associated with the position of a single character in a text file. +#[derive(Debug, Clone)] +pub struct LocatedError { + pub file_path: PathBuf, + pub line: usize, + pub column: usize, + pub message: String, +} + +impl Display for LocatedError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{0}:{1}:{2}: {3}", + self.file_path.display(), + self.line, + self.column, + self.message + ) + } +} + +/// An error associated with a node in a graph structure. +#[derive(Debug, Clone)] +pub struct InvalidNode { + pub file_path: PathBuf, + pub node_path: Vec, + pub message: String, +} + +impl Display for InvalidNode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}, at ", self.file_path.display())?; + for segment in &self.node_path { + write!(f, ".{segment}")?; + } + write!(f, ": {}", self.message)?; + Ok(()) + } +} + +/// A set of invalid nodes. +#[derive(Debug, Clone)] +pub struct InvalidNodes(pub Vec); + +impl Display for InvalidNodes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut iterator = self.0.iter(); + if let Some(first) = iterator.next() { + first.fmt(f)?; + } + for next in iterator { + write!(f, ", {next}")?; + } + Ok(()) + } +} + +/// A segment in a node path, used with [InvalidNode]. +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] +pub enum KeyOrIndex { + Key(String), + Index(u32), +} + +impl Display for KeyOrIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Key(key) => write!(f, "[{key:?}]"), + Self::Index(index) => write!(f, "[{index}]"), + } + } +} + +/// Errors which occur when trying to initialize connector +/// state. +/// +/// See [`Connector::try_init_state`]. +#[derive(Debug, Error)] +pub enum InitializationError { + #[error("error initializing connector state: {0}")] + Other(#[from] Box), +} + +/// Errors which occur when trying to update metrics. +/// +/// See [`Connector::fetch_metrics`]. +#[derive(Debug, Error)] +pub enum FetchMetricsError { + #[error("error fetching metrics: {0}")] + Other(Box, serde_json::Value), +} + +impl FetchMetricsError { + pub fn new>>(err: E) -> Self { + Self::Other(err.into(), serde_json::Value::Null) + } + #[must_use] + pub fn with_details(self, details: serde_json::Value) -> Self { + match self { + Self::Other(err, _) => Self::Other(err, details), + } + } +} + +impl IntoResponse for FetchMetricsError { + fn into_response(self) -> Response { + match self { + Self::Other(err, details) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(models::ErrorResponse { + message: err.to_string(), + details, + }), + ), + } + .into_response() + } +} + +/// Errors which occur when checking connector health. +/// +/// See [`Connector::health_check`]. +#[derive(Debug, Error)] +pub enum HealthError { + #[error("error checking health status: {0}")] + Other(Box, serde_json::Value), +} + +impl HealthError { + pub fn new>>(err: E) -> Self { + Self::Other(err.into(), serde_json::Value::Null) + } + #[must_use] + pub fn with_details(self, details: serde_json::Value) -> Self { + match self { + Self::Other(err, _) => Self::Other(err, details), + } + } +} + +impl IntoResponse for HealthError { + fn into_response(self) -> Response { + match self { + Self::Other(err, details) => ( + StatusCode::SERVICE_UNAVAILABLE, + Json(models::ErrorResponse { + message: err.to_string(), + details, + }), + ), + } + .into_response() + } +} + +/// Errors which occur when retrieving the connector schema. +/// +/// See [`Connector::get_schema`]. +#[derive(Debug, Error)] +pub enum SchemaError { + #[error("error retrieving the schema: {0}")] + Other(Box, serde_json::Value), +} + +impl SchemaError { + pub fn new>>(err: E) -> Self { + Self::Other(err.into(), serde_json::Value::Null) + } + #[must_use] + pub fn with_details(self, details: serde_json::Value) -> Self { + match self { + Self::Other(err, _) => Self::Other(err, details), + } + } +} + +impl From> for SchemaError { + fn from(value: Box) -> Self { + Self::new(value) + } +} + +impl IntoResponse for SchemaError { + fn into_response(self) -> Response { + match self { + Self::Other(err, details) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(models::ErrorResponse { + message: err.to_string(), + details, + }), + ), + } + .into_response() + } +} + +/// Errors which occur when executing a query. +/// +/// See [`Connector::query`]. +#[derive(Debug, Error)] +pub enum QueryError { + /// The request was invalid or did not match the + /// requirements of the specification. This indicates + /// an error with the client. + #[error("invalid request: {}", .0.message)] + InvalidRequest(models::ErrorResponse), + /// The request was well formed but was unable to be + /// followed due to semantic errors. This indicates + /// an error with the client. + #[error("unprocessable content: {}", .0.message)] + UnprocessableContent(models::ErrorResponse), + /// The request relies on an unsupported feature or + /// capability. This may indicate an error with the client, + /// or just an unimplemented feature. + #[error("unsupported operation: {}", .0.message)] + UnsupportedOperation(models::ErrorResponse), + #[error("error executing query: {0}")] + Other(Box, serde_json::Value), +} + +impl QueryError { + pub fn new>>(err: E) -> Self { + Self::Other(err.into(), serde_json::Value::Null) + } + pub fn new_invalid_request(message: &T) -> Self { + Self::InvalidRequest(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_unprocessable_content(message: &T) -> Self { + Self::UnprocessableContent(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_unsupported_operation(message: &T) -> Self { + Self::UnsupportedOperation(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + #[must_use] + pub fn with_details(self, details: serde_json::Value) -> Self { + match self { + Self::InvalidRequest(models::ErrorResponse { message, .. }) => { + Self::InvalidRequest(models::ErrorResponse { message, details }) + } + Self::UnprocessableContent(models::ErrorResponse { message, .. }) => { + Self::UnprocessableContent(models::ErrorResponse { message, details }) + } + Self::UnsupportedOperation(models::ErrorResponse { message, .. }) => { + Self::UnsupportedOperation(models::ErrorResponse { message, details }) + } + Self::Other(err, _) => Self::Other(err, details), + } + } +} + +impl From> for QueryError { + fn from(value: Box) -> Self { + Self::new(value) + } +} + +impl IntoResponse for QueryError { + fn into_response(self) -> Response { + match self { + Self::InvalidRequest(err) => (StatusCode::BAD_REQUEST, Json(err)), + Self::UnprocessableContent(err) => (StatusCode::UNPROCESSABLE_ENTITY, Json(err)), + Self::UnsupportedOperation(err) => (StatusCode::NOT_IMPLEMENTED, Json(err)), + Self::Other(err, details) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(models::ErrorResponse { + message: err.to_string(), + details, + }), + ), + } + .into_response() + } +} + +/// Errors which occur when explaining a query. +/// +/// See [`Connector::query_explain`, `Connector::mutation_explain`]. +#[derive(Debug, Error)] +pub enum ExplainError { + /// The request was invalid or did not match the + /// requirements of the specification. This indicates + /// an error with the client. + #[error("invalid request: {}", .0.message)] + InvalidRequest(models::ErrorResponse), + /// The request was well formed but was unable to be + /// followed due to semantic errors. This indicates + /// an error with the client. + #[error("unprocessable content: {}", .0.message)] + UnprocessableContent(models::ErrorResponse), + /// The request relies on an unsupported feature or + /// capability. This may indicate an error with the client, + /// or just an unimplemented feature. + #[error("unsupported operation: {}", .0.message)] + UnsupportedOperation(models::ErrorResponse), + #[error("explain error: {0}")] + Other(Box, serde_json::Value), +} + +impl ExplainError { + pub fn new>>(err: E) -> Self { + Self::Other(err.into(), serde_json::Value::Null) + } + pub fn new_invalid_request(message: &T) -> Self { + Self::InvalidRequest(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_unprocessable_content(message: &T) -> Self { + Self::UnprocessableContent(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_unsupported_operation(message: &T) -> Self { + Self::UnsupportedOperation(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + #[must_use] + pub fn with_details(self, details: serde_json::Value) -> Self { + match self { + Self::InvalidRequest(models::ErrorResponse { message, .. }) => { + Self::InvalidRequest(models::ErrorResponse { message, details }) + } + Self::UnprocessableContent(models::ErrorResponse { message, .. }) => { + Self::UnprocessableContent(models::ErrorResponse { message, details }) + } + Self::UnsupportedOperation(models::ErrorResponse { message, .. }) => { + Self::UnsupportedOperation(models::ErrorResponse { message, details }) + } + Self::Other(err, _) => Self::Other(err, details), + } + } +} + +impl From> for ExplainError { + fn from(value: Box) -> Self { + Self::new(value) + } +} + +impl IntoResponse for ExplainError { + fn into_response(self) -> Response { + match self { + Self::InvalidRequest(err) => (StatusCode::BAD_REQUEST, Json(err)), + Self::UnprocessableContent(err) => (StatusCode::UNPROCESSABLE_ENTITY, Json(err)), + Self::UnsupportedOperation(err) => (StatusCode::NOT_IMPLEMENTED, Json(err)), + Self::Other(err, details) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(models::ErrorResponse { + message: err.to_string(), + details, + }), + ), + } + .into_response() + } +} + +/// Errors which occur when executing a mutation. +/// +/// See [`Connector::mutation`]. +#[derive(Debug, Error)] +pub enum MutationError { + /// The request was invalid or did not match the + /// requirements of the specification. This indicates + /// an error with the client. + #[error("invalid request: {}", .0.message)] + InvalidRequest(models::ErrorResponse), + /// The request was well formed but was unable to be + /// followed due to semantic errors. This indicates + /// an error with the client. + #[error("unprocessable content: {}", .0.message)] + UnprocessableContent(models::ErrorResponse), + /// The request relies on an unsupported feature or + /// capability. This may indicate an error with the client, + /// or just an unimplemented feature. + #[error("unsupported operation: {}", .0.message)] + UnsupportedOperation(models::ErrorResponse), + /// The request would result in a conflicting state + /// in the underlying data store. + #[error("mutation would result in a conflicting state: {}", .0.message)] + Conflict(models::ErrorResponse), + /// The request would violate a constraint in the + /// underlying data store. + #[error("mutation violates constraint: {}", .0.message)] + ConstraintNotMet(models::ErrorResponse), + #[error("error executing mutation: {0}")] + Other(Box, serde_json::Value), +} + +impl MutationError { + pub fn new>>(err: E) -> Self { + Self::Other(err.into(), serde_json::Value::Null) + } + pub fn new_invalid_request(message: &T) -> Self { + Self::InvalidRequest(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_unprocessable_content(message: &T) -> Self { + Self::UnprocessableContent(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_unsupported_operation(message: &T) -> Self { + Self::UnsupportedOperation(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_conflict(message: &T) -> Self { + Self::Conflict(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + pub fn new_constraint_not_met(message: &T) -> Self { + Self::ConstraintNotMet(models::ErrorResponse { + message: message.to_string(), + details: serde_json::Value::Null, + }) + } + #[must_use] + pub fn with_details(self, details: serde_json::Value) -> Self { + match self { + Self::InvalidRequest(models::ErrorResponse { message, .. }) => { + Self::InvalidRequest(models::ErrorResponse { message, details }) + } + Self::UnprocessableContent(models::ErrorResponse { message, .. }) => { + Self::UnprocessableContent(models::ErrorResponse { message, details }) + } + Self::UnsupportedOperation(models::ErrorResponse { message, .. }) => { + Self::UnsupportedOperation(models::ErrorResponse { message, details }) + } + Self::Conflict(models::ErrorResponse { message, .. }) => { + Self::Conflict(models::ErrorResponse { message, details }) + } + Self::ConstraintNotMet(models::ErrorResponse { message, .. }) => { + Self::ConstraintNotMet(models::ErrorResponse { message, details }) + } + Self::Other(err, _) => Self::Other(err, details), + } + } +} + +impl From> for MutationError { + fn from(value: Box) -> Self { + Self::new(value) + } +} + +impl IntoResponse for MutationError { + fn into_response(self) -> Response { + match self { + Self::InvalidRequest(err) => (StatusCode::BAD_REQUEST, Json(err)), + Self::UnprocessableContent(err) => (StatusCode::UNPROCESSABLE_ENTITY, Json(err)), + Self::UnsupportedOperation(err) => (StatusCode::NOT_IMPLEMENTED, Json(err)), + Self::Conflict(err) => (StatusCode::CONFLICT, Json(err)), + Self::ConstraintNotMet(err) => (StatusCode::FORBIDDEN, Json(err)), + Self::Other(err, details) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(models::ErrorResponse { + message: err.to_string(), + details, + }), + ), + } + .into_response() + } +} diff --git a/crates/sdk/src/default_main.rs b/crates/sdk/src/default_main.rs index c692e79..048aac5 100644 --- a/crates/sdk/src/default_main.rs +++ b/crates/sdk/src/default_main.rs @@ -1,7 +1,12 @@ -use std::error::Error; -use std::net; -use std::path::{Path, PathBuf}; - +use crate::check_health; +use crate::connector::{ + Connector, ConnectorSetup, ExplainError, FetchMetricsError, HealthError, MutationError, + QueryError, SchemaError, +}; +use crate::fetch_metrics::fetch_metrics; +use crate::json_rejection::JsonRejection; +use crate::json_response::JsonResponse; +use crate::tracing::{init_tracing, make_span, on_response}; use axum::{ body::Body, extract::State, @@ -12,20 +17,15 @@ use axum::{ }; use axum_extra::extract::WithRejection; use clap::{Parser, Subcommand}; -use prometheus::Registry; -use tower_http::{trace::TraceLayer, validate_request::ValidateRequestHeaderLayer}; - use ndc_models::{ CapabilitiesResponse, ErrorResponse, ExplainResponse, MutationRequest, MutationResponse, QueryRequest, QueryResponse, SchemaResponse, }; - -use crate::check_health; -use crate::connector::{Connector, ConnectorSetup}; -use crate::json_rejection::JsonRejection; -use crate::json_response::JsonResponse; -use crate::routes; -use crate::tracing::{init_tracing, make_span, on_response}; +use prometheus::Registry; +use std::error::Error; +use std::net; +use std::path::{Path, PathBuf}; +use tower_http::{trace::TraceLayer, validate_request::ValidateRequestHeaderLayer}; #[derive(Parser)] struct CliArgs { @@ -373,57 +373,56 @@ fn auth_handler( async fn get_metrics( State(state): State>, -) -> Result)> { - routes::get_metrics::(&state.configuration, &state.state, &state.metrics) +) -> Result { + fetch_metrics::(&state.configuration, &state.state, &state.metrics) } async fn get_capabilities() -> JsonResponse { - routes::get_capabilities::().await + C::get_capabilities().await } -async fn get_health( - State(state): State>, -) -> Result<(), (StatusCode, Json)> { - routes::get_health::(&state.configuration, &state.state).await +async fn get_health(State(state): State>) -> Result<(), HealthError> { + C::health_check(&state.configuration, &state.state).await } async fn get_schema( State(state): State>, -) -> Result, (StatusCode, Json)> { - routes::get_schema::(&state.configuration).await +) -> Result, SchemaError> { + C::get_schema(&state.configuration).await } async fn post_query_explain( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, Json)> { - routes::post_query_explain::(&state.configuration, &state.state, request).await +) -> Result, ExplainError> { + C::query_explain(&state.configuration, &state.state, request).await } async fn post_mutation_explain( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, Json)> { - routes::post_mutation_explain::(&state.configuration, &state.state, request).await +) -> Result, ExplainError> { + C::mutation_explain(&state.configuration, &state.state, request).await } async fn post_mutation( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, Json)> { - routes::post_mutation::(&state.configuration, &state.state, request).await +) -> Result, MutationError> { + C::mutation(&state.configuration, &state.state, request).await } async fn post_query( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, Json)> { - routes::post_query::(&state.configuration, &state.state, request).await +) -> Result, QueryError> { + C::query(&state.configuration, &state.state, request).await } #[cfg(feature = "ndc-test")] mod ndc_test_commands { use super::{BenchCommand, Connector, ConnectorSetup}; + use crate::connector::QueryError; use crate::json_response::JsonResponse; use async_trait::async_trait; use ndc_test::reporter::{ConsoleReporter, TestResults}; @@ -444,15 +443,15 @@ mod ndc_test_commands { ) -> Result { C::get_capabilities() .await - .into_value::>() - .map_err(|err| ndc_test::error::Error::OtherError(err)) + .into_value::>() + .map_err(ndc_test::error::Error::OtherError) } async fn get_schema(&self) -> Result { match C::get_schema(&self.configuration).await { Ok(response) => response - .into_value::>() - .map_err(|err| ndc_test::error::Error::OtherError(err)), + .into_value::>() + .map_err(ndc_test::error::Error::OtherError), Err(err) => Err(ndc_test::error::Error::OtherError(err.into())), } } @@ -461,26 +460,27 @@ mod ndc_test_commands { &self, request: ndc_models::QueryRequest, ) -> Result { - match C::query(&self.configuration, &self.state, request) + Ok(C::query(&self.configuration, &self.state, request) .await .and_then(JsonResponse::into_value) - { - Ok(response) => Ok(response), - Err(err) => Err(ndc_test::error::Error::OtherError(err.into())), - } + .map_err(|err| match err { + QueryError::InvalidRequest(err) + | QueryError::UnprocessableContent(err) + | QueryError::UnsupportedOperation(err) => { + ndc_test::error::Error::OtherError(err.message.into()) + } + QueryError::Other(err, _) => ndc_test::error::Error::OtherError(err), + })?) } async fn mutation( &self, request: ndc_models::MutationRequest, ) -> Result { - match C::mutation(&self.configuration, &self.state, request) + Ok(C::mutation(&self.configuration, &self.state, request) .await .and_then(JsonResponse::into_value) - { - Ok(response) => Ok(response), - Err(err) => Err(ndc_test::error::Error::OtherError(err.into())), - } + .map_err(|err| ndc_test::error::Error::OtherError(err.into()))?) } } diff --git a/crates/sdk/src/fetch_metrics.rs b/crates/sdk/src/fetch_metrics.rs new file mode 100644 index 0000000..654f371 --- /dev/null +++ b/crates/sdk/src/fetch_metrics.rs @@ -0,0 +1,18 @@ +use crate::connector::{Connector, FetchMetricsError}; +use prometheus::{Registry, TextEncoder}; + +pub fn fetch_metrics( + configuration: &C::Configuration, + state: &C::State, + metrics: &Registry, +) -> Result { + let encoder = TextEncoder::new(); + + C::fetch_metrics(configuration, state)?; + + let metric_families = &metrics.gather(); + + encoder + .encode_to_string(metric_families) + .map_err(|_| FetchMetricsError::new("Unable to encode metrics")) +} diff --git a/crates/sdk/src/json_response.rs b/crates/sdk/src/json_response.rs index 85be6e8..4789177 100644 --- a/crates/sdk/src/json_response.rs +++ b/crates/sdk/src/json_response.rs @@ -27,9 +27,7 @@ impl serde::Deserialize<'de>)> JsonResponse { /// /// This is only intended for testing and compatibility. If it lives on a /// critical path, we recommend you avoid it. - pub(crate) fn into_value>>( - self, - ) -> Result { + pub(crate) fn into_value>>(self) -> Result { match self { Self::Value(value) => Ok(value), Self::Serialized(bytes) => { diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index f4d54c7..a4741cd 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -1,9 +1,9 @@ pub mod check_health; pub mod connector; pub mod default_main; +pub mod fetch_metrics; pub mod json_rejection; pub mod json_response; -pub mod routes; pub mod tracing; pub use ndc_models as models; diff --git a/crates/sdk/src/routes.rs b/crates/sdk/src/routes.rs deleted file mode 100644 index fb755b5..0000000 --- a/crates/sdk/src/routes.rs +++ /dev/null @@ -1,269 +0,0 @@ -use axum::{http::StatusCode, Json}; -use ndc_models as models; -use prometheus::{Registry, TextEncoder}; - -use crate::{ - connector::{Connector, HealthError}, - json_response::JsonResponse, -}; - -pub fn get_metrics( - configuration: &C::Configuration, - state: &C::State, - metrics: &Registry, -) -> Result)> { - let encoder = TextEncoder::new(); - - // Ask the connector to update all its metrics - C::fetch_metrics(configuration, state).map_err(|_| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(models::ErrorResponse { - message: "Unable to fetch metrics".into(), - details: serde_json::Value::Null, - }), - ) - })?; - - let metric_families = metrics.gather(); - - encoder.encode_to_string(&metric_families).map_err(|_| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(models::ErrorResponse { - message: "Unable to encode metrics".into(), - details: serde_json::Value::Null, - }), - ) - }) -} - -pub async fn get_capabilities() -> JsonResponse { - C::get_capabilities().await -} - -pub async fn get_health( - configuration: &C::Configuration, - state: &C::State, -) -> Result<(), (StatusCode, Json)> { - // the context extractor will error if the deployment can't be found. - match C::health_check(configuration, state).await { - Ok(()) => Ok(()), - Err(HealthError::Other(err)) => Err(( - StatusCode::SERVICE_UNAVAILABLE, - Json(models::ErrorResponse { - message: err.to_string(), - details: serde_json::Value::Null, - }), - )), - } -} - -pub async fn get_schema( - configuration: &C::Configuration, -) -> Result, (StatusCode, Json)> { - C::get_schema(configuration).await.map_err(|e| match e { - crate::connector::SchemaError::Other(err) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(models::ErrorResponse { - message: "Internal error".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "cause".into(), - serde_json::Value::String(err.to_string()), - )])), - }), - ), - }) -} - -/// Invoke the connector's mutation_explain method and potentially map errors back to error responses. -pub async fn post_mutation_explain( - configuration: &C::Configuration, - state: &C::State, - request: models::MutationRequest, -) -> Result, (StatusCode, Json)> { - C::mutation_explain(configuration, state, request) - .await - .map_err(convert_explain_error) -} - -/// Invoke the connector's query_explain method and potentially map errors back to error responses. -pub async fn post_query_explain( - configuration: &C::Configuration, - state: &C::State, - request: models::QueryRequest, -) -> Result, (StatusCode, Json)> { - C::query_explain(configuration, state, request) - .await - .map_err(convert_explain_error) -} - -/// Convert an sdk explain error to an error response and status code. -fn convert_explain_error( - error: crate::connector::ExplainError, -) -> (StatusCode, Json) { - match error { - crate::connector::ExplainError::Other(err) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(models::ErrorResponse { - message: "Internal error".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "cause".into(), - serde_json::Value::String(err.to_string()), - )])), - }), - ), - crate::connector::ExplainError::InvalidRequest(detail) => ( - StatusCode::BAD_REQUEST, - Json(models::ErrorResponse { - message: "Invalid request".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::ExplainError::UnprocessableContent(detail) => ( - StatusCode::UNPROCESSABLE_ENTITY, - Json(models::ErrorResponse { - message: "Unprocessable content".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::ExplainError::UnsupportedOperation(detail) => ( - StatusCode::NOT_IMPLEMENTED, - Json(models::ErrorResponse { - message: "Unsupported operation".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - } -} - -pub async fn post_mutation( - configuration: &C::Configuration, - state: &C::State, - request: models::MutationRequest, -) -> Result, (StatusCode, Json)> { - C::mutation(configuration, state, request) - .await - .map_err(|e| match e { - crate::connector::MutationError::Other(err) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(models::ErrorResponse { - message: "Internal error".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "cause".into(), - serde_json::Value::String(err.to_string()), - )])), - }), - ), - crate::connector::MutationError::InvalidRequest(detail) => ( - StatusCode::BAD_REQUEST, - Json(models::ErrorResponse { - message: "Invalid request".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::MutationError::UnprocessableContent(detail) => ( - StatusCode::UNPROCESSABLE_ENTITY, - Json(models::ErrorResponse { - message: "Unprocessable content".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::MutationError::UnsupportedOperation(detail) => ( - StatusCode::NOT_IMPLEMENTED, - Json(models::ErrorResponse { - message: "Unsupported operation".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::MutationError::Conflict(detail) => ( - StatusCode::CONFLICT, - Json(models::ErrorResponse { - message: "Request would create a conflicting state".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::MutationError::ConstraintNotMet(detail) => ( - StatusCode::FORBIDDEN, - Json(models::ErrorResponse { - message: "Constraint not met".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - }) -} - -pub async fn post_query( - configuration: &C::Configuration, - state: &C::State, - request: models::QueryRequest, -) -> Result, (StatusCode, Json)> { - C::query(configuration, state, request) - .await - .map_err(|e| match e { - crate::connector::QueryError::Other(err) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(models::ErrorResponse { - message: "Internal error".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "cause".into(), - serde_json::Value::String(err.to_string()), - )])), - }), - ), - crate::connector::QueryError::InvalidRequest(detail) => ( - StatusCode::BAD_REQUEST, - Json(models::ErrorResponse { - message: "Invalid request".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::QueryError::UnprocessableContent(detail) => ( - StatusCode::UNPROCESSABLE_ENTITY, - Json(models::ErrorResponse { - message: "Unprocessable content".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - crate::connector::QueryError::UnsupportedOperation(detail) => ( - StatusCode::NOT_IMPLEMENTED, - Json(models::ErrorResponse { - message: "Unsupported operation".into(), - details: serde_json::Value::Object(serde_json::Map::from_iter([( - "detail".into(), - serde_json::Value::String(detail), - )])), - }), - ), - }) -}