From 02e62f64306bbd995f03de2529910ae76328b5ac Mon Sep 17 00:00:00 2001 From: etorreborre Date: Fri, 21 Jun 2024 17:24:37 +0200 Subject: [PATCH] feat(rust): remove the intermediate encoding of bound values --- Cargo.lock | 14 +- .../rust/ockam/ockam_abac/Cargo.toml | 2 +- .../storage/resource_policy_repository_sql.rs | 32 +--- .../policy/storage/resource_repository_sql.rs | 28 ++- .../resource_type_policy_repository_sql.rs | 66 +++++-- .../rust/ockam/ockam_abac/src/types.rs | 4 + .../rust/ockam/ockam_api/Cargo.toml | 2 +- .../src/authenticator/one_time_code.rs | 7 - ...thority_enrollment_token_repository_sql.rs | 36 ++-- .../authority_members_repository_sql.rs | 18 +- .../ockam_api/src/cli_state/cli_state.rs | 48 +++-- .../ockam/ockam_api/src/cli_state/nodes.rs | 7 +- .../storage/enrollments_repository_sql.rs | 8 +- .../storage/identities_repository_sql.rs | 12 +- .../storage/journeys_repository_sql.rs | 12 +- .../cli_state/storage/nodes_repository_sql.rs | 30 +++- .../storage/projects_repository_sql.rs | 68 +++++-- .../storage/tcp_portals_repository_sql.rs | 10 +- .../cli_state/storage/users_repository_sql.rs | 12 +- .../storage/vaults_repository_sql.rs | 6 +- .../ockam_api/src/cli_state/test_support.rs | 20 ++- .../ockam/ockam_api/src/cli_state/vaults.rs | 17 +- .../ockam_api/src/kafka/integration_test.rs | 3 +- .../ockam_api/src/kafka/portal_worker.rs | 10 +- .../src/kafka/protocol_aware/tests.rs | 2 + .../src/nodes/service/in_memory_node.rs | 11 +- .../ockam/ockam_api/src/test_utils/mod.rs | 15 +- .../rust/ockam/ockam_api/tests/latency.rs | 2 + .../rust/ockam/ockam_api/tests/portals.rs | 5 + .../rust/ockam/ockam_app_lib/Cargo.toml | 2 +- .../rust/ockam/ockam_app_lib/src/lib.rs | 2 + .../src/state/model_state_repository_sql.rs | 6 +- .../rust/ockam/ockam_identity/Cargo.toml | 2 +- .../storage/change_history_repository_sql.rs | 46 +++-- .../storage/credential_repository_sql.rs | 36 +++- .../identity_attributes_repository_sql.rs | 32 ++-- .../ockam_identity/src/models/identifiers.rs | 1 - .../ockam_identity/src/models/timestamp.rs | 15 ++ .../storage/purpose_keys_repository_sql.rs | 59 ++++--- .../storage/secure_channel_repository_sql.rs | 14 +- .../rust/ockam/ockam_node/Cargo.toml | 2 +- .../migrations/migration_support/migrator.rs | 4 +- ...231100000_node_name_identity_attributes.rs | 6 +- ...ion_20240111100002_delete_trust_context.rs | 6 +- ..._20240313100000_remove_orphan_resources.rs | 12 +- ...0240503100000_update_policy_expressions.rs | 8 +- .../ockam_node/src/storage/database/mod.rs | 2 - .../src/storage/database/sqlx_database.rs | 89 ++++++---- .../src/storage/database/sqlx_types.rs | 166 ------------------ .../rust/ockam/ockam_vault/Cargo.toml | 2 +- .../src/software/vault_for_signing/types.rs | 10 ++ .../src/storage/secrets_repository_sql.rs | 121 +++++++++---- 52 files changed, 645 insertions(+), 505 deletions(-) delete mode 100644 implementations/rust/ockam/ockam_node/src/storage/database/sqlx_types.rs diff --git a/Cargo.lock b/Cargo.lock index 767ecaf36e9..5efd6a0cc39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6849,7 +6849,7 @@ dependencies = [ [[package]] name = "sqlx" version = "0.7.4" -source = "git+https://github.com/etorreborre/sqlx?rev=1b24b40de97db1cefe387bd590089bf915598547#1b24b40de97db1cefe387bd590089bf915598547" +source = "git+https://github.com/etorreborre/sqlx?rev=5fec648d2de0cbeed738dcf1c6f5bc9194fc439b#5fec648d2de0cbeed738dcf1c6f5bc9194fc439b" dependencies = [ "sqlx-core", "sqlx-macros", @@ -6861,7 +6861,7 @@ dependencies = [ [[package]] name = "sqlx-core" version = "0.7.4" -source = "git+https://github.com/etorreborre/sqlx?rev=1b24b40de97db1cefe387bd590089bf915598547#1b24b40de97db1cefe387bd590089bf915598547" +source = "git+https://github.com/etorreborre/sqlx?rev=5fec648d2de0cbeed738dcf1c6f5bc9194fc439b#5fec648d2de0cbeed738dcf1c6f5bc9194fc439b" dependencies = [ "ahash", "atoi", @@ -6899,7 +6899,7 @@ dependencies = [ [[package]] name = "sqlx-macros" version = "0.7.4" -source = "git+https://github.com/etorreborre/sqlx?rev=1b24b40de97db1cefe387bd590089bf915598547#1b24b40de97db1cefe387bd590089bf915598547" +source = "git+https://github.com/etorreborre/sqlx?rev=5fec648d2de0cbeed738dcf1c6f5bc9194fc439b#5fec648d2de0cbeed738dcf1c6f5bc9194fc439b" dependencies = [ "proc-macro2", "quote", @@ -6911,7 +6911,7 @@ dependencies = [ [[package]] name = "sqlx-macros-core" version = "0.7.4" -source = "git+https://github.com/etorreborre/sqlx?rev=1b24b40de97db1cefe387bd590089bf915598547#1b24b40de97db1cefe387bd590089bf915598547" +source = "git+https://github.com/etorreborre/sqlx?rev=5fec648d2de0cbeed738dcf1c6f5bc9194fc439b#5fec648d2de0cbeed738dcf1c6f5bc9194fc439b" dependencies = [ "dotenvy", "either", @@ -6936,7 +6936,7 @@ dependencies = [ [[package]] name = "sqlx-mysql" version = "0.7.4" -source = "git+https://github.com/etorreborre/sqlx?rev=1b24b40de97db1cefe387bd590089bf915598547#1b24b40de97db1cefe387bd590089bf915598547" +source = "git+https://github.com/etorreborre/sqlx?rev=5fec648d2de0cbeed738dcf1c6f5bc9194fc439b#5fec648d2de0cbeed738dcf1c6f5bc9194fc439b" dependencies = [ "atoi", "base64 0.21.7", @@ -6977,7 +6977,7 @@ dependencies = [ [[package]] name = "sqlx-postgres" version = "0.7.4" -source = "git+https://github.com/etorreborre/sqlx?rev=1b24b40de97db1cefe387bd590089bf915598547#1b24b40de97db1cefe387bd590089bf915598547" +source = "git+https://github.com/etorreborre/sqlx?rev=5fec648d2de0cbeed738dcf1c6f5bc9194fc439b#5fec648d2de0cbeed738dcf1c6f5bc9194fc439b" dependencies = [ "atoi", "base64 0.21.7", @@ -7014,7 +7014,7 @@ dependencies = [ [[package]] name = "sqlx-sqlite" version = "0.7.4" -source = "git+https://github.com/etorreborre/sqlx?rev=1b24b40de97db1cefe387bd590089bf915598547#1b24b40de97db1cefe387bd590089bf915598547" +source = "git+https://github.com/etorreborre/sqlx?rev=5fec648d2de0cbeed738dcf1c6f5bc9194fc439b#5fec648d2de0cbeed738dcf1c6f5bc9194fc439b" dependencies = [ "atoi", "flume", diff --git a/implementations/rust/ockam/ockam_abac/Cargo.toml b/implementations/rust/ockam/ockam_abac/Cargo.toml index ac33feaea00..5cfe41103d9 100644 --- a/implementations/rust/ockam/ockam_abac/Cargo.toml +++ b/implementations/rust/ockam/ockam_abac/Cargo.toml @@ -50,7 +50,7 @@ ockam_executor = { version = "0.80.0", path = "../ockam_executor", default-featu regex = { version = "1.10.5", default-features = false, optional = true } rustyline = { version = "14.0.0", optional = true } rustyline-derive = { version = "0.10.0", optional = true } -sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "1b24b40de97db1cefe387bd590089bf915598547", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } +sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "5fec648d2de0cbeed738dcf1c6f5bc9194fc439b", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } str-buf = "3.0.3" tokio = { version = "1.38", default-features = false, optional = true, features = ["sync", "time", "rt", "rt-multi-thread", "macros"] } tracing = { version = "0.1", default-features = false, features = ["attributes"] } diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs index 87d58cca97e..7d7c7ed62a5 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_policy_repository_sql.rs @@ -5,7 +5,7 @@ use tracing::debug; use ockam_core::async_trait; use ockam_core::compat::vec::Vec; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, SqlxType, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::{Action, Expr, ResourceName, ResourcePoliciesRepository, ResourcePolicy}; @@ -48,9 +48,9 @@ impl ResourcePoliciesRepository for ResourcePolicySqlxDatabase { ON CONFLICT (resource_name, action, node_name) DO UPDATE SET expression = $3"#, ) - .bind(resource_name.to_sql()) - .bind(action.to_sql()) - .bind(expression.to_string()) + .bind(resource_name) + .bind(action) + .bind(expression) .bind(&self.node_name); query.execute(&*self.database.pool).await.void() } @@ -66,8 +66,8 @@ impl ResourcePoliciesRepository for ResourcePolicySqlxDatabase { WHERE node_name = $1 and resource_name = $2 and action = $3"#, ) .bind(&self.node_name) - .bind(resource_name.to_sql()) - .bind(action.to_sql()); + .bind(resource_name) + .bind(action); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -98,7 +98,7 @@ impl ResourcePoliciesRepository for ResourcePolicySqlxDatabase { WHERE node_name = $1 and resource_name = $2"#, ) .bind(&self.node_name) - .bind(resource_name.to_sql()); + .bind(resource_name); let row: Vec = query.fetch_all(&*self.database.pool).await.into_core()?; row.into_iter() .map(|r| r.try_into()) @@ -111,26 +111,12 @@ impl ResourcePoliciesRepository for ResourcePolicySqlxDatabase { WHERE node_name = $1 and resource_name = $2 and action = $3"#, ) .bind(&self.node_name) - .bind(resource_name.to_sql()) - .bind(action.to_sql()); + .bind(resource_name) + .bind(action); query.execute(&*self.database.pool).await.void() } } -// Database serialization / deserialization - -impl ToSqlxType for ResourceName { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.as_str().to_string()) - } -} - -impl ToSqlxType for Action { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_string()) - } -} - /// Low-level representation of a row in the resource_policy table #[derive(FromRow)] struct PolicyRow { diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs index 611174527a4..6f9a5cfcf50 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_repository_sql.rs @@ -1,10 +1,12 @@ use core::str::FromStr; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use tracing::debug; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::{Resource, ResourceName, ResourceType, ResourcesRepository}; @@ -42,8 +44,8 @@ impl ResourcesRepository for ResourcesSqlxDatabase { VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"#, ) - .bind(resource.resource_name.to_sql()) - .bind(resource.resource_type.to_sql()) + .bind(&resource.resource_name) + .bind(&resource.resource_type) .bind(&self.node_name); query.execute(&*self.database.pool).await.void() } @@ -55,7 +57,7 @@ impl ResourcesRepository for ResourcesSqlxDatabase { WHERE node_name = $1 and resource_name = $2"#, ) .bind(&self.node_name) - .bind(resource_name.to_sql()); + .bind(resource_name); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -71,7 +73,7 @@ impl ResourcesRepository for ResourcesSqlxDatabase { WHERE node_name = $1 and resource_name = $2"#, ) .bind(&self.node_name) - .bind(resource_name.to_sql()); + .bind(resource_name); query.execute(&mut *transaction).await.void()?; let query = sqlx::query( @@ -79,13 +81,27 @@ impl ResourcesRepository for ResourcesSqlxDatabase { WHERE node_name = $1 and resource_name = $2"#, ) .bind(&self.node_name) - .bind(resource_name.to_sql()); + .bind(resource_name); query.execute(&mut *transaction).await.void()?; transaction.commit().await.void() } } +// Database serialization / deserialization + +impl Type for ResourceName { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl sqlx::Encode<'_, Any> for ResourceName { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + /// Low-level representation of a row in the resource_type_policy table #[derive(FromRow)] #[allow(dead_code)] diff --git a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs index 419529b9989..4a87d0f5d78 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy/storage/resource_type_policy_repository_sql.rs @@ -1,11 +1,13 @@ use core::str::FromStr; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use tracing::debug; use ockam_core::async_trait; use ockam_core::compat::vec::Vec; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, SqlxType, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::policy::ResourceTypePolicy; use crate::{Action, Expr, ResourceType, ResourceTypePoliciesRepository}; @@ -50,9 +52,9 @@ impl ResourceTypePoliciesRepository for ResourceTypePolicySqlxDatabase { ON CONFLICT (node_name, resource_type, action) DO UPDATE SET expression = $3"#, ) - .bind(resource_type.to_sql()) - .bind(action.to_sql()) - .bind(expression.to_string()) + .bind(resource_type) + .bind(action) + .bind(expression) .bind(&self.node_name); query.execute(&*self.database.pool).await.void() } @@ -68,8 +70,8 @@ impl ResourceTypePoliciesRepository for ResourceTypePolicySqlxDatabase { WHERE node_name = $1 and resource_type = $2 and action = $3"#, ) .bind(&self.node_name) - .bind(resource_type.to_sql()) - .bind(action.to_sql()); + .bind(resource_type) + .bind(action); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -98,7 +100,7 @@ impl ResourceTypePoliciesRepository for ResourceTypePolicySqlxDatabase { FROM resource_type_policy where node_name = $1 and resource_type = $2"#, ) .bind(&self.node_name) - .bind(resource_type.to_sql()); + .bind(resource_type); let row: Vec = query.fetch_all(&*self.database.pool).await.into_core()?; row.into_iter() .map(|r| r.try_into()) @@ -111,12 +113,50 @@ impl ResourceTypePoliciesRepository for ResourceTypePolicySqlxDatabase { WHERE node_name = $1 and resource_type = $2 and action = $3"#, ) .bind(&self.node_name) - .bind(resource_type.to_sql()) - .bind(action.to_sql()); + .bind(resource_type) + .bind(action); query.execute(&*self.database.pool).await.void() } } +// Database serialization / deserialization + +impl Type for ResourceType { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for ResourceType { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + +impl Type for Action { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl sqlx::Encode<'_, Any> for Action { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + +impl Type for Expr { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for Expr { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + /// Low-level representation of a row in the resource_type_policy table #[derive(FromRow)] struct PolicyRow { @@ -151,14 +191,6 @@ impl TryFrom for ResourceTypePolicy { } } -// Database serialization / deserialization - -impl ToSqlxType for ResourceType { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_string()) - } -} - #[cfg(test)] mod test { use super::*; diff --git a/implementations/rust/ockam/ockam_abac/src/types.rs b/implementations/rust/ockam/ockam_abac/src/types.rs index 61795beae8e..d7b8ab621e2 100644 --- a/implementations/rust/ockam/ockam_abac/src/types.rs +++ b/implementations/rust/ockam/ockam_abac/src/types.rs @@ -37,6 +37,10 @@ macro_rules! define { pub fn as_str(&self) -> &str { &self.0 } + + pub fn to_string(&self) -> String { + self.as_str().to_string() + } } impl From<&str> for $t { diff --git a/implementations/rust/ockam/ockam_api/Cargo.toml b/implementations/rust/ockam/ockam_api/Cargo.toml index f4c44279326..73dac6a9b63 100644 --- a/implementations/rust/ockam/ockam_api/Cargo.toml +++ b/implementations/rust/ockam/ockam_api/Cargo.toml @@ -76,7 +76,7 @@ serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.118" sha2 = "0.10.8" #sqlx = { version = "0.7.4", features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } -sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "1b24b40de97db1cefe387bd590089bf915598547", features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } +sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "5fec648d2de0cbeed738dcf1c6f5bc9194fc439b", features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } strip-ansi-escapes = "0.2" sysinfo = "0.30" thiserror = "1.0" diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/one_time_code.rs b/implementations/rust/ockam/ockam_api/src/authenticator/one_time_code.rs index 4187afa68d1..b6f5060a66d 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/one_time_code.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/one_time_code.rs @@ -8,7 +8,6 @@ use ockam_core::compat::string::{String, ToString}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Error; use ockam_core::Result; -use ockam_node::database::{SqlxType, ToSqlxType}; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -91,12 +90,6 @@ impl<'de> Deserialize<'de> for OneTimeCode { } } -impl ToSqlxType for OneTimeCode { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_string()) - } -} - /// Create an Identity Error fn error(message: String) -> Error { Error::new(Origin::Identity, Kind::Invalid, message.as_str()) diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs index 7fb0b424ca0..8e57534e120 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_enrollment_token_repository_sql.rs @@ -1,10 +1,12 @@ use ockam::identity::TimestampInSeconds; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use tracing::debug; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::authenticator::one_time_code::OneTimeCode; use crate::authenticator::{ @@ -43,7 +45,7 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase // We need to delete expired tokens regularly // Also makes sure we don't get expired tokens later inside this function let query1 = query("DELETE FROM authority_enrollment_token WHERE expires_at <= $1") - .bind(now.to_sql()); + .bind(now.0 as i64); let res = query1.execute(&*self.database.pool).await.into_core()?; debug!("Deleted {} expired enrollment tokens", res.rows_affected()); @@ -51,7 +53,7 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase let mut transaction = self.database.pool.begin().await.into_core()?; let query2 = query_as("SELECT one_time_code, reference, issued_by, created_at, expires_at, ttl_count, attributes FROM authority_enrollment_token WHERE one_time_code = $1") - .bind(one_time_code.to_sql()); + .bind(&one_time_code); let row: Option = query2.fetch_optional(&mut *transaction).await.into_core()?; let token: Option = row.map(|r| r.try_into()).transpose()?; @@ -60,7 +62,7 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase if token.ttl_count <= 1 { let query3 = query("DElETE FROM authority_enrollment_token WHERE one_time_code = $1") - .bind(one_time_code.to_sql()); + .bind(&one_time_code); query3.execute(&mut *transaction).await.void()?; debug!( "Deleted enrollment token because it has been used. Reference: {}", @@ -72,7 +74,7 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase "UPDATE authority_enrollment_token SET ttl_count = $1 WHERE one_time_code = $2", ) .bind(new_ttl_count as i64) - .bind(one_time_code.to_sql()); + .bind(&one_time_code); query3.execute(&mut *transaction).await.void()?; debug!( "Decreasing enrollment token usage count to {}. Reference: {}", @@ -95,18 +97,32 @@ impl AuthorityEnrollmentTokenRepository for AuthorityEnrollmentTokenSqlxDatabase ON CONFLICT (one_time_code) DO UPDATE SET reference = $2, issued_by = $3, created_at = $4, expires_at = $5, ttl_count = $6, attributes = $7"#, ) - .bind(token.one_time_code.to_sql()) + .bind(token.one_time_code) .bind(token.reference) - .bind(token.issued_by.to_sql()) - .bind(token.created_at.to_sql()) - .bind(token.expires_at.to_sql()) - .bind(token.ttl_count.to_sql()) + .bind(token.issued_by) + .bind(token.created_at) + .bind(token.expires_at) + .bind(token.ttl_count as i64) .bind(minicbor::to_vec(token.attrs)?); query.execute(&*self.database.pool).await.void() } } +// Database serialization / deserialization + +impl Type for OneTimeCode { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl sqlx::Encode<'_, Any> for OneTimeCode { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs index 2c346342527..6eeda59b53b 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/storage/authority_members_repository_sql.rs @@ -5,7 +5,7 @@ use tracing::debug; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::authenticator::{ AuthorityMember, AuthorityMemberRow, AuthorityMembersRepository, PreTrustedIdentities, @@ -35,7 +35,7 @@ impl AuthorityMembersSqlxDatabase { impl AuthorityMembersRepository for AuthorityMembersSqlxDatabase { async fn get_member(&self, identifier: &Identifier) -> Result> { let query = query_as("SELECT identifier, added_by, added_at, is_pre_trusted, attributes FROM authority_member WHERE identifier = $1") - .bind(identifier.to_sql()); + .bind(identifier); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -53,7 +53,7 @@ impl AuthorityMembersRepository for AuthorityMembersSqlxDatabase { async fn delete_member(&self, identifier: &Identifier) -> Result<()> { let query = query("DELETE FROM authority_member WHERE identifier = $1 AND is_pre_trusted = $2") - .bind(identifier.to_sql()) + .bind(identifier) .bind(false); query.execute(&*self.database.pool).await.void() } @@ -64,9 +64,9 @@ impl AuthorityMembersRepository for AuthorityMembersSqlxDatabase { VALUES ($1, $2, $3, $4, $5) ON CONFLICT (identifier) DO UPDATE SET added_by = $2, added_at = $3, is_pre_trusted = $4, attributes = $5"#) - .bind(member.identifier().to_sql()) - .bind(member.added_by().to_sql()) - .bind(member.added_at().to_sql()) + .bind(member.identifier()) + .bind(member.added_by()) + .bind(member.added_at()) .bind(member.is_pre_trusted()) .bind(minicbor::to_vec(member.attributes())?); @@ -88,9 +88,9 @@ impl AuthorityMembersRepository for AuthorityMembersSqlxDatabase { VALUES ($1, $2, $3, $4, $5) ON CONFLICT (identifier) DO UPDATE SET added_by = $2, added_at = $3, is_pre_trusted = $4, attributes = $5"#) - .bind(identifier.to_sql()) - .bind(pre_trusted_identity.attested_by().to_sql()) - .bind(pre_trusted_identity.added_at().to_sql()) + .bind(identifier) + .bind(pre_trusted_identity.attested_by()) + .bind(pre_trusted_identity.added_at()) .bind(true) .bind(minicbor::to_vec(pre_trusted_identity.attrs())?); diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs index b13295c60a4..81b6aa27389 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs @@ -137,7 +137,7 @@ impl CliState { /// Delete the local database and log files pub async fn delete(&self) -> Result<()> { - self.database.drop_tables().await?; + self.database.drop_postgres_node_tables().await?; self.delete_local_data() } @@ -308,6 +308,9 @@ pub fn random_name() -> String { mod tests { use super::*; use itertools::Itertools; + use ockam_node::database::DatabaseType; + use sqlx::any::AnyRow; + use sqlx::Row; use std::fs; use tempfile::NamedTempFile; @@ -317,7 +320,7 @@ mod tests { let cli_state_directory = db_file.path().parent().unwrap().join(random_name()); let db = SqlxDatabase::create(CliState::make_database_configuration(&cli_state_directory)?) .await?; - db.drop_tables().await?; + db.drop_all_postgres_tables().await?; let cli = CliState::create(cli_state_directory.clone()).await?; // create 2 vaults @@ -342,16 +345,18 @@ mod tests { .await?; let file_names = list_file_names(&cli_state_directory); - assert_eq!( - file_names.iter().sorted().as_slice(), - [ + let expected = match cli.database_configuration()?.database_type() { + DatabaseType::Sqlite => vec![ "vault-vault2".to_string(), "application_database.sqlite3".to_string(), - "database.sqlite3".to_string() - ] - .iter() - .sorted() - .as_slice() + "database.sqlite3".to_string(), + ], + DatabaseType::Postgres => vec!["vault-vault2".to_string()], + }; + + assert_eq!( + file_names.iter().sorted().as_slice(), + expected.iter().sorted().as_slice() ); // reset the local state @@ -359,10 +364,25 @@ mod tests { let result = fs::read_dir(&cli_state_directory); assert!(result.is_ok(), "the cli state directory is not deleted"); - // only the application database must remain - let file_names = list_file_names(&cli_state_directory); - assert_eq!(file_names, vec!["application_database.sqlite3".to_string()]); - + match cli.database_configuration()?.database_type() { + DatabaseType::Sqlite => { + // When the database is SQLite, only the application database must remain + let file_names = list_file_names(&cli_state_directory); + let expected = vec!["application_database.sqlite3".to_string()]; + assert_eq!(file_names, expected); + } + DatabaseType::Postgres => { + // When the database is Postgres, only the journey tables must remain + let tables: Vec = sqlx::query( + "SELECT tablename::text FROM pg_tables WHERE schemaname = 'public'", + ) + .fetch_all(&*db.pool) + .await + .unwrap(); + let actual: Vec = tables.iter().map(|r| r.get(0)).sorted().collect(); + assert_eq!(actual, vec!["host_journey", "project_journey"]); + } + }; Ok(()) } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs b/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs index 82154e80a49..316884bb056 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs @@ -155,18 +155,13 @@ impl CliState { /// - remove the node log files #[instrument(skip_all, fields(node_name = node_name))] pub async fn remove_node(&self, node_name: &str) -> Result<()> { - // don't try to remove a node on a non-existent database - if !self.database_configuration()?.exists() { - return Ok(()); - }; - // remove the node from the database let repository = self.nodes_repository(); let node_exists = repository.get_node(node_name).await.is_ok(); - repository.delete_node(node_name).await?; // set another node as the default node if node_exists { + repository.delete_node(node_name).await?; let other_nodes = repository.get_nodes().await?; if let Some(other_node) = other_nodes.first() { repository.set_default_node(&other_node.name()).await?; diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs index ca4726df543..c5042e68644 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/enrollments_repository_sql.rs @@ -6,7 +6,7 @@ use sqlx::*; use time::OffsetDateTime; use ockam::identity::Identifier; -use ockam::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_core::async_trait; use ockam_core::Result; use ockam_node::database::{Boolean, Nullable}; @@ -43,9 +43,9 @@ impl EnrollmentsRepository for EnrollmentsSqlxDatabase { ON CONFLICT (identifier) DO UPDATE SET enrolled_at = $2, email = $3"#, ) - .bind(identifier.to_sql()) - .bind(OffsetDateTime::now_utc().to_sql()) - .bind(email.to_sql()); + .bind(identifier) + .bind(OffsetDateTime::now_utc().unix_timestamp()) + .bind(email); Ok(query.execute(&*self.database.pool).await.void()?) } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs index e045128cf6e..d35ac1c2ac6 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/identities_repository_sql.rs @@ -5,7 +5,7 @@ use sqlx::*; use ockam::identity::Identifier; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{Boolean, FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{Boolean, FromSqlxError, SqlxDatabase, ToVoid}; use crate::cli_state::{IdentitiesRepository, NamedIdentity}; @@ -54,7 +54,7 @@ impl IdentitiesRepository for IdentitiesSqlxDatabase { ON CONFLICT (identifier) DO UPDATE SET name = $2, vault_name = $3, is_default = $4"#, ) - .bind(identifier.to_sql()) + .bind(identifier) .bind(name) .bind(vault_name) .bind(is_already_default); @@ -142,7 +142,7 @@ impl IdentitiesRepository for IdentitiesSqlxDatabase { identifier: &Identifier, ) -> Result> { let query = - query_as("SELECT identifier, name, vault_name, is_default FROM named_identity WHERE identifier = $1").bind(identifier.to_sql()); + query_as("SELECT identifier, name, vault_name, is_default FROM named_identity WHERE identifier = $1").bind(identifier); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -167,7 +167,7 @@ impl IdentitiesRepository for IdentitiesSqlxDatabase { identifier: &Identifier, ) -> Result> { let query = - query_as("SELECT identifier, name, vault_name, is_default FROM named_identity WHERE identifier = $1").bind(identifier.to_sql()); + query_as("SELECT identifier, name, vault_name, is_default FROM named_identity WHERE identifier = $1").bind(identifier); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -211,13 +211,13 @@ impl IdentitiesRepository for IdentitiesSqlxDatabase { // set the identifier as the default one let query1 = query("UPDATE named_identity SET is_default = $1 WHERE identifier = $2") .bind(true) - .bind(identifier.to_sql()); + .bind(identifier); query1.execute(&mut *transaction).await.void()?; // set all the others as non-default let query2 = query("UPDATE named_identity SET is_default = $1 WHERE identifier <> $2") .bind(false) - .bind(identifier.to_sql()); + .bind(identifier); query2.execute(&mut *transaction).await.void()?; transaction.commit().await.void() } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs index 6038324338a..3536690752a 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/journeys_repository_sql.rs @@ -6,7 +6,7 @@ use crate::cli_state::JourneysRepository; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Result; use ockam_core::{async_trait, OpenTelemetryContext}; -use ockam_node::database::{FromSqlxError, Nullable, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, Nullable, SqlxDatabase, ToVoid}; #[derive(Clone)] pub struct JourneysSqlxDatabase { @@ -43,7 +43,7 @@ impl JourneysRepository for JourneysSqlxDatabase { ) .bind(project_journey.project_id()) .bind(project_journey.opentelemetry_context().to_string()) - .bind(project_journey.start().to_sql()) + .bind(project_journey.start().to_rfc3339()) .bind(previous); query.execute(&*self.database.pool).await.void() } @@ -62,7 +62,7 @@ impl JourneysRepository for JourneysSqlxDatabase { LIMIT 1 OFFSET 0", ) .bind(project_id) - .bind(now.to_sql()); + .bind(now.to_rfc3339()); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -84,7 +84,7 @@ impl JourneysRepository for JourneysSqlxDatabase { DO UPDATE SET start_datetime = $2, previous_opentelemetry_context = $3"#, ) .bind(host_journey.opentelemetry_context().to_string()) - .bind(host_journey.start().to_sql()) + .bind(host_journey.start().to_rfc3339()) .bind( host_journey .previous_opentelemetry_context() @@ -103,7 +103,7 @@ impl JourneysRepository for JourneysSqlxDatabase { LIMIT 1 OFFSET 0 "#, ) - .bind(now.to_sql()); + .bind(now.to_rfc3339()); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -112,8 +112,6 @@ impl JourneysRepository for JourneysSqlxDatabase { } } -// Database serialization / deserialization - /// Low-level representation of a row in the project journey table #[derive(sqlx::FromRow)] struct ProjectJourneyRow { diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs index 668fe814332..0e813487f54 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/nodes_repository_sql.rs @@ -1,10 +1,12 @@ use std::str::FromStr; use sqlx::any::AnyRow; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use ockam::identity::Identifier; -use ockam::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_core::async_trait; use ockam_core::Result; @@ -39,8 +41,8 @@ impl NodesRepository for NodesSqlxDatabase { ON CONFLICT (name) DO UPDATE SET identifier = $2, verbosity = $3, is_default = $4, is_authority = $5, tcp_listener_address = $6, pid = $7, http_server_address = $8"#) .bind(node_info.name()) - .bind(node_info.identifier().to_sql()) - .bind(node_info.verbosity().to_sql()) + .bind(node_info.identifier()) + .bind(node_info.verbosity() as i16) .bind(node_info.is_default()) .bind(node_info.is_authority_node()) .bind( @@ -49,7 +51,7 @@ impl NodesRepository for NodesSqlxDatabase { .as_ref() .map(|a| a.to_string()), ) - .bind(node_info.pid().map(|p| p.to_sql())) + .bind(node_info.pid().map(|p| p as i32)) .bind( node_info .http_server_address() @@ -75,7 +77,7 @@ impl NodesRepository for NodesSqlxDatabase { } async fn get_nodes_by_identifier(&self, identifier: &Identifier) -> Result> { - let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address FROM node WHERE identifier = $1").bind(identifier.to_sql()); + let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address FROM node WHERE identifier = $1").bind(identifier.to_string()); let rows: Vec = query.fetch_all(&*self.database.pool).await.into_core()?; rows.iter().map(|r| r.node_info()).collect() } @@ -158,7 +160,7 @@ impl NodesRepository for NodesSqlxDatabase { address: &InternetAddress, ) -> Result<()> { let query = query("UPDATE node SET tcp_listener_address = $1 WHERE name = $2") - .bind(address.to_string()) + .bind(address) .bind(node_name); query.execute(&*self.database.pool).await.void() } @@ -169,7 +171,7 @@ impl NodesRepository for NodesSqlxDatabase { address: &InternetAddress, ) -> Result<()> { let query = query("UPDATE node SET http_server_address = $1 WHERE name = $2") - .bind(address.to_string()) + .bind(address) .bind(node_name); query.execute(&*self.database.pool).await.void() } @@ -197,7 +199,7 @@ impl NodesRepository for NodesSqlxDatabase { async fn set_node_pid(&self, node_name: &str, pid: u32) -> Result<()> { let query = query("UPDATE node SET pid = $1 WHERE name = $2") - .bind(pid.to_sql()) + .bind(pid as i32) .bind(node_name); query.execute(&*self.database.pool).await.void() } @@ -234,6 +236,18 @@ impl NodesRepository for NodesSqlxDatabase { // Database serialization / deserialization +impl Type for InternetAddress { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl sqlx::Encode<'_, Any> for InternetAddress { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + #[derive(FromRow)] pub(crate) struct NodeRow { name: String, diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs index ef8cb2cd7fc..b6b9853bda3 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/projects_repository_sql.rs @@ -1,6 +1,8 @@ use std::str::FromStr; use sqlx::any::AnyRow; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use ockam::identity::Identifier; @@ -8,9 +10,7 @@ use ockam_core::async_trait; use ockam_core::env::FromString; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Error, Result}; -use ockam_node::database::{ - Boolean, FromSqlxError, Nullable, SqlxDatabase, SqlxType, ToSqlxType, ToVoid, -}; +use ockam_node::database::{Boolean, FromSqlxError, Nullable, SqlxDatabase, ToVoid}; use crate::cloud::addon::KafkaConfig; use crate::cloud::email_address::EmailAddress; @@ -71,7 +71,7 @@ impl ProjectsRepository for ProjectsSqlxDatabase { .bind(is_already_default) .bind(&project.space_id) .bind(&project.space_name) - .bind(project.identity.as_ref().map(|i| i.to_sql())) + .bind(&project.identity) .bind(project.project_change_history.as_ref()) .bind(&project.access_route) .bind(project.authority_identity.as_ref()) @@ -93,7 +93,7 @@ impl ProjectsRepository for ProjectsSqlxDatabase { VALUES ($1, $2) ON CONFLICT DO NOTHING"#, ) - .bind(user_email.to_sql()) + .bind(user_email) .bind(&project.id); query.execute(&mut *transaction).await.void()?; } @@ -110,11 +110,11 @@ impl ProjectsRepository for ProjectsSqlxDatabase { VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING"#, ) - .bind(user_role.id.to_sql()) + .bind(user_role.id as i64) .bind(&project.id) - .bind(user_role.email.to_sql()) - .bind(user_role.role.to_string()) - .bind(user_role.scope.to_string()); + .bind(&user_role.email) + .bind(&user_role.role) + .bind(&user_role.scope); query.execute(&mut *transaction).await.void()?; } @@ -138,7 +138,7 @@ impl ProjectsRepository for ProjectsSqlxDatabase { VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING"#) .bind(&project.id) - .bind(okta_config.tenant_base_url.to_string()) + .bind(&okta_config.tenant_base_url) .bind(&okta_config.client_id) .bind(&okta_config.certificate) .bind(okta_config.attributes.join(",").to_string()); @@ -383,9 +383,51 @@ struct UserRoleRow { scope: String, } -impl ToSqlxType for EmailAddress { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_string()) +impl Type for EmailAddress { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for EmailAddress { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + +impl Type for RoleInShare { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for RoleInShare { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + +impl Type for ShareScope { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for ShareScope { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + +impl Type for Url { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for Url { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) } } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs index 5d729636e84..bf8df959335 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs @@ -6,7 +6,7 @@ use sqlx::*; use tracing::debug; use crate::nodes::models::portal::OutletStatus; -use ockam::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Error; use ockam_core::Result; @@ -95,8 +95,8 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase { ON CONFLICT DO NOTHING"#, ) .bind(node_name) - .bind(tcp_outlet_status.socket_addr.to_sql()) - .bind(tcp_outlet_status.worker_addr.to_sql()) + .bind(tcp_outlet_status.socket_addr.to_string()) + .bind(tcp_outlet_status.worker_addr.to_string()) .bind(tcp_outlet_status.payload.as_ref()); query.execute(&*self.database.pool).await.void()?; Ok(()) @@ -109,7 +109,7 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase { ) -> ockam_core::Result> { let query = query_as("SELECT socket_addr, worker_addr, payload FROM tcp_outlet_status WHERE node_name = $1 AND worker_addr = $2") .bind(node_name) - .bind(worker_addr.to_sql()); + .bind(worker_addr.to_string()); let result: Option = query .fetch_optional(&*self.database.pool) .await @@ -125,7 +125,7 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase { let query = query("DELETE FROM tcp_outlet_status WHERE node_name = $1 AND worker_addr = $2") .bind(node_name) - .bind(worker_addr.to_sql()); + .bind(worker_addr.to_string()); query.execute(&*self.database.pool).await.into_core()?; Ok(()) } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs index 610c0f63790..c45b0f44653 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/users_repository_sql.rs @@ -3,7 +3,7 @@ use sqlx::*; use crate::cloud::email_address::EmailAddress; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{Boolean, FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{Boolean, FromSqlxError, SqlxDatabase, ToVoid}; use crate::cloud::enroll::auth0::UserInfo; @@ -36,7 +36,7 @@ impl UsersRepository for UsersSqlxDatabase { r#"SELECT EXISTS(SELECT email FROM "user" WHERE is_default = $1 AND email = $2)"#, ) .bind(true) - .bind(user.email.to_sql()); + .bind(&user.email); let is_already_default: Boolean = query1.fetch_one(&mut *transaction).await.into_core()?; let is_already_default = is_already_default.to_bool(); @@ -45,7 +45,7 @@ impl UsersRepository for UsersSqlxDatabase { VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (email) DO UPDATE SET sub = $2, nickname = $3, name = $4, picture = $5, updated_at = $6, email_verified = $7, is_default = $8"#) - .bind(user.email.to_sql()) + .bind(&user.email) .bind(&user.sub) .bind(&user.nickname) .bind(&user.name) @@ -70,12 +70,12 @@ impl UsersRepository for UsersSqlxDatabase { async fn set_default_user(&self, email: &EmailAddress) -> Result<()> { let query = query(r#"UPDATE "user" SET is_default = $1 WHERE email = $2"#) .bind(true) - .bind(email.to_sql()); + .bind(email); query.execute(&*self.database.pool).await.void() } async fn get_user(&self, email: &EmailAddress) -> Result> { - let query = query_as(r#"SELECT email, sub, nickname, name, picture, updated_at, email_verified, is_default FROM "user" WHERE email = $1"#).bind(email.to_sql()); + let query = query_as(r#"SELECT email, sub, nickname, name, picture, updated_at, email_verified, is_default FROM "user" WHERE email = $1"#).bind(email); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -92,7 +92,7 @@ impl UsersRepository for UsersSqlxDatabase { } async fn delete_user(&self, email: &EmailAddress) -> Result<()> { - let query1 = query(r#"DELETE FROM "user" WHERE email = $1"#).bind(email.to_sql()); + let query1 = query(r#"DELETE FROM "user" WHERE email = $1"#).bind(email); query1.execute(&*self.database.pool).await.void() } } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs b/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs index dc787813f12..8b085c70f5e 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/storage/vaults_repository_sql.rs @@ -5,7 +5,7 @@ use sqlx::*; use ockam::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{Boolean, Nullable, ToSqlxType}; +use ockam_node::database::{Boolean, Nullable}; use crate::cli_state::{NamedVault, UseAwsKms, VaultType, VaultsRepository}; @@ -45,7 +45,7 @@ impl VaultsRepository for VaultsSqlxDatabase { DO UPDATE SET path = $2, is_default = $3, is_kms = $4"#, ) .bind(name) - .bind(vault_type.path().map(|p| p.to_sql())) + .bind(vault_type.path().map(|p| p.to_string_lossy().to_string())) .bind(!default_exists) .bind(vault_type.use_aws_kms()); query.execute(&mut *transaction).await.void()?; @@ -56,7 +56,7 @@ impl VaultsRepository for VaultsSqlxDatabase { async fn update_vault(&self, name: &str, vault_type: VaultType) -> Result<()> { let query = query("UPDATE vault SET path = $1, is_kms = $2 WHERE name = $3") - .bind(vault_type.path().map(|p| p.to_sql())) + .bind(vault_type.path().map(|p| p.to_string_lossy().to_string())) .bind(vault_type.use_aws_kms()) .bind(name); query.execute(&*self.database.pool).await.void() diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs b/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs index a12176e58d7..56c1edadbfe 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs @@ -1,12 +1,30 @@ use crate::cli_state::Result; use crate::cli_state::{random_name, CliState, CliStateError}; +use ockam_node::database::SqlxDatabase; use std::path::PathBuf; /// Test support impl CliState { /// Return a test CliState with a random root directory + /// Use this CliState for a simple integration test since every call to that function deletes + /// all previous state if the database being used is Postgres. pub async fn test() -> Result { - Self::create(Self::test_dir()?).await + let test_dir = Self::test_dir()?; + + // clean the existing state if any + let db = SqlxDatabase::create(CliState::make_database_configuration(&test_dir)?).await?; + db.drop_all_postgres_tables().await?; + + Self::create(test_dir).await + } + + /// Return a test CliState with a random root directory + /// Use this CliState for system tests involving several nodes + /// since calls to that function do not delete + /// any previous state if the database being used is Postgres. + pub async fn system() -> Result { + let test_dir = Self::test_dir()?; + Self::create(test_dir).await } /// Return a random root directory diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs b/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs index 9bb81dfa083..af66a49d322 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs @@ -551,7 +551,6 @@ mod tests { ECDSASHA256CurveP256SecretKey, ECDSASHA256CurveP256Signature, HandleToSecret, SigningSecret, SigningSecretKeyHandle, X25519SecretKey, X25519SecretKeyHandle, }; - use tempfile::NamedTempFile; #[tokio::test] async fn test_create_named_vault() -> Result<()> { @@ -667,16 +666,14 @@ mod tests { #[tokio::test] async fn test_move_vault() -> Result<()> { - let db_file = NamedTempFile::new().unwrap(); - let cli_state_directory = db_file.path().parent().unwrap().join(random_name()); - let cli = CliState::create(cli_state_directory.clone()).await?; + let cli = CliState::test().await?; // create a vault let _ = cli.get_or_create_named_vault("vault1").await?; // try to move it. That should fail because the first vault is // stored in the main database - let new_vault_path = cli_state_directory.join("new-vault-name"); + let new_vault_path = cli.dir().join("new-vault-name"); let result = cli.move_vault("vault1", &new_vault_path).await; assert!(result.is_err()); @@ -685,7 +682,7 @@ mod tests { // try to move it. This should succeed let result = cli - .move_vault("vault2", &cli_state_directory.join("new-vault-name")) + .move_vault("vault2", &cli.dir().join("new-vault-name")) .await; if let Err(e) = result { panic!("{}", e.to_string()) @@ -740,13 +737,7 @@ mod tests { #[tokio::test] async fn test_create_vault_with_a_user_path() -> Result<()> { let cli = CliState::test().await?; - let vault_path = cli - .database_configuration()? - .path() - .unwrap() - .parent() - .unwrap() - .join(random_name()); + let vault_path = cli.dir().join(random_name()); let result = cli .create_named_vault( diff --git a/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs index 13facfdacf1..241f2d6a13e 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs @@ -45,7 +45,7 @@ mod test { use crate::kafka::{ ConsumerPublishing, ConsumerResolution, KafkaInletController, KafkaPortalListener, }; - use crate::test_utils::NodeManagerHandle; + use crate::test_utils::{NodeManagerHandle, TestNode}; // TODO: upgrade to 13 by adding a metadata request to map uuid<=>topic_name const TEST_KAFKA_API_VERSION: i16 = 12; @@ -131,6 +131,7 @@ mod test { async fn producer__flow_with_mock_kafka__content_encryption_and_decryption( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let consumer_bootstrap_port = create_kafka_service( diff --git a/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs b/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs index 049dbe02316..54a3d67d0d6 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs @@ -477,7 +477,7 @@ mod test { use crate::kafka::secure_channel_map::controller::KafkaSecureChannelControllerImpl; use crate::kafka::{ConsumerPublishing, ConsumerResolution}; use crate::port_range::PortRange; - use crate::test_utils::NodeManagerHandle; + use crate::test_utils::{NodeManagerHandle, TestNode}; use ockam::MessageReceiveOptions; use ockam_abac::{ Action, Env, Policies, Resource, ResourcePolicySqlxDatabase, ResourceType, @@ -517,6 +517,7 @@ mod test { async fn kafka_portal_worker__ping_pong_pass_through__should_pass( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let portal_inlet_address = setup_only_worker(context, &handle).await; @@ -554,6 +555,7 @@ mod test { async fn kafka_portal_worker__pieces_of_kafka_message__message_assembled( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let portal_inlet_address = setup_only_worker(context, &handle).await; @@ -596,6 +598,7 @@ mod test { async fn kafka_portal_worker__double_kafka_message__message_assembled( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let portal_inlet_address = setup_only_worker(context, &handle).await; @@ -633,6 +636,7 @@ mod test { async fn kafka_portal_worker__bigger_than_limit_kafka_message__error( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let portal_inlet_address = setup_only_worker(context, &handle).await; @@ -682,10 +686,11 @@ mod test { } #[allow(non_snake_case)] - #[ockam_macros::test(timeout = 5_000)] + #[ockam_macros::test(timeout = 500_000)] async fn kafka_portal_worker__almost_over_limit_than_limit_kafka_message__two_kafka_message_pass( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let portal_inlet_address = setup_only_worker(context, &handle).await; @@ -859,6 +864,7 @@ mod test { async fn kafka_portal_worker__metadata_exchange__response_changed( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let project_authority = handle .node_manager diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs index f9799553518..62d7afe7fab 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs @@ -7,6 +7,7 @@ mod test { use crate::kafka::secure_channel_map::controller::KafkaSecureChannelControllerImpl; use crate::kafka::{ConsumerPublishing, ConsumerResolution}; use crate::port_range::PortRange; + use crate::test_utils::TestNode; use kafka_protocol::messages::ApiKey; use kafka_protocol::messages::BrokerId; use kafka_protocol::messages::{ApiVersionsRequest, MetadataRequest, MetadataResponse}; @@ -22,6 +23,7 @@ mod test { async fn interceptor__basic_messages_with_several_api_versions__parsed_correctly( context: &mut Context, ) -> ockam::Result<()> { + TestNode::clean().await?; let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; let inlet_map = KafkaInletController::new( diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs index f3083103e05..d3c3c5de996 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs @@ -58,10 +58,13 @@ impl Drop for InMemoryNode { // because in that case they can be restarted if !self.persistent { executor::block_on(async { - self.node_manager - .delete_node() - .await - .unwrap_or_else(|e| panic!("cannot delete the node {}: {e:?}", self.node_name)) + // We need to recreate the CliState here to make sure that + // we get a fresh connection to the database (otherwise this code blocks) + // let cli_state = CliState::create(self.cli_state.dir()).await.unwrap(); + // cli_state + // .remove_node(&self.node_name) + // .await + // .unwrap_or_else(|e| panic!("cannot delete the node {}: {e:?}", self.node_name)); }); } } diff --git a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs index 767ba757017..d661061122c 100644 --- a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs @@ -19,6 +19,7 @@ use ockam::identity::utils::AttributesBuilder; use ockam::identity::SecureChannels; use ockam::Result; use ockam_core::AsyncTryClone; +use ockam_node::database::{DatabaseConfiguration, SqlxDatabase}; use ockam_transport_tcp::{HostnamePort, TcpListenerOptions, TcpTransport}; use crate::authenticator::credential_issuer::{DEFAULT_CREDENTIAL_VALIDITY, PROJECT_MEMBER_SCHEMA}; @@ -64,7 +65,7 @@ pub async fn start_manager_for_tests( ) .await?; - let cli_state = CliState::test().await?; + let cli_state = CliState::system().await?; let node_name = random_name(); cli_state @@ -202,6 +203,18 @@ pub struct TestNode { } impl TestNode { + /// If the database being used for the tests is Postgres then it is shared across all the tests and + /// needs be cleaned-up before a test is executed + pub async fn clean() -> Result<()> { + if let Some(configuration) = DatabaseConfiguration::postgres()? { + let db = SqlxDatabase::create_no_migration(configuration) + .await + .unwrap(); + db.drop_all_postgres_tables().await?; + }; + Ok(()) + } + pub async fn create(runtime: Arc, listen_addr: Option<&str>) -> Self { let (mut context, mut executor) = NodeBuilder::new().with_runtime(runtime.clone()).build(); runtime.spawn(async move { diff --git a/implementations/rust/ockam/ockam_api/tests/latency.rs b/implementations/rust/ockam/ockam_api/tests/latency.rs index 3bb6f6824ee..4e113a0aada 100644 --- a/implementations/rust/ockam/ockam_api/tests/latency.rs +++ b/implementations/rust/ockam/ockam_api/tests/latency.rs @@ -27,6 +27,7 @@ pub fn measure_message_latency_two_nodes() { let result: ockam::Result<()> = runtime_cloned.block_on(async move { let test_body = async move { + TestNode::clean().await?; let mut first_node = TestNode::create(runtime.clone(), None).await; let second_node = TestNode::create(runtime.clone(), None).await; @@ -124,6 +125,7 @@ pub fn measure_buffer_latency_two_nodes_portal() { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; + TestNode::clean().await?; let first_node = TestNode::create(runtime.clone(), None).await; let second_node = TestNode::create(runtime.clone(), None).await; diff --git a/implementations/rust/ockam/ockam_api/tests/portals.rs b/implementations/rust/ockam/ockam_api/tests/portals.rs index d1812ec593e..db712debf5b 100644 --- a/implementations/rust/ockam/ockam_api/tests/portals.rs +++ b/implementations/rust/ockam/ockam_api/tests/portals.rs @@ -21,6 +21,7 @@ use tracing::info; #[ockam_macros::test] async fn inlet_outlet_local_successful(context: &mut Context) -> ockam::Result<()> { + TestNode::clean().await?; let echo_server_handle = start_tcp_echo_server().await; let node_manager_handle = start_manager_for_tests(context, None, None).await?; @@ -94,6 +95,7 @@ fn portal_node_goes_down_reconnect() { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; + TestNode::clean().await?; let first_node = TestNode::create(runtime_cloned.clone(), None).await; let second_node = TestNode::create(runtime_cloned.clone(), None).await; @@ -233,6 +235,7 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; + TestNode::clean().await?; let first_node = TestNode::create(runtime_cloned.clone(), None).await; let second_node = TestNode::create(runtime_cloned, None).await; @@ -348,6 +351,7 @@ fn portal_heavy_load_exchanged() { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; + TestNode::clean().await?; let first_node = TestNode::create(runtime_cloned.clone(), None).await; let second_node = TestNode::create(runtime_cloned, None).await; @@ -488,6 +492,7 @@ fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disrup let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; + TestNode::clean().await?; let first_node = TestNode::create(runtime_cloned.clone(), None).await; let second_node = TestNode::create(runtime_cloned, None).await; diff --git a/implementations/rust/ockam/ockam_app_lib/Cargo.toml b/implementations/rust/ockam/ockam_app_lib/Cargo.toml index 8d380669714..11d43df5090 100644 --- a/implementations/rust/ockam/ockam_app_lib/Cargo.toml +++ b/implementations/rust/ockam/ockam_app_lib/Cargo.toml @@ -45,7 +45,7 @@ ockam_transport_tcp = { path = "../ockam_transport_tcp", version = "^0.117.0", d serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" #sqlx = { version = "0.7.4", features = ["runtime-tokio", "sqlite", "migrate", "postgres", "any"] } -sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "1b24b40de97db1cefe387bd590089bf915598547", features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } +sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "5fec648d2de0cbeed738dcf1c6f5bc9194fc439b", features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } thiserror = "1.0" tokio = { version = "1.38.0", features = ["full"] } tracing = { version = "0.1", default-features = false } diff --git a/implementations/rust/ockam/ockam_app_lib/src/lib.rs b/implementations/rust/ockam/ockam_app_lib/src/lib.rs index 9f200662556..3e7488fe2e3 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/lib.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/lib.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "256"] + //! //! This crate implements the business logic of the Ockam desktop application without providing a //! frontend. diff --git a/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository_sql.rs b/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository_sql.rs index 86c6d1da06d..af2fdf6970a 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository_sql.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/state/model_state_repository_sql.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use sqlx::*; use tracing::debug; -use ockam::{Boolean, FromSqlxError, Nullable, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam::{Boolean, FromSqlxError, Nullable, SqlxDatabase, ToVoid}; use ockam_api::nodes::models::portal::OutletStatus; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Error; @@ -59,8 +59,8 @@ impl ModelStateRepository for ModelStateSqlxDatabase { ON CONFLICT DO NOTHING"#, ) .bind(node_name) - .bind(tcp_outlet_status.socket_addr.to_sql()) - .bind(tcp_outlet_status.worker_addr.to_sql()) + .bind(tcp_outlet_status.socket_addr.to_string()) + .bind(tcp_outlet_status.worker_addr.to_string()) .bind(tcp_outlet_status.payload.as_ref()); query.execute(&mut *transaction).await.void()?; } diff --git a/implementations/rust/ockam/ockam_identity/Cargo.toml b/implementations/rust/ockam/ockam_identity/Cargo.toml index 77a951fd129..f01f65e2394 100644 --- a/implementations/rust/ockam/ockam_identity/Cargo.toml +++ b/implementations/rust/ockam/ockam_identity/Cargo.toml @@ -88,7 +88,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"] } serde_bare = { version = "0.5.0", default-features = false, features = ["alloc"] } serde_json = { version = "1.0", optional = true } sha2 = { version = "0.10", default-features = false } -sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "1b24b40de97db1cefe387bd590089bf915598547", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } +sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "5fec648d2de0cbeed738dcf1c6f5bc9194fc439b", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } tokio-retry = { version = "0.3.0", default-features = false, optional = true } tracing = { version = "0.1", default_features = false } tracing-attributes = { version = "0.1", default_features = false } diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs index 3a828f3b685..685a04c8a28 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/change_history_repository_sql.rs @@ -1,13 +1,15 @@ use core::str::FromStr; use sqlx::any::AnyArguments; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::query::Query; use sqlx::*; use tracing::debug; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, SqlxType, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::models::{ChangeHistory, Identifier}; use crate::{ChangeHistoryRepository, Identity, IdentityError, IdentityHistoryComparison, Vault}; @@ -38,7 +40,7 @@ impl ChangeHistoryRepository for ChangeHistorySqlxDatabase { let mut transaction = self.database.begin().await.into_core()?; let query1 = query_as("SELECT identifier, change_history FROM identity WHERE identifier = $1") - .bind(identity.identifier().to_sql()); + .bind(identity.identifier()); let row: Option = query1.fetch_optional(&mut *transaction).await.into_core()?; @@ -91,11 +93,11 @@ impl ChangeHistoryRepository for ChangeHistorySqlxDatabase { async fn delete_change_history(&self, identifier: &Identifier) -> Result<()> { let mut transaction = self.database.begin().await.into_core()?; - let query1 = query("DELETE FROM identity where identifier = $1").bind(identifier.to_sql()); + let query1 = query("DELETE FROM identity where identifier = $1").bind(identifier); query1.execute(&mut *transaction).await.void()?; - let query2 = query("DELETE FROM identity_attributes where identifier = $1") - .bind(identifier.to_sql()); + let query2 = + query("DELETE FROM identity_attributes where identifier = $1").bind(identifier); query2.execute(&mut *transaction).await.void()?; transaction.commit().await.void()?; Ok(()) @@ -104,7 +106,7 @@ impl ChangeHistoryRepository for ChangeHistorySqlxDatabase { async fn get_change_history(&self, identifier: &Identifier) -> Result> { let query = query_as("SELECT identifier, change_history FROM identity WHERE identifier = $1") - .bind(identifier.to_sql()); + .bind(identifier); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -121,8 +123,8 @@ impl ChangeHistoryRepository for ChangeHistorySqlxDatabase { impl ChangeHistorySqlxDatabase { fn insert_query<'a>( - identifier: &Identifier, - change_history: &ChangeHistory, + identifier: &'a Identifier, + change_history: &'a ChangeHistory, ) -> Query<'a, Any, AnyArguments<'a>> { query( r#" @@ -131,22 +133,34 @@ impl ChangeHistorySqlxDatabase { ON CONFLICT (identifier) DO UPDATE SET change_history = $2"#, ) - .bind(identifier.to_sql()) - .bind(change_history.to_sql()) + .bind(identifier) + .bind(change_history) } } // Database serialization / deserialization -impl ToSqlxType for Identifier { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_string()) +impl Type for Identifier { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for Identifier { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.to_string(), buf) + } +} + +impl Type for ChangeHistory { + fn type_info() -> ::TypeInfo { + >::type_info() } } -impl ToSqlxType for ChangeHistory { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.export_as_string().unwrap()) +impl Encode<'_, Any> for ChangeHistory { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.export_as_string().unwrap(), buf) } } diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs index ceebbbeb7af..0957b76dd23 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs @@ -1,9 +1,11 @@ +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use tracing::debug; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::models::{CredentialAndPurposeKey, Identifier}; use crate::{CredentialRepository, TimestampInSeconds}; @@ -67,8 +69,8 @@ impl CredentialRepository for CredentialSqlxDatabase { let query = query_as( "SELECT credential FROM credential WHERE subject_identifier = $1 AND issuer_identifier = $2 AND scope = $3 AND node_name = $4" ) - .bind(subject.to_sql()) - .bind(issuer.to_sql()) + .bind(subject) + .bind(issuer) .bind(scope) .bind(self.node_name.clone()); let cached_credential: Option = query @@ -91,25 +93,41 @@ impl CredentialRepository for CredentialSqlxDatabase { VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (subject_identifier, issuer_identifier, scope) DO UPDATE SET credential = $4, expires_at = $5, node_name = $6"#) - .bind(subject.to_sql()) - .bind(issuer.to_sql()) + .bind(subject) + .bind(issuer) .bind(scope) - .bind(credential.encode_as_cbor_bytes()?) - .bind(expires_at.to_sql()) + .bind(credential) + .bind(expires_at) .bind(self.node_name.clone()); query.execute(&*self.database.pool).await.void() } async fn delete(&self, subject: &Identifier, issuer: &Identifier, scope: &str) -> Result<()> { let query = query("DELETE FROM credential WHERE subject_identifier = $1 AND issuer_identifier = $2 AND scope = $3 AND node_name = $4") - .bind(subject.to_sql()) - .bind(issuer.to_sql()) + .bind(subject) + .bind(issuer) .bind(scope) .bind(self.node_name.clone()); query.execute(&*self.database.pool).await.void() } } +// Database serialization / deserialization + +// + +impl Type for CredentialAndPurposeKey { + fn type_info() -> ::TypeInfo { + as Type>::type_info() + } +} + +impl Encode<'_, Any> for CredentialAndPurposeKey { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + as Encode<'_, Any>>::encode_by_ref(&self.encode_as_cbor_bytes().unwrap(), buf) + } +} + // Low-level representation of a table row #[derive(FromRow)] struct CachedCredentialRow { diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs index beb7ce31699..f580a58b24b 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/identity_attributes_repository_sql.rs @@ -1,11 +1,13 @@ use core::str::FromStr; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use tracing::debug; use ockam_core::async_trait; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, Nullable, SqlxDatabase, SqlxType, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, Nullable, SqlxDatabase, ToVoid}; use crate::models::Identifier; use crate::{AttributesEntry, IdentityAttributesRepository, TimestampInSeconds}; @@ -47,8 +49,8 @@ impl IdentityAttributesRepository for IdentityAttributesSqlxDatabase { let query = query_as( "SELECT identifier, attributes, added, expires, attested_by FROM identity_attributes WHERE identifier = $1 AND attested_by = $2 AND node_name = $3" ) - .bind(identity.to_sql()) - .bind(attested_by.to_sql()) + .bind(identity) + .bind(attested_by) .bind(&self.node_name); let identity_attributes: Option = query .fetch_optional(&*self.database.pool) @@ -64,11 +66,11 @@ impl IdentityAttributesRepository for IdentityAttributesSqlxDatabase { VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (identifier, node_name) DO UPDATE SET attributes = $2, added = $3, expires = $4, attested_by = $5, node_name = $6"#) - .bind(subject.to_sql()) - .bind(minicbor::to_vec(entry.attrs())?) - .bind(entry.added_at().to_sql()) - .bind(entry.expires_at().map(|e| e.to_sql())) - .bind(entry.attested_by().map(|e| e.to_sql())) + .bind(subject) + .bind(&entry) + .bind(entry.added_at()) + .bind(entry.expires_at()) + .bind(entry.attested_by()) .bind(&self.node_name); query.execute(&*self.database.pool).await.void() } @@ -76,7 +78,7 @@ impl IdentityAttributesRepository for IdentityAttributesSqlxDatabase { // This query is regularly invoked by IdentitiesAttributes to make sure that we expire attributes regularly async fn delete_expired_attributes(&self, now: TimestampInSeconds) -> Result<()> { let query = query("DELETE FROM identity_attributes WHERE expires <= $1 AND node_name = $2") - .bind(now.to_sql()) + .bind(now) .bind(&self.node_name); query.execute(&*self.database.pool).await.void() } @@ -84,9 +86,15 @@ impl IdentityAttributesRepository for IdentityAttributesSqlxDatabase { // Database serialization / deserialization -impl ToSqlxType for TimestampInSeconds { - fn to_sql(&self) -> SqlxType { - self.0.to_sql() +impl Type for AttributesEntry { + fn type_info() -> ::TypeInfo { + as Type>::type_info() + } +} + +impl Encode<'_, Any> for AttributesEntry { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + as Encode<'_, Any>>::encode_by_ref(&minicbor::to_vec(self.attrs()).unwrap(), buf) } } diff --git a/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs b/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs index a51c7d4e406..3df30677e11 100644 --- a/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs +++ b/implementations/rust/ockam/ockam_identity/src/models/identifiers.rs @@ -1,5 +1,4 @@ use core::fmt::{Debug, Formatter}; - use minicbor::{Decode, Encode}; use crate::alloc::string::ToString; diff --git a/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs b/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs index 041f0d03c31..c28b09ddf7d 100644 --- a/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs +++ b/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs @@ -1,5 +1,8 @@ use minicbor::{Decode, Encode}; use serde::{Deserialize, Serialize}; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; +use sqlx::{Any, Database, Type}; /// Timestamp in seconds (UTC) #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Encode, Decode, Serialize, Deserialize)] @@ -7,3 +10,15 @@ use serde::{Deserialize, Serialize}; #[cbor(transparent)] #[serde(transparent)] pub struct TimestampInSeconds(#[n(0)] pub u64); + +impl Type for TimestampInSeconds { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl sqlx::Encode<'_, Any> for TimestampInSeconds { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&(self.0 as i64), buf) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs index d6a08ec9d5b..0f8e11d3768 100644 --- a/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/purpose_keys/storage/purpose_keys_repository_sql.rs @@ -1,5 +1,7 @@ use core::str::FromStr; +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use tracing::debug; @@ -8,7 +10,7 @@ use ockam_core::compat::string::{String, ToString}; use ockam_core::compat::vec::Vec; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, SqlxType, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::identity::IdentityConstants; use crate::models::{Identifier, PurposeKeyAttestation}; @@ -49,16 +51,16 @@ impl PurposeKeysRepository for PurposeKeysSqlxDatabase { ON CONFLICT (identifier, purpose) DO UPDATE SET purpose_key_attestation = $3"#, ) - .bind(subject.to_sql()) - .bind(purpose.to_sql()) - .bind(minicbor::to_vec(purpose_key_attestation)?); + .bind(subject) + .bind(purpose) + .bind(purpose_key_attestation); query.execute(&*self.database.pool).await.void() } async fn delete_purpose_key(&self, subject: &Identifier, purpose: Purpose) -> Result<()> { let query = query("DELETE FROM purpose_key WHERE identifier = $1 and purpose = $2") - .bind(subject.to_sql()) - .bind(purpose.to_sql()); + .bind(subject) + .bind(purpose); query.execute(&*self.database.pool).await.void() } @@ -68,8 +70,8 @@ impl PurposeKeysRepository for PurposeKeysSqlxDatabase { purpose: Purpose, ) -> Result> { let query = query_as("SELECT identifier, purpose, purpose_key_attestation FROM purpose_key WHERE identifier = $1 and purpose = $2") - .bind(identifier.to_sql()) - .bind(purpose.to_sql()); + .bind(identifier) + .bind(purpose); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -87,6 +89,34 @@ impl PurposeKeysRepository for PurposeKeysSqlxDatabase { // Database serialization / deserialization +impl Type for Purpose { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for Purpose { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + let purpose = match self { + Purpose::SecureChannel => IdentityConstants::SECURE_CHANNEL_PURPOSE_KEY, + Purpose::Credentials => IdentityConstants::CREDENTIALS_PURPOSE_KEY, + }; + >::encode_by_ref(&purpose.to_string(), buf) + } +} + +impl Type for PurposeKeyAttestation { + fn type_info() -> ::TypeInfo { + as Type>::type_info() + } +} + +impl Encode<'_, Any> for PurposeKeyAttestation { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + as Encode<'_, Any>>::encode_by_ref(&minicbor::to_vec(self).unwrap(), buf) + } +} + #[derive(FromRow)] pub(crate) struct PurposeKeyRow { // The identifier who is using this key @@ -121,19 +151,6 @@ impl PurposeKeyRow { } } -impl ToSqlxType for Purpose { - fn to_sql(&self) -> SqlxType { - match self { - Purpose::SecureChannel => { - SqlxType::Text(IdentityConstants::SECURE_CHANNEL_PURPOSE_KEY.to_string()) - } - Purpose::Credentials => { - SqlxType::Text(IdentityConstants::CREDENTIALS_PURPOSE_KEY.to_string()) - } - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs index 8b1339374ab..bd9651a0c39 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/storage/secure_channel_repository_sql.rs @@ -5,7 +5,7 @@ use crate::secure_channel::Role; use crate::Identifier; use ockam_core::{async_trait, Address}; use ockam_core::{Error, Result}; -use ockam_node::database::{FromSqlxError, SqlxDatabase, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use ockam_vault::{AeadSecretKeyHandle, HandleToSecret}; use crate::secure_channels::storage::secure_channel_repository::{ @@ -58,17 +58,17 @@ impl SecureChannelRepository for SecureChannelSqlxDatabase { DO UPDATE SET role = $1, my_identifier = $2, their_identifier = $3, decryptor_api_address = $5, decryption_key_handle = $6"# ) .bind(secure_channel.role().str()) - .bind(secure_channel.my_identifier().to_sql()) - .bind(secure_channel.their_identifier().to_sql()) - .bind(secure_channel.decryptor_remote().to_sql()) - .bind(secure_channel.decryptor_api().to_sql()) - .bind(secure_channel.decryption_key_handle().to_sql()); + .bind(secure_channel.my_identifier()) + .bind(secure_channel.their_identifier()) + .bind(secure_channel.decryptor_remote().to_string()) + .bind(secure_channel.decryptor_api().to_string()) + .bind(secure_channel.decryption_key_handle()); query.execute(&*self.database.pool).await.void() } async fn delete(&self, decryptor_remote_address: &Address) -> Result<()> { let query = query("DELETE FROM secure_channel WHERE decryptor_remote_address = $1") - .bind(decryptor_remote_address.to_sql()); + .bind(decryptor_remote_address.to_string()); query.execute(&*self.database.pool).await.void() } } diff --git a/implementations/rust/ockam/ockam_node/Cargo.toml b/implementations/rust/ockam/ockam_node/Cargo.toml index 50d4289abee..63e5df5b726 100644 --- a/implementations/rust/ockam/ockam_node/Cargo.toml +++ b/implementations/rust/ockam/ockam_node/Cargo.toml @@ -88,7 +88,7 @@ regex = { version = "1.10.5", default-features = false, optional = true } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1", optional = true } #sqlx = { version = "0.7.4", optional = true, features = ["sqlite", "postgres", "migrate", "runtime-tokio", "any"] } -sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "1b24b40de97db1cefe387bd590089bf915598547", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } +sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "5fec648d2de0cbeed738dcf1c6f5bc9194fc439b", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } tempfile = { version = "3.10.1" } time = { version = "0.3.36", default-features = false, optional = true } tokio = { version = "1.38", default-features = false, optional = true, features = ["sync", "time", "rt", "rt-multi-thread", "macros"] } diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/migration_support/migrator.rs b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/migration_support/migrator.rs index e316d1d8d54..dab03d369e6 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/migration_support/migrator.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/migration_support/migrator.rs @@ -8,7 +8,7 @@ use std::cmp::Ordering; use time::OffsetDateTime; use crate::database::migrations::migration_support::rust_migration::RustMigration; -use crate::database::{FromSqlxError, ToSqlxType, ToVoid}; +use crate::database::{FromSqlxError, ToVoid}; use ockam_core::Result; /// Migrator is responsible for running Sql and Rust migrations side by side in the correct order, @@ -160,7 +160,7 @@ impl Migrator { DO UPDATE SET run_on = $2"#, ) .bind(migration_name) - .bind(now.to_sql()); + .bind(now.unix_timestamp()); query.execute(&mut *connection).await.void()?; Ok(()) diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20231231100000_node_name_identity_attributes.rs b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20231231100000_node_name_identity_attributes.rs index c179d28ab86..a030e9d990b 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20231231100000_node_name_identity_attributes.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20231231100000_node_name_identity_attributes.rs @@ -71,12 +71,12 @@ impl NodeNameIdentityAttributes { for row in rows { for node_name in &node_names { let insert = query("INSERT INTO identity_attributes (identifier, attributes, added, expires, attested_by, node_name) VALUES ($1, $2, $3, $4, $5, $6)") - .bind(row.identifier.clone()) - .bind(row.attributes.clone()) + .bind(&row.identifier) + .bind(&row.attributes) .bind(row.added) .bind(row.expires.to_option()) .bind(row.attested_by.to_option()) - .bind(node_name.name.clone()); + .bind(&node_name.name); insert.execute(&mut *transaction).await.void()?; } diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240111100002_delete_trust_context.rs b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240111100002_delete_trust_context.rs index b1c27fa6381..0c9771797ae 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240111100002_delete_trust_context.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240111100002_delete_trust_context.rs @@ -64,9 +64,9 @@ impl PolicyTrustContextId { }; for node_name in &node_names { let insert = query("INSERT INTO policy (resource, action, expression, node_name) VALUES ($1, $2, $3, $4)") - .bind(row.resource.clone()) - .bind(row.action.clone()) - .bind(expression.clone()) + .bind(&row.resource) + .bind(&row.action) + .bind(&expression) .bind(node_name); insert.execute(&mut *transaction).await.void()?; diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240313100000_remove_orphan_resources.rs b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240313100000_remove_orphan_resources.rs index 6765b5524d4..6a491dcbb00 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240313100000_remove_orphan_resources.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240313100000_remove_orphan_resources.rs @@ -157,14 +157,14 @@ mod test { Ok(()) } /// HELPERS - fn insert_resource( - resource: &str, - node_name: &str, - ) -> Query<'static, Any, AnyArguments<'static>> { + fn insert_resource<'a>( + resource: &'a str, + node_name: &'a str, + ) -> Query<'a, Any, AnyArguments<'a>> { let resource_type = random_string(); query("INSERT INTO resource (resource_name, resource_type, node_name) VALUES ($1, $2, $3)") - .bind(resource.to_string()) + .bind(resource) .bind(resource_type) - .bind(node_name.to_string()) + .bind(node_name) } } diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240503100000_update_policy_expressions.rs b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240503100000_update_policy_expressions.rs index 9e65e4146fe..1c61bd2acff 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240503100000_update_policy_expressions.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/rust/sqlite/migration_20240503100000_update_policy_expressions.rs @@ -124,23 +124,23 @@ mod test { } /// HELPERS - fn insert_resource_policy(resource: &str) -> Query<'static, Any, AnyArguments<'static>> { + fn insert_resource_policy(resource: &str) -> Query<'_, Any, AnyArguments<'_>> { let action = "handle_message"; let expression = "subject.has_credential"; let node_name = random_string(); query("INSERT INTO resource_policy (resource_name, action, expression, node_name) VALUES ($1, $2, $3, $4)") - .bind(resource.to_string()) + .bind(resource) .bind(action) .bind(expression) .bind(node_name) } - fn insert_resource_type_policy(resource: &str) -> Query<'static, Any, AnyArguments<'static>> { + fn insert_resource_type_policy(resource: &str) -> Query<'_, Any, AnyArguments<'_>> { let action = "handle_message"; let expression = "subject.has_credential"; let node_name = random_string(); query("INSERT INTO resource_type_policy (resource_type, action, expression, node_name) VALUES ($1, $2, $3, $4)") - .bind(resource.to_string()) + .bind(resource) .bind(action) .bind(expression) .bind(node_name) diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs b/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs index 23886ddc5c3..aa19faf6447 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/mod.rs @@ -2,10 +2,8 @@ mod database_configuration; mod migrations; mod sqlx_database; mod sqlx_from_row_types; -mod sqlx_types; pub use database_configuration::*; pub use migrations::*; pub use sqlx_database::*; pub use sqlx_from_row_types::*; -pub use sqlx_types::*; diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs b/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs index ed38cbcb3c0..04af4737e36 100644 --- a/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs +++ b/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_database.rs @@ -7,7 +7,7 @@ use std::path::{Path, PathBuf}; use ockam_core::errcode::{Kind, Origin}; use sqlx::any::{install_default_drivers, AnyConnectOptions}; use sqlx::pool::PoolOptions; -use sqlx::{Any, ConnectOptions, Pool, Row}; +use sqlx::{Any, ConnectOptions, Pool}; use tempfile::NamedTempFile; use tokio_retry::strategy::{jitter, FixedInterval}; use tokio_retry::Retry; @@ -84,7 +84,7 @@ impl SqlxDatabase { match DatabaseConfiguration::postgres()? { Some(configuration) => { let db = Self::create_no_migration(configuration.clone()).await?; - db.drop_tables().await?; + db.drop_all_postgres_tables().await?; SqlxDatabase::create(configuration).await }, None => Err(Error::new(Origin::Core, Kind::NotFound, "There is no postgres database configuration, or it is incomplete. Please run ockam environment to check the database environment variables".to_string())), @@ -96,7 +96,7 @@ impl SqlxDatabase { match DatabaseConfiguration::postgres()? { Some(configuration) => { let db = Self::create_application_no_migration(configuration.clone()).await?; - db.drop_tables().await?; + db.drop_all_postgres_tables().await?; SqlxDatabase::create_application_database(configuration).await }, None => Err(Error::new(Origin::Core, Kind::NotFound, "There is no postgres database configuration, or it is incomplete. Please run ockam environment to check the database environment variables".to_string())), @@ -247,38 +247,61 @@ impl SqlxDatabase { Error::new(Origin::Application, Kind::Io, err) } - /// Drop all the database tables - pub async fn drop_tables(&self) -> Result<()> { + /// Drop all the postgres database tables + pub async fn drop_all_postgres_tables(&self) -> Result<()> { + self.clean_postgres_node_tables(Clean::Drop, None).await + } + + /// Truncate all the postgres database tables + pub async fn truncate_all_postgres_tables(&self) -> Result<()> { + self.clean_postgres_node_tables(Clean::Truncate, None).await + } + + /// Drop all the database tables _except_ for the journey tables + pub async fn drop_postgres_node_tables(&self) -> Result<()> { + self.clean_postgres_node_tables(Clean::Drop, Some("AND tablename NOT LIKE '%journey%'")) + .await + } + + /// Truncate all the database tables _except_ for the journey tables + pub async fn truncate_postgres_node_tables(&self) -> Result<()> { + self.clean_postgres_node_tables(Clean::Truncate, Some("AND tablename NOT LIKE '%journey%'")) + .await + } + + /// Truncate all the database tables _except_ for the journey tables + async fn clean_postgres_node_tables(&self, clean: Clean, filter: Option<&str>) -> Result<()> { match self.configuration.database_type() { - DatabaseType::Sqlite => { - let tables: Vec = - sqlx::query("SELECT name FROM sqlite_master WHERE type='table';") - .fetch_all(&*self.pool) - .await.into_core()? - .into_iter() - .map(|row| row.get(0)) - .collect(); - - // Generate and execute DROP TABLE statements for each table - for table in tables { - let drop_query = format!("DROP TABLE IF EXISTS {};", table); - sqlx::query(&drop_query).execute(&*self.pool).await.void()?; - } - Ok(()) - } - DatabaseType::Postgres => sqlx::query( - r#"DO $$ + DatabaseType::Sqlite => Ok(()), + DatabaseType::Postgres => { + sqlx::query( + format!(r#"DO $$ DECLARE r RECORD; BEGIN - FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') LOOP - EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE'; + FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public' {}) LOOP + EXECUTE '{} TABLE ' || quote_ident(r.tablename) || ' CASCADE'; END LOOP; - END $$;"#, - ) - .execute(&*self.pool) - .await - .void(), + END $$;"#, filter.unwrap_or(""), clean.as_str(), + ).as_str()) + .execute(&*self.pool) + .await + .void() + } + } + } +} + +enum Clean { + Drop, + Truncate, +} + +impl Clean { + fn as_str(&self) -> &str { + match self { + Clean::Drop => "DROP", + Clean::Truncate => "TRUNCATE", } } } @@ -299,7 +322,7 @@ where // only run the postgres tests if the OCKAM_POSTGRES_* environment variables are set if let Ok(db) = SqlxDatabase::create_new_postgres().await { rethrow("Postgres local", f(db.clone())).await?; - db.drop_tables().await?; + db.drop_all_postgres_tables().await?; }; Ok(()) } @@ -321,7 +344,7 @@ where // only run the postgres tests if the OCKAM_POSTGRES_* environment variables are set if let Ok(db) = SqlxDatabase::create_new_application_postgres().await { rethrow("Postgres local", f(db.clone())).await?; - db.drop_tables().await?; + db.drop_all_postgres_tables().await?; } Ok(()) } @@ -420,7 +443,7 @@ pub mod tests { async fn test_create_postgres_database() -> Result<()> { if let Some(configuration) = DatabaseConfiguration::postgres()? { let db = SqlxDatabase::create_no_migration(configuration.clone()).await?; - db.drop_tables().await?; + db.drop_all_postgres_tables().await?; let db = SqlxDatabase::create(configuration).await?; let inserted = insert_identity(&db).await.unwrap(); diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_types.rs b/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_types.rs deleted file mode 100644 index 4f45c4f4c85..00000000000 --- a/implementations/rust/ockam/ockam_node/src/storage/database/sqlx_types.rs +++ /dev/null @@ -1,166 +0,0 @@ -use chrono::{DateTime, Utc}; -use sqlx::database::HasArguments; -use sqlx::encode::IsNull; -use sqlx::{Any, Database, Encode, Type}; -use std::net::SocketAddr; -use std::path::{Path, PathBuf}; -use time::OffsetDateTime; - -use ockam_core::Address; - -/// This enum represents the set of types that we currently support in our database. -/// We support the types which Sqlite uses: -/// https://www.sqlite.org/datatype3.html -/// -/// The purpose of this type is to ease the serialization of data types in Ockam into data types in -/// our database. For example, if we describe how to translate an `Identifier` into some `Text` then -/// we can use the `Text` as a parameter in a sqlx query. -/// -/// Note: see the `ToSqlxType` trait and its instances for how the conversion is done -/// -pub enum SqlxType { - /// This type represents text in the database - Text(String), - /// This type represents arbitrary bytes in the database - Blob(Vec), - /// This type represents ints, signed or unsigned - Integer(i64), - /// This type represents floats - #[allow(unused)] - Real(f64), -} - -/// The SqlxType implements the Type trait from sqlx to allow its values to be serialized -/// to any database -impl Type for SqlxType { - fn type_info() -> ::TypeInfo { - as Type>::type_info() - } -} - -/// The SqlType implements the Encode trait from sqlx to allow its values to be serialized -/// to any database. There is a 1 to 1 mapping with the database native types -impl Encode<'_, Any> for SqlxType { - fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { - match self { - SqlxType::Text(v) => >::encode_by_ref(v, buf), - SqlxType::Blob(v) => as Encode<'_, Any>>::encode_by_ref(v, buf), - SqlxType::Integer(v) => >::encode_by_ref(v, buf), - SqlxType::Real(v) => >::encode_by_ref(v, buf), - } - } - - fn produces(&self) -> Option<::TypeInfo> { - Some(match self { - SqlxType::Text(_) => >::type_info(), - SqlxType::Blob(_) => as Type>::type_info(), - SqlxType::Integer(_) => >::type_info(), - SqlxType::Real(_) => >::type_info(), - }) - } -} - -/// This trait can be implemented by any type that can be converted to a database type -/// Typically an `Identifier` (to a `Text`), a `TimestampInSeconds` (to an `Integer`) etc... -/// -/// This allows a value to be used as a bind parameters in a sqlx query for example: -/// -/// use std::str::FromStr; -/// use sqlx::query_as; -/// use ockam_node::database::{SqlxType, ToSqlxType}; -/// -/// // newtype for a UNIX-like timestamp -/// struct TimestampInSeconds(u64); -/// -/// // this implementation maps the TimestampInSecond type to one of the types that Sqlx -/// // can serialize for any database -/// impl ToSqlxType for TimestampInSeconds { -/// fn to_sql(&self) -> SqlxType { -/// self.0.to_sql() -/// } -/// } -/// -/// let timestamp = TimestampInSeconds(10000000); -/// let query = query_as("SELECT identifier, change_history FROM identity WHERE created_at >= $1").bind(timestamp.as_sql()); -/// -/// -pub trait ToSqlxType { - /// Return the appropriate sql type - fn to_sql(&self) -> SqlxType; -} - -impl ToSqlxType for u64 { - fn to_sql(&self) -> SqlxType { - SqlxType::Integer(*self as i64) - } -} - -impl ToSqlxType for u32 { - fn to_sql(&self) -> SqlxType { - SqlxType::Integer(*self as i64) - } -} - -impl ToSqlxType for u16 { - fn to_sql(&self) -> SqlxType { - SqlxType::Integer(*self as i64) - } -} - -impl ToSqlxType for u8 { - fn to_sql(&self) -> SqlxType { - SqlxType::Integer(*self as i64) - } -} - -impl ToSqlxType for i8 { - fn to_sql(&self) -> SqlxType { - SqlxType::Integer(*self as i64) - } -} - -impl ToSqlxType for OffsetDateTime { - fn to_sql(&self) -> SqlxType { - SqlxType::Integer(self.unix_timestamp()) - } -} - -impl ToSqlxType for DateTime { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_rfc3339()) - } -} - -impl ToSqlxType for &[u8; 32] { - fn to_sql(&self) -> SqlxType { - SqlxType::Blob(self.to_vec().clone()) - } -} - -impl ToSqlxType for SocketAddr { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_string()) - } -} - -impl ToSqlxType for Address { - fn to_sql(&self) -> SqlxType { - SqlxType::Text(self.to_string()) - } -} - -impl ToSqlxType for PathBuf { - fn to_sql(&self) -> SqlxType { - self.as_path().to_sql() - } -} - -impl ToSqlxType for &Path { - fn to_sql(&self) -> SqlxType { - SqlxType::Text( - self.to_str() - .unwrap_or("a path should be a valid string") - .into(), - ) - } -} diff --git a/implementations/rust/ockam/ockam_vault/Cargo.toml b/implementations/rust/ockam/ockam_vault/Cargo.toml index 13cb2a9526c..ee5bb609c48 100644 --- a/implementations/rust/ockam/ockam_vault/Cargo.toml +++ b/implementations/rust/ockam/ockam_vault/Cargo.toml @@ -90,7 +90,7 @@ rand_pcg = { version = "0.3.1", default-features = false, optional = true } serde = { version = "1", default-features = false, features = ["derive"] } sha2 = { version = "0.10", default-features = false } #sqlx = { version = "0.7.4", optional = true, features = ["migrate", "postgres", "sqlite", "any"] } -sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "1b24b40de97db1cefe387bd590089bf915598547", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } +sqlx = { git = "https://github.com/etorreborre/sqlx", rev = "5fec648d2de0cbeed738dcf1c6f5bc9194fc439b", optional = true, features = ["runtime-tokio", "sqlite", "postgres", "migrate", "any"] } static_assertions = "1.1.0" tracing = { version = "0.1", default-features = false } x25519-dalek = { version = "2.0.1", default_features = false, features = ["precomputed-tables", "static_secrets", "zeroize"] } diff --git a/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/types.rs b/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/types.rs index c348caffb95..e4b4015e914 100644 --- a/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/types.rs +++ b/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/types.rs @@ -47,6 +47,16 @@ pub enum SigningSecret { ECDSASHA256CurveP256(ECDSASHA256CurveP256SecretKey), } +impl SigningSecret { + /// Return the secret key + pub fn key(&self) -> &[u8; 32] { + match self { + SigningSecret::EdDSACurve25519(k) => k.key(), + SigningSecret::ECDSASHA256CurveP256(k) => k.key(), + } + } +} + const_assert_eq!( ed25519_dalek::SECRET_KEY_LENGTH, EDDSA_CURVE25519_SECRET_KEY_LENGTH diff --git a/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs b/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs index 2fffb0cb908..dbb3aa5a7d2 100644 --- a/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs +++ b/implementations/rust/ockam/ockam_vault/src/storage/secrets_repository_sql.rs @@ -1,3 +1,5 @@ +use sqlx::database::HasArguments; +use sqlx::encode::IsNull; use sqlx::*; use tracing::debug; use zeroize::{Zeroize, ZeroizeOnDrop}; @@ -6,7 +8,7 @@ use ockam_core::async_trait; use ockam_core::compat::vec::Vec; use ockam_core::errcode::{Kind, Origin}; use ockam_core::Result; -use ockam_node::database::{FromSqlxError, SqlxDatabase, SqlxType, ToSqlxType, ToVoid}; +use ockam_node::database::{FromSqlxError, SqlxDatabase, ToVoid}; use crate::storage::secrets_repository::SecretsRepository; @@ -57,14 +59,14 @@ impl SecretsRepository for SecretsSqlxDatabase { ON CONFLICT (handle) DO UPDATE SET secret_type = $2, secret = $3"#, ) - .bind(handle.to_sql()) + .bind(handle) .bind(secret_type) - .bind(secret.to_sql()); + .bind(secret); query.execute(&*self.database.pool).await.void() } async fn delete_signing_secret(&self, handle: &SigningSecretKeyHandle) -> Result { - let query = query("DELETE FROM signing_secret WHERE handle = $1").bind(handle.to_sql()); + let query = query("DELETE FROM signing_secret WHERE handle = $1").bind(handle); let res = query.execute(&*self.database.pool).await.into_core()?; Ok(res.rows_affected() != 0) @@ -76,7 +78,7 @@ impl SecretsRepository for SecretsSqlxDatabase { ) -> Result> { let query = query_as("SELECT handle, secret_type, secret FROM signing_secret WHERE handle = $1") - .bind(handle.to_sql()); + .bind(handle); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -106,13 +108,13 @@ impl SecretsRepository for SecretsSqlxDatabase { ON CONFLICT (handle) DO UPDATE SET secret = $2"#, ) - .bind(handle.to_sql()) - .bind(secret.to_sql()); + .bind(handle) + .bind(secret); query.execute(&*self.database.pool).await.void() } async fn delete_x25519_secret(&self, handle: &X25519SecretKeyHandle) -> Result { - let query = query("DELETE FROM x25519_secret WHERE handle = $1").bind(handle.to_sql()); + let query = query("DELETE FROM x25519_secret WHERE handle = $1").bind(handle); let res = query.execute(&*self.database.pool).await.into_core()?; Ok(res.rows_affected() != 0) @@ -122,8 +124,8 @@ impl SecretsRepository for SecretsSqlxDatabase { &self, handle: &X25519SecretKeyHandle, ) -> Result> { - let query = query_as("SELECT handle, secret FROM x25519_secret WHERE handle = $1") - .bind(handle.to_sql()); + let query = + query_as("SELECT handle, secret FROM x25519_secret WHERE handle = $1").bind(handle); let row: Option = query .fetch_optional(&*self.database.pool) .await @@ -152,14 +154,14 @@ impl SecretsRepository for SecretsSqlxDatabase { ON CONFLICT (handle) DO UPDATE SET type = $2, secret = $3"#, ) - .bind(handle.to_sql()) + .bind(handle) .bind(AEAD_TYPE) - .bind(secret.to_sql()); + .bind(secret); query.execute(&*self.database.pool).await.void() } async fn delete_aead_secret(&self, handle: &AeadSecretKeyHandle) -> Result { - let query = query("DELETE FROM aead_secret WHERE handle = $1").bind(handle.to_sql()); + let query = query("DELETE FROM aead_secret WHERE handle = $1").bind(handle); let res = query.execute(&*self.database.pool).await.into_core()?; Ok(res.rows_affected() != 0) @@ -167,7 +169,7 @@ impl SecretsRepository for SecretsSqlxDatabase { async fn get_aead_secret(&self, handle: &AeadSecretKeyHandle) -> Result> { let query = query_as("SELECT secret FROM aead_secret WHERE handle = $1 AND type = $2") - .bind(handle.to_sql()) + .bind(handle) .bind(AEAD_TYPE); let row: Option = query .fetch_optional(&*self.database.pool) @@ -191,48 +193,91 @@ impl SecretsRepository for SecretsSqlxDatabase { } } -impl ToSqlxType for SigningSecret { - fn to_sql(&self) -> SqlxType { - match self { - SigningSecret::EdDSACurve25519(k) => k.key().to_sql(), - SigningSecret::ECDSASHA256CurveP256(k) => k.key().to_sql(), - } +impl Type for SigningSecret { + fn type_info() -> ::TypeInfo { + as Type>::type_info() + } +} + +impl Encode<'_, Any> for SigningSecret { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + let key = match self { + SigningSecret::EdDSACurve25519(k) => k.key(), + SigningSecret::ECDSASHA256CurveP256(k) => k.key(), + }; + as Encode<'_, Any>>::encode_by_ref(&key.to_vec(), buf) + } +} + +impl Type for SigningSecretKeyHandle { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for SigningSecretKeyHandle { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(self.handle(), buf) + } +} + +impl Type for HandleToSecret { + fn type_info() -> ::TypeInfo { + as Type>::type_info() + } +} + +impl Encode<'_, Any> for HandleToSecret { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + as Encode<'_, Any>>::encode_by_ref(self.value(), buf) + } +} + +impl Type for X25519SecretKeyHandle { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +impl Encode<'_, Any> for X25519SecretKeyHandle { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.0, buf) } } -impl ToSqlxType for SigningSecretKeyHandle { - fn to_sql(&self) -> SqlxType { - self.handle().to_sql() +impl Type for AeadSecretKeyHandle { + fn type_info() -> ::TypeInfo { + >::type_info() } } -impl ToSqlxType for X25519SecretKeyHandle { - fn to_sql(&self) -> SqlxType { - SqlxType::Blob(self.0.value().clone()) +impl Encode<'_, Any> for AeadSecretKeyHandle { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + >::encode_by_ref(&self.0 .0, buf) } } -impl ToSqlxType for AeadSecretKeyHandle { - fn to_sql(&self) -> SqlxType { - self.0 .0.to_sql() +impl Type for X25519SecretKey { + fn type_info() -> ::TypeInfo { + as Type>::type_info() } } -impl ToSqlxType for HandleToSecret { - fn to_sql(&self) -> SqlxType { - SqlxType::Blob(self.value().clone()) +impl Encode<'_, Any> for X25519SecretKey { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + as Encode<'_, Any>>::encode_by_ref(&self.key().to_vec(), buf) } } -impl ToSqlxType for X25519SecretKey { - fn to_sql(&self) -> SqlxType { - self.key().to_sql() +impl Type for AeadSecret { + fn type_info() -> ::TypeInfo { + as Type>::type_info() } } -impl ToSqlxType for AeadSecret { - fn to_sql(&self) -> SqlxType { - SqlxType::Blob(self.0.to_vec().clone()) +impl Encode<'_, Any> for AeadSecret { + fn encode_by_ref(&self, buf: &mut ::ArgumentBuffer) -> IsNull { + as Encode<'_, Any>>::encode_by_ref(&self.0.to_vec(), buf) } }