diff --git a/src/app/cached_app_manager.rs b/src/app/cached_app_manager.rs new file mode 100644 index 00000000..40f0dd7f --- /dev/null +++ b/src/app/cached_app_manager.rs @@ -0,0 +1,339 @@ +use crate::app::config::App; +use crate::app::manager::AppManager; +use crate::cache::manager::CacheManager; +use crate::error::{Error, Result}; +use crate::options::CacheSettings; +use async_trait::async_trait; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{debug, warn}; + +const CACHE_PREFIX_ID: &str = "app:id"; +const CACHE_PREFIX_KEY: &str = "app:key"; + +/// Caching decorator for AppManager implementations +pub struct CachedAppManager { + inner: Arc, + cache: Arc>, + settings: CacheSettings, +} + +impl CachedAppManager { + pub fn new( + inner: Arc, + cache: Arc>, + settings: CacheSettings, + ) -> Self { + Self { + inner, + cache, + settings, + } + } + + fn cache_key_for_id(app_id: &str) -> String { + format!("{}:{}", CACHE_PREFIX_ID, app_id) + } + + fn cache_key_for_key(app_key: &str) -> String { + format!("{}:{}", CACHE_PREFIX_KEY, app_key) + } + + fn serialize(value: &T) -> Result { + serde_json::to_string(value) + .map_err(|e| Error::Internal(format!("Serialization failed: {}", e))) + } + + fn deserialize(json: &str) -> Result { + serde_json::from_str(json) + .map_err(|e| Error::Internal(format!("Deserialization failed: {}", e))) + } + + async fn get(&self, key: &str) -> Option { + let mut cache = self.cache.lock().await; + match cache.get(key).await { + Ok(Some(json)) => match Self::deserialize(&json) { + Ok(value) => { + debug!("Cache hit: {}", key); + Some(value) + } + Err(e) => { + warn!("Cache deserialize error for {}: {}", key, e); + None + } + }, + Ok(None) => { + debug!("Cache miss: {}", key); + None + } + Err(e) => { + warn!("Cache get error for {}: {}", key, e); + None + } + } + } + + async fn set(&self, key: &str, value: &T) { + let json = match Self::serialize(value) { + Ok(j) => j, + Err(e) => { + warn!("Cache serialize error for {}: {}", key, e); + return; + } + }; + + let mut cache = self.cache.lock().await; + if let Err(e) = cache.set(key, &json, self.settings.ttl).await { + warn!("Cache set error for {}: {}", key, e); + } + } + + async fn remove(&self, key: &str) { + let mut cache = self.cache.lock().await; + if let Err(e) = cache.remove(key).await { + warn!("Cache remove error for {}: {}", key, e); + } + } + + async fn cache_app(&self, app: &App) { + self.set(&Self::cache_key_for_id(&app.id), app).await; + self.set(&Self::cache_key_for_key(&app.key), app).await; + } + + async fn invalidate_app(&self, app_id: &str, app_key: &str) { + self.remove(&Self::cache_key_for_id(app_id)).await; + self.remove(&Self::cache_key_for_key(app_key)).await; + } + + async fn invalidate_app_by_id(&self, app_id: &str) { + let id_key = Self::cache_key_for_id(app_id); + if let Some(app) = self.get::(&id_key).await { + self.invalidate_app(app_id, &app.key).await; + } else { + self.remove(&id_key).await; + } + } +} + +#[async_trait] +impl AppManager for CachedAppManager { + async fn init(&self) -> Result<()> { + self.inner.init().await + } + + /// Get an app by ID from cache or database + async fn find_by_id(&self, app_id: &str) -> Result> { + if !self.settings.enabled { + return self.inner.find_by_id(app_id).await; + } + + let cache_key = Self::cache_key_for_id(app_id); + if let Some(app) = self.get::(&cache_key).await { + return Ok(Some(app)); + } + + let app = self.inner.find_by_id(app_id).await?; + if let Some(ref app) = app { + self.cache_app(app).await; + } + + Ok(app) + } + + /// Get an app by key from cache or database + async fn find_by_key(&self, key: &str) -> Result> { + if !self.settings.enabled { + return self.inner.find_by_key(key).await; + } + + let cache_key = Self::cache_key_for_key(key); + if let Some(app) = self.get::(&cache_key).await { + return Ok(Some(app)); + } + + let app = self.inner.find_by_key(key).await?; + if let Some(ref app) = app { + self.cache_app(app).await; + } + + Ok(app) + } + + /// Register a new app in the database and cache + async fn create_app(&self, config: App) -> Result<()> { + self.inner.create_app(config.clone()).await?; + + if self.settings.enabled { + self.cache_app(&config).await; + } + + Ok(()) + } + + /// Update an existing app and refresh the cache + async fn update_app(&self, config: App) -> Result<()> { + self.inner.update_app(config.clone()).await?; + + if self.settings.enabled { + self.invalidate_app(&config.id, &config.key).await; + self.cache_app(&config).await; + } + + Ok(()) + } + + /// Remove an app from the database and cache + async fn delete_app(&self, app_id: &str) -> Result<()> { + let app = if self.settings.enabled { + self.inner.find_by_id(app_id).await? + } else { + None + }; + + self.inner.delete_app(app_id).await?; + + if self.settings.enabled { + if let Some(app) = app { + self.invalidate_app(app_id, &app.key).await; + } else { + self.invalidate_app_by_id(app_id).await; + } + } + + Ok(()) + } + + /// Get all apps from the database + async fn get_apps(&self) -> Result> { + let apps = self.inner.get_apps().await?; + + if self.settings.enabled && !apps.is_empty() { + for app in &apps { + self.cache_app(app).await; + } + debug!("Cached {} apps", apps.len()); + } + + Ok(apps) + } + + async fn check_health(&self) -> Result<()> { + self.inner.check_health().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::app::memory_app_manager::MemoryAppManager; + use crate::cache::memory_cache_manager::MemoryCacheManager; + use crate::options::MemoryCacheOptions; + + fn create_test_app(id: &str) -> App { + App { + id: id.to_string(), + key: format!("{}_key", id), + secret: format!("{}_secret", id), + max_connections: 100, + enable_client_messages: false, + enabled: true, + max_client_events_per_second: 100, + ..Default::default() + } + } + + async fn create_test_manager() -> CachedAppManager { + let inner = Arc::new(MemoryAppManager::new()); + let cache = Arc::new(Mutex::new(MemoryCacheManager::new( + "test".to_string(), + MemoryCacheOptions { + ttl: 300, + cleanup_interval: 60, + max_capacity: 1000, + }, + ))); + let settings = CacheSettings { + enabled: true, + ttl: 300, + }; + + CachedAppManager::new(inner, cache, settings) + } + + #[tokio::test] + async fn test_find_by_id_caches_result() { + let manager = create_test_manager().await; + let app = create_test_app("test1"); + + manager.create_app(app.clone()).await.unwrap(); + + let found1 = manager.find_by_id("test1").await.unwrap(); + assert!(found1.is_some()); + + let found2 = manager.find_by_id("test1").await.unwrap(); + assert!(found2.is_some()); + assert_eq!(found2.unwrap().id, "test1"); + } + + #[tokio::test] + async fn test_find_by_key_caches_result() { + let manager = create_test_manager().await; + let app = create_test_app("test2"); + + manager.create_app(app.clone()).await.unwrap(); + + let found = manager.find_by_key("test2_key").await.unwrap(); + assert!(found.is_some()); + assert_eq!(found.unwrap().key, "test2_key"); + } + + #[tokio::test] + async fn test_update_invalidates_cache() { + let manager = create_test_manager().await; + let mut app = create_test_app("test3"); + + manager.create_app(app.clone()).await.unwrap(); + + let found = manager.find_by_id("test3").await.unwrap().unwrap(); + assert_eq!(found.max_connections, 100); + + app.max_connections = 200; + manager.update_app(app).await.unwrap(); + + let found_updated = manager.find_by_id("test3").await.unwrap().unwrap(); + assert_eq!(found_updated.max_connections, 200); + } + + #[tokio::test] + async fn test_delete_invalidates_cache() { + let manager = create_test_manager().await; + let app = create_test_app("test4"); + + manager.create_app(app).await.unwrap(); + assert!(manager.find_by_id("test4").await.unwrap().is_some()); + + manager.delete_app("test4").await.unwrap(); + assert!(manager.find_by_id("test4").await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_cache_disabled() { + let inner = Arc::new(MemoryAppManager::new()); + let cache = Arc::new(Mutex::new(MemoryCacheManager::new( + "test".to_string(), + MemoryCacheOptions::default(), + ))); + let settings = CacheSettings { + enabled: false, + ttl: 300, + }; + + let manager = CachedAppManager::new(inner, cache, settings); + let app = create_test_app("test5"); + + manager.create_app(app).await.unwrap(); + + let found = manager.find_by_id("test5").await.unwrap(); + assert!(found.is_some()); + } +} diff --git a/src/app/dynamodb_app_manager.rs b/src/app/dynamodb_app_manager.rs index 47db8f0b..ab376e36 100644 --- a/src/app/dynamodb_app_manager.rs +++ b/src/app/dynamodb_app_manager.rs @@ -4,10 +4,7 @@ use crate::app::manager::AppManager; use crate::error::{Error, Result}; use crate::webhook::types::Webhook; use async_trait::async_trait; -use moka::future::Cache; use std::collections::HashMap; -use std::time::Duration; -use tracing::debug; /// Configuration for DynamoDB App Manager #[derive(Debug, Clone)] @@ -18,8 +15,6 @@ pub struct DynamoDbConfig { pub access_key: Option, pub secret_key: Option, pub profile_name: Option, - pub cache_ttl: u64, - pub cache_max_capacity: u64, } impl Default for DynamoDbConfig { @@ -31,8 +26,6 @@ impl Default for DynamoDbConfig { access_key: None, secret_key: None, profile_name: None, - cache_ttl: 3600, - cache_max_capacity: 10000, } } } @@ -44,7 +37,6 @@ type DynamoClient = aws_sdk_dynamodb::Client; pub struct DynamoDbAppManager { config: DynamoDbConfig, client: DynamoClient, - app_cache: Cache, } impl DynamoDbAppManager { @@ -82,18 +74,8 @@ impl DynamoDbAppManager { // Create DynamoDB client let client = aws_sdk_dynamodb::Client::new(&aws_config); - // Initialize cache - let app_cache = Cache::builder() - .time_to_live(Duration::from_secs(config.cache_ttl)) - .max_capacity(config.cache_max_capacity) - .build(); - // Create the manager - let manager = Self { - config, - client, - app_cache, - }; + let manager = Self { config, client }; Ok(manager) } @@ -446,15 +428,8 @@ impl DynamoDbAppManager { )) } - /// Get an app from cache or DynamoDB + /// Get an app from DynamoDB async fn get_app_internal(&self, app_id: &str) -> Result> { - // Check cache first - if let Some(app) = self.app_cache.get(app_id).await { - return Ok(Some(app)); - } - - debug!("Cache miss for app {}, fetching from DynamoDB", app_id); - // Fetch from DynamoDB let response = self .client @@ -471,10 +446,6 @@ impl DynamoDbAppManager { if let Some(item) = response.item() { // Convert DynamoDB item to App let app = self.item_to_app(aws_sdk_dynamodb::types::AttributeValue::M(item.clone()))?; - - // Update cache - self.app_cache.insert(app_id.to_string(), app.clone()).await; - Ok(Some(app)) } else { Ok(None) @@ -502,9 +473,6 @@ impl AppManager for DynamoDbAppManager { .await .map_err(|e| Error::Internal(format!("Failed to insert app into DynamoDB: {e}")))?; - // Update cache - self.app_cache.insert(config.id.clone(), config).await; - Ok(()) } @@ -521,9 +489,6 @@ impl AppManager for DynamoDbAppManager { .await .map_err(|e| Error::Internal(format!("Failed to update app in DynamoDB: {e}")))?; - // Update cache - self.app_cache.insert(config.id.clone(), config).await; - Ok(()) } @@ -540,9 +505,6 @@ impl AppManager for DynamoDbAppManager { .await .map_err(|e| Error::Internal(format!("Failed to delete app from DynamoDB: {e}")))?; - // Remove from cache - self.app_cache.invalidate(app_id).await; - Ok(()) } @@ -590,10 +552,6 @@ impl AppManager for DynamoDbAppManager { { // Convert DynamoDB item to App let app = self.item_to_app(aws_sdk_dynamodb::types::AttributeValue::M(item.clone()))?; - - // Update cache using app ID as key - self.app_cache.insert(app.id.clone(), app.clone()).await; - return Ok(Some(app)); } @@ -612,14 +570,6 @@ impl AppManager for DynamoDbAppManager { } } -#[cfg(test)] -impl DynamoDbAppManager { - // Check if a specific app ID is in the cache. - pub async fn is_cached(&self, app_id: &str) -> bool { - self.app_cache.get(app_id).await.is_some() - } -} - #[cfg(test)] mod tests { use super::*; @@ -636,8 +586,6 @@ mod tests { access_key: Some("test".to_string()), secret_key: Some("test".to_string()), profile_name: None, - cache_ttl: 2, // Short TTL for testing - cache_max_capacity: 100, } } @@ -727,137 +675,6 @@ mod tests { manager.delete_app("dynamo_test2").await.unwrap(); } - #[tokio::test] - async fn test_cache_behavior() { - // Skip test if DynamoDB Local is not available - if !is_dynamodb_available().await { - eprintln!("Skipping test: DynamoDB Local not available"); - return; - } - - let config = get_test_config("sockudo_cache_test"); - let manager = DynamoDbAppManager::new(config).await.unwrap(); - manager.init().await.unwrap(); - - // Cache should be empty initially - assert!(!manager.is_cached("cache_test").await); - - // Create an app (should populate cache) - let app = create_test_app("cache_test"); - manager.create_app(app).await.unwrap(); - - // After create, app should be in cache - assert!(manager.is_cached("cache_test").await); - - // First retrieval - should hit cache - let retrieved1 = manager.find_by_id("cache_test").await.unwrap().unwrap(); - assert_eq!(retrieved1.id, "cache_test"); - - // Verify still cached after retrieval - assert!(manager.is_cached("cache_test").await); - - // Second retrieval - should still hit cache - let retrieved2 = manager.find_by_id("cache_test").await.unwrap().unwrap(); - assert_eq!(retrieved2.id, "cache_test"); - assert!(manager.is_cached("cache_test").await); - - // Wait for cache to expire (TTL is 2 seconds) - tokio::time::sleep(Duration::from_secs(3)).await; - - // After TTL expiration, cache entry should be gone - assert!(!manager.is_cached("cache_test").await); - - // Third retrieval - should hit DynamoDB and repopulate cache - let retrieved3 = manager.find_by_id("cache_test").await.unwrap().unwrap(); - assert_eq!(retrieved3.id, "cache_test"); - - // Cache should be populated again - assert!(manager.is_cached("cache_test").await); - - // Cleanup - manager.delete_app("cache_test").await.unwrap(); - - // After delete, cache should be invalidated - assert!(!manager.is_cached("cache_test").await); - } - - #[tokio::test] - async fn test_cache_invalidation_on_update() { - // Skip test if DynamoDB Local is not available - if !is_dynamodb_available().await { - eprintln!("Skipping test: DynamoDB Local not available"); - return; - } - - let config = get_test_config("sockudo_cache_update_test"); - let manager = DynamoDbAppManager::new(config).await.unwrap(); - manager.init().await.unwrap(); - - // Create an app - let mut app = create_test_app("cache_update_test"); - manager.create_app(app.clone()).await.unwrap(); - - // Retrieve to populate cache - let retrieved1 = manager - .find_by_id("cache_update_test") - .await - .unwrap() - .unwrap(); - assert_eq!(retrieved1.max_connections, 100); - - // Update the app - app.max_connections = 500; - manager.update_app(app).await.unwrap(); - - // Retrieve again - should get updated value from cache - let retrieved2 = manager - .find_by_id("cache_update_test") - .await - .unwrap() - .unwrap(); - assert_eq!(retrieved2.max_connections, 500); - - // Cleanup - manager.delete_app("cache_update_test").await.unwrap(); - } - - #[tokio::test] - async fn test_cache_invalidation_on_delete() { - // Skip test if DynamoDB Local is not available - if !is_dynamodb_available().await { - eprintln!("Skipping test: DynamoDB Local not available"); - return; - } - - let config = get_test_config("sockudo_cache_delete_test"); - let manager = DynamoDbAppManager::new(config).await.unwrap(); - manager.init().await.unwrap(); - - // Create an app - let app = create_test_app("cache_delete_test"); - manager.create_app(app).await.unwrap(); - - // Retrieve to populate cache - let retrieved = manager - .find_by_id("cache_delete_test") - .await - .unwrap() - .unwrap(); - assert_eq!(retrieved.id, "cache_delete_test"); - - // Delete the app - manager.delete_app("cache_delete_test").await.unwrap(); - - // Should not find the app (cache should be invalidated) - assert!( - manager - .find_by_id("cache_delete_test") - .await - .unwrap() - .is_none() - ); - } - #[tokio::test] async fn test_allowed_origins() { // Skip test if DynamoDB Local is not available diff --git a/src/app/factory.rs b/src/app/factory.rs index f6d9f84e..ab2ee87c 100644 --- a/src/app/factory.rs +++ b/src/app/factory.rs @@ -1,4 +1,5 @@ // src/app/factory.rs +use crate::app::cached_app_manager::CachedAppManager; #[cfg(feature = "dynamodb")] use crate::app::dynamodb_app_manager::{DynamoDbAppManager, DynamoDbConfig}; use crate::app::manager::AppManager; @@ -9,9 +10,11 @@ use crate::app::mysql_app_manager::MySQLAppManager; use crate::app::pg_app_manager::PgSQLAppManager; #[cfg(feature = "scylladb")] use crate::app::scylla_app_manager::{ScyllaDbAppManager, ScyllaDbConfig}; +use crate::cache::manager::CacheManager; use crate::error::Result; use crate::options::{AppManagerConfig, AppManagerDriver, DatabaseConfig, DatabasePooling}; use std::sync::Arc; +use tokio::sync::Mutex; use tracing::{info, warn}; pub struct AppManagerFactory; @@ -22,18 +25,19 @@ impl AppManagerFactory { config: &AppManagerConfig, db_config: &DatabaseConfig, pooling: &DatabasePooling, + cache_manager: Arc>, ) -> Result> { info!( "{}", format!("Initializing AppManager with driver: {:?}", config.driver) ); - match config.driver { + let inner: Arc = match config.driver { // Match on the enum #[cfg(feature = "mysql")] AppManagerDriver::Mysql => { let mysql_db_config = db_config.mysql.clone(); match MySQLAppManager::new(mysql_db_config, pooling.clone()).await { - Ok(manager) => Ok(Arc::new(manager)), + Ok(manager) => Arc::new(manager), Err(e) => { warn!( "{}", @@ -42,7 +46,7 @@ impl AppManagerFactory { e ) ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } } } @@ -58,11 +62,9 @@ impl AppManagerFactory { access_key: dynamo_settings.aws_access_key_id.clone(), secret_key: dynamo_settings.aws_secret_access_key.clone(), profile_name: dynamo_settings.aws_profile_name.clone(), - cache_ttl: dynamo_settings.cache_ttl, - cache_max_capacity: dynamo_settings.cache_max_capacity, }; match DynamoDbAppManager::new(dynamo_app_config).await { - Ok(manager) => Ok(Arc::new(manager)), + Ok(manager) => Arc::new(manager), Err(e) => { warn!( "{}", @@ -71,7 +73,7 @@ impl AppManagerFactory { e ) ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } } } @@ -79,7 +81,7 @@ impl AppManagerFactory { AppManagerDriver::PgSql => { let pgsql_db_config = db_config.postgres.clone(); match PgSQLAppManager::new(pgsql_db_config, pooling.clone()).await { - Ok(manager) => Ok(Arc::new(manager)), + Ok(manager) => Arc::new(manager), Err(e) => { warn!( "{}", @@ -88,7 +90,7 @@ impl AppManagerFactory { e ) ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } } } @@ -102,13 +104,11 @@ impl AppManagerFactory { table_name: scylla_settings.table_name.clone(), username: scylla_settings.username.clone(), password: scylla_settings.password.clone(), - cache_ttl: scylla_settings.cache_ttl, - cache_max_capacity: scylla_settings.cache_max_capacity, replication_class: scylla_settings.replication_class.clone(), replication_factor: scylla_settings.replication_factor, }; match ScyllaDbAppManager::new(scylla_config).await { - Ok(manager) => Ok(Arc::new(manager)), + Ok(manager) => Arc::new(manager), Err(e) => { warn!( "{}", @@ -117,14 +117,14 @@ impl AppManagerFactory { e ) ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } } } AppManagerDriver::Memory => { // Handle unknown as Memory or make it an error info!("{}", "Using memory app manager.".to_string()); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } #[cfg(not(feature = "mysql"))] AppManagerDriver::Mysql => { @@ -132,7 +132,7 @@ impl AppManagerFactory { "{}", "MySQL app manager requested but not compiled in. Falling back to memory manager." ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } #[cfg(not(feature = "dynamodb"))] AppManagerDriver::Dynamodb => { @@ -140,7 +140,7 @@ impl AppManagerFactory { "{}", "DynamoDB app manager requested but not compiled in. Falling back to memory manager." ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } #[cfg(not(feature = "postgres"))] AppManagerDriver::PgSql => { @@ -148,7 +148,7 @@ impl AppManagerFactory { "{}", "PostgreSQL app manager requested but not compiled in. Falling back to memory manager." ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } #[cfg(not(feature = "scylladb"))] AppManagerDriver::ScyllaDb => { @@ -156,8 +156,18 @@ impl AppManagerFactory { "{}", "ScyllaDB app manager requested but not compiled in. Falling back to memory manager." ); - Ok(Arc::new(MemoryAppManager::new())) + Arc::new(MemoryAppManager::new()) } + }; + + if config.cache.enabled { + Ok(Arc::new(CachedAppManager::new( + inner, + cache_manager, + config.cache.clone(), + ))) + } else { + Ok(inner) } } } diff --git a/src/app/memory_app_manager.rs b/src/app/memory_app_manager.rs index 8edba432..341150a6 100644 --- a/src/app/memory_app_manager.rs +++ b/src/app/memory_app_manager.rs @@ -1,6 +1,3 @@ -#![allow(unused_variables)] -#![allow(dead_code)] - // src/app/memory_manager.rs use super::config::App; use crate::app::manager::AppManager; @@ -8,14 +5,8 @@ use crate::error::Result; use async_trait::async_trait; use dashmap::DashMap; -struct CacheConfig { - enabled: bool, - ttl: usize, -} - pub struct MemoryAppManager { apps: DashMap, - cache: CacheConfig, } impl Default for MemoryAppManager { @@ -28,10 +19,6 @@ impl MemoryAppManager { pub fn new() -> Self { Self { apps: DashMap::new(), - cache: CacheConfig { - enabled: true, - ttl: 1000, - }, } } } diff --git a/src/app/mod.rs b/src/app/mod.rs index d4ef46f0..5287f193 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -1,4 +1,5 @@ pub mod auth; +pub mod cached_app_manager; pub mod config; #[cfg(feature = "dynamodb")] pub mod dynamodb_app_manager; diff --git a/src/app/mysql_app_manager.rs b/src/app/mysql_app_manager.rs index 72cf8391..4db29d7b 100644 --- a/src/app/mysql_app_manager.rs +++ b/src/app/mysql_app_manager.rs @@ -7,10 +7,7 @@ use crate::token::Token; use crate::webhook::types::Webhook; use crate::websocket::SocketId; use async_trait::async_trait; -use futures_util::{StreamExt, stream}; -use moka::future::Cache; use sqlx::{MySqlPool, mysql::MySqlPoolOptions}; -use std::sync::Arc; use std::time::Duration; use tracing::{debug, error, info, warn}; @@ -19,7 +16,6 @@ use tracing::{debug, error, info, warn}; pub struct MySQLAppManager { config: DatabaseConnection, pool: MySqlPool, - app_cache: Cache, // App ID -> App } impl MySQLAppManager { @@ -57,18 +53,7 @@ impl MySQLAppManager { .await .map_err(|e| Error::Internal(format!("Failed to connect to MySQL: {e}")))?; - // Initialize cache - let app_cache = Cache::builder() - .time_to_live(Duration::from_secs(config.cache_ttl)) - .max_capacity(config.cache_max_capacity) - // Add other options like time_to_idle if needed - .build(); - - let manager = Self { - config, - pool, - app_cache, - }; + let manager = Self { config, pool }; manager.ensure_table_exists().await?; @@ -197,13 +182,7 @@ impl MySQLAppManager { /// Get an app by ID from cache or database pub async fn find_by_id(&self, app_id: &str) -> Result> { - // Try to get from cache first - if let Some(app) = self.app_cache.get(app_id).await { - return Ok(Some(app)); - } - - // Not in cache or expired, fetch from database - debug!("Cache miss for app {}, fetching from database", app_id); + debug!("Fetching app {} from database", app_id); // Use a query_as that matches your App struct // Create the query with the correct table name @@ -241,28 +220,12 @@ impl MySQLAppManager { Error::Internal(format!("Failed to fetch app from MySQL: {e}")) })?; - if let Some(app_row) = app_result { - // Convert to App - let app = app_row.into_app(); - - // Update cache - self.app_cache.insert(app_id.to_string(), app.clone()).await; - - Ok(Some(app)) - } else { - Ok(None) - } + Ok(app_result.map(|row| row.into_app())) } /// Get an app by key from cache or database pub async fn find_by_key(&self, key: &str) -> Result> { - // Check cache first - if let Some(app) = self.app_cache.get(key).await { - return Ok(Some(app)); - } - - // Not found in cache, query database - debug!("Cache miss for app key {}, fetching from database", key); + debug!("Fetching app by key {} from database", key); let query = format!( r#"SELECT @@ -298,17 +261,7 @@ impl MySQLAppManager { Error::Internal(format!("Failed to fetch app from MySQL: {e}")) })?; - if let Some(app_row) = app_result { - let app = app_row.into_app(); - - // Update cache with this app - let app_id = app.id.clone(); - self.app_cache.insert(app_id, app.clone()).await; - - Ok(Some(app.clone())) - } else { - Ok(None) - } + Ok(app_result.map(|row| row.into_app())) } /// Register a new app in the database @@ -360,9 +313,6 @@ impl MySQLAppManager { Error::Internal(format!("Failed to insert app into MySQL: {e}")) })?; - // Update cach - self.app_cache.insert(app.id.clone(), app).await; - Ok(()) } @@ -419,9 +369,6 @@ impl MySQLAppManager { return Err(Error::InvalidAppKey); } - // Update cache - self.app_cache.insert(app.id.clone(), app).await; - Ok(()) } @@ -448,9 +395,6 @@ impl MySQLAppManager { return Err(Error::InvalidAppKey); } - // Remove from cache - self.app_cache.remove(app_id).await; - Ok(()) } @@ -490,38 +434,9 @@ impl MySQLAppManager { Error::Internal(format!("Failed to fetch apps from MySQL: {e}")) })?; - warn!( - "{}", - format!("Fetched {} app rows from database.", app_rows.len()) - ); + let apps: Vec = app_rows.into_iter().map(|row| row.into_app()).collect(); - // Process rows concurrently using streams: - // 1. Convert iterator to stream - // 2. Map each row to an async block that converts, caches, and returns the App - // 3. Buffer the async operations for concurrency - // 4. Collect the results (Apps) into a Vec - let apps = stream::iter(app_rows) - .map(|row| async { - let app = row.into_app(); // Convert row to App struct - let app_arc = Arc::new(app.clone()); // Create Arc for caching - - // Insert the Arc into the cache - // Note: insert takes key by value, so clone app_arc.id - self.app_cache.insert(app_arc.id.clone(), app.clone()).await; - - // Return the owned App for the final Vec - app - }) - // Execute up to N futures concurrently (e.g., based on pool size) - // Adjust buffer size as needed. Using connection_pool_size might be reasonable. - .buffer_unordered(self.config.connection_pool_size as usize) - .collect::>() // Collect the resulting Apps - .await; // Await the stream processing - - info!( - "{}", - format!("Finished processing and caching {} apps.", apps.len()) - ); + debug!("Fetched {} apps from database", apps.len()); Ok(apps) } @@ -726,7 +641,6 @@ impl Clone for MySQLAppManager { Self { config: self.config.clone(), pool: self.pool.clone(), - app_cache: self.app_cache.clone(), } } } @@ -734,7 +648,6 @@ impl Clone for MySQLAppManager { #[cfg(test)] mod tests { use super::*; - use std::time::Duration; // Helper to create a test app fn create_test_app(id: &str) -> App { @@ -768,7 +681,6 @@ mod tests { password: "sockudo123".to_string(), database: "sockudo".to_string(), table_name: "applications".to_string(), - cache_ttl: 5, port: 13306, ..Default::default() }; @@ -792,7 +704,6 @@ mod tests { password: "sockudo123".to_string(), database: "sockudo".to_string(), table_name: "applications".to_string(), - cache_ttl: 5, // Short TTL for testing port: 13306, ..Default::default() }; @@ -823,9 +734,6 @@ mod tests { let app = manager.find_by_id("test1").await.unwrap().unwrap(); assert_eq!(app.max_connections, 200); - // Test cache expiration - tokio::time::sleep(Duration::from_secs(6)).await; - // Add another app let test_app2 = create_test_app("test2"); manager.create_app(test_app2).await.unwrap(); diff --git a/src/app/pg_app_manager.rs b/src/app/pg_app_manager.rs index e41d354e..770f0515 100644 --- a/src/app/pg_app_manager.rs +++ b/src/app/pg_app_manager.rs @@ -6,18 +6,15 @@ use crate::token::Token; use crate::webhook::types::Webhook; use crate::websocket::SocketId; use async_trait::async_trait; -use futures_util::{StreamExt, stream}; -use moka::future::Cache; use sqlx::PgPool; use sqlx::postgres::PgPoolOptions; use std::time::Duration; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; /// PostgreSQL-based implementation of the AppManager pub struct PgSQLAppManager { config: DatabaseConnection, pool: PgPool, - app_cache: Cache, // App ID -> App } impl PgSQLAppManager { @@ -52,17 +49,7 @@ impl PgSQLAppManager { .await .map_err(|e| Error::Internal(format!("Failed to connect to PostgreSQL: {e}")))?; - // Initialize cache - let app_cache = Cache::builder() - .time_to_live(Duration::from_secs(config.cache_ttl)) - .max_capacity(config.cache_max_capacity) - .build(); - - let manager = Self { - config, - pool, - app_cache, - }; + let manager = Self { config, pool }; manager.ensure_table_exists().await?; @@ -132,15 +119,9 @@ impl PgSQLAppManager { Ok(()) } - /// Get an app by ID from cache or database + /// Get an app by ID from database pub async fn find_by_id(&self, app_id: &str) -> Result> { - // Try to get from cache first - if let Some(app) = self.app_cache.get(app_id).await { - return Ok(Some(app)); - } - - // Not in cache or expired, fetch from database - debug!("Cache miss for app {}, fetching from database", app_id); + debug!("Fetching app {} from database", app_id); // Create the query with the correct table name let query = format!( @@ -174,20 +155,10 @@ impl PgSQLAppManager { Error::Internal(format!("Failed to fetch app from PostgreSQL: {e}")) })?; - if let Some(app_row) = app_result { - // Convert to App - let app = app_row.into_app(); - - // Update cache - self.app_cache.insert(app_id.to_string(), app.clone()).await; - - Ok(Some(app)) - } else { - Ok(None) - } + Ok(app_result.map(|row| row.into_app())) } - /// Get an app by key from cache or database + /// Get an app by key from database pub async fn find_by_key(&self, key: &str) -> Result> { // For PostgreSQL, we need to query by key since cache is by ID debug!("Fetching app by key {} from database", key); @@ -223,16 +194,7 @@ impl PgSQLAppManager { Error::Internal(format!("Failed to fetch app from PostgreSQL: {e}")) })?; - if let Some(app_row) = app_result { - let app = app_row.into_app(); - - // Update cache with this app using ID as key - self.app_cache.insert(app.id.clone(), app.clone()).await; - - Ok(Some(app)) - } else { - Ok(None) - } + Ok(app_result.map(|row| row.into_app())) } /// Register a new app in the database @@ -281,9 +243,6 @@ impl PgSQLAppManager { Error::Internal(format!("Failed to insert app into PostgreSQL: {e}")) })?; - // Update cache - self.app_cache.insert(app.id.clone(), app).await; - Ok(()) } @@ -339,9 +298,6 @@ impl PgSQLAppManager { return Err(Error::InvalidAppKey); } - // Update cache - self.app_cache.insert(app.id.clone(), app).await; - Ok(()) } @@ -365,9 +321,6 @@ impl PgSQLAppManager { return Err(Error::InvalidAppKey); } - // Remove from cache - self.app_cache.remove(app_id).await; - Ok(()) } @@ -406,21 +359,9 @@ impl PgSQLAppManager { Error::Internal(format!("Failed to fetch apps from PostgreSQL: {e}")) })?; - warn!("Fetched {} app rows from database.", app_rows.len()); - - // Process rows concurrently using streams - let apps = stream::iter(app_rows) - .map(|row| async { - let app = row.into_app(); - // Insert into cache - self.app_cache.insert(app.id.clone(), app.clone()).await; - app - }) - .buffer_unordered(self.config.connection_pool_size as usize) - .collect::>() - .await; + let apps: Vec = app_rows.into_iter().map(|row| row.into_app()).collect(); - info!("Finished processing and caching {} apps.", apps.len()); + debug!("Fetched {} apps from database", apps.len()); Ok(apps) } @@ -617,7 +558,6 @@ impl Clone for PgSQLAppManager { Self { config: self.config.clone(), pool: self.pool.clone(), - app_cache: self.app_cache.clone(), } } } @@ -625,7 +565,6 @@ impl Clone for PgSQLAppManager { #[cfg(test)] mod tests { use super::*; - use std::time::Duration; // Helper to create test database config fn get_test_db_config(table_name: &str) -> DatabaseConnection { @@ -643,7 +582,6 @@ mod tests { database: std::env::var("DATABASE_POSTGRES_DATABASE") .unwrap_or_else(|_| "sockudo_test".to_string()), table_name: table_name.to_string(), - cache_ttl: 5, // Short TTL for testing ..Default::default() } } @@ -705,9 +643,6 @@ mod tests { let app = manager.find_by_id("test1").await.unwrap().unwrap(); assert_eq!(app.max_connections, 200); - // Test cache expiration - tokio::time::sleep(Duration::from_secs(6)).await; - // Add another app let test_app2 = create_test_app("test2"); manager.create_app(test_app2).await.unwrap(); @@ -922,38 +857,6 @@ mod tests { manager.delete_app("update_webhooks").await.unwrap(); } - #[tokio::test] - async fn test_cache_behavior() { - let mut config = get_test_db_config("apps_cache_test"); - config.cache_ttl = 2; // 2 seconds for quick testing - - let manager = PgSQLAppManager::new(config, DatabasePooling::default()) - .await - .unwrap(); - - // Create an app - let app = create_test_app("cache_test"); - manager.create_app(app).await.unwrap(); - - // First retrieval - should hit database - let retrieved1 = manager.find_by_id("cache_test").await.unwrap().unwrap(); - assert_eq!(retrieved1.id, "cache_test"); - - // Second retrieval - should hit cache - let retrieved2 = manager.find_by_id("cache_test").await.unwrap().unwrap(); - assert_eq!(retrieved2.id, "cache_test"); - - // Wait for cache to expire - tokio::time::sleep(Duration::from_secs(3)).await; - - // Third retrieval - should hit database again - let retrieved3 = manager.find_by_id("cache_test").await.unwrap().unwrap(); - assert_eq!(retrieved3.id, "cache_test"); - - // Cleanup - manager.delete_app("cache_test").await.unwrap(); - } - #[tokio::test] async fn test_find_by_key_with_webhooks() { let config = get_test_db_config("apps_key_webhooks_test"); diff --git a/src/app/scylla_app_manager.rs b/src/app/scylla_app_manager.rs index 77f5b828..4025b23f 100644 --- a/src/app/scylla_app_manager.rs +++ b/src/app/scylla_app_manager.rs @@ -4,13 +4,11 @@ use crate::error::{Error, Result}; use crate::webhook::types::Webhook; use async_trait::async_trait; use futures::TryStreamExt; -use moka::future::Cache; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; use scylla::statement::prepared::PreparedStatement; use scylla::{DeserializeRow, SerializeRow}; use std::sync::Arc; -use std::time::Duration; use tracing::{debug, error, info}; /// Configuration for ScyllaDB App Manager @@ -21,8 +19,6 @@ pub struct ScyllaDbConfig { pub table_name: String, pub username: Option, pub password: Option, - pub cache_ttl: u64, - pub cache_max_capacity: u64, pub replication_class: String, pub replication_factor: u32, } @@ -35,8 +31,6 @@ impl Default for ScyllaDbConfig { table_name: "applications".to_string(), username: None, password: None, - cache_ttl: 3600, - cache_max_capacity: 10000, replication_class: "SimpleStrategy".to_string(), replication_factor: 3, } @@ -47,7 +41,6 @@ impl Default for ScyllaDbConfig { pub struct ScyllaDbAppManager { config: ScyllaDbConfig, session: Arc, - app_cache: Cache, insert_stmt: Arc, update_stmt: Arc, delete_stmt: Arc, @@ -76,12 +69,6 @@ impl ScyllaDbAppManager { let session = Arc::new(session); - // Initialize cache - let app_cache = Cache::builder() - .time_to_live(Duration::from_secs(config.cache_ttl)) - .max_capacity(config.cache_max_capacity) - .build(); - // Ensure keyspace and table exist before preparing statements Self::ensure_keyspace_and_table(&session, &config).await?; @@ -139,7 +126,6 @@ impl ScyllaDbAppManager { Ok(Self { config, session, - app_cache, insert_stmt: Arc::new(insert_stmt), update_stmt: Arc::new(update_stmt), delete_stmt: Arc::new(delete_stmt), @@ -425,9 +411,6 @@ impl AppManager for ScyllaDbAppManager { Error::Internal(format!("Failed to insert app into ScyllaDB: {e}")) })?; - // Update cache - self.app_cache.insert(app.id.clone(), app).await; - Ok(()) } @@ -442,9 +425,6 @@ impl AppManager for ScyllaDbAppManager { Error::Internal(format!("Failed to update app in ScyllaDB: {e}")) })?; - // Invalidate cache - self.app_cache.invalidate(&app.id).await; - Ok(()) } @@ -457,9 +437,6 @@ impl AppManager for ScyllaDbAppManager { Error::Internal(format!("Failed to delete app from ScyllaDB: {e}")) })?; - // Invalidate cache - self.app_cache.invalidate(app_id).await; - Ok(()) } @@ -531,8 +508,6 @@ impl AppManager for ScyllaDbAppManager { .map_err(|e| Error::Internal(format!("Failed to fetch app row: {e}")))? { let app = app_row.into_app(); - // Update cache - self.app_cache.insert(app.id.clone(), app.clone()).await; Ok(Some(app)) } else { Ok(None) @@ -540,12 +515,7 @@ impl AppManager for ScyllaDbAppManager { } async fn find_by_id(&self, app_id: &str) -> Result> { - // Try cache first - if let Some(app) = self.app_cache.get(app_id).await { - return Ok(Some(app)); - } - - debug!("Cache miss for app {}, fetching from ScyllaDB", app_id); + debug!("Fetching app {} from ScyllaDB", app_id); let query = format!( r#"SELECT id, key, secret, max_connections, enable_client_messages, enabled, @@ -577,8 +547,6 @@ impl AppManager for ScyllaDbAppManager { .map_err(|e| Error::Internal(format!("Failed to fetch app row: {e}")))? { let app = app_row.into_app(); - // Update cache - self.app_cache.insert(app_id.to_string(), app.clone()).await; Ok(Some(app)) } else { Ok(None) @@ -624,8 +592,6 @@ mod tests { table_name: "applications_test".to_string(), username: None, password: None, - cache_ttl: 60, - cache_max_capacity: 100, replication_class, replication_factor, } @@ -879,26 +845,6 @@ mod tests { assert!(found.is_none()); } - #[tokio::test] - async fn test_cache_functionality() { - let config = create_test_config(); - let manager = ScyllaDbAppManager::new(config).await.unwrap(); - - let app = create_test_app("test_app_cache"); - manager.create_app(app.clone()).await.unwrap(); - - // First call - should fetch from DB - let found1 = manager.find_by_id("test_app_cache").await.unwrap(); - assert!(found1.is_some()); - - // Second call - should fetch from cache - let found2 = manager.find_by_id("test_app_cache").await.unwrap(); - assert!(found2.is_some()); - assert_eq!(found1.unwrap().id, found2.unwrap().id); - - manager.delete_app("test_app_cache").await.unwrap(); - } - #[tokio::test] async fn test_update_webhooks_to_none() { let config = create_test_config(); diff --git a/src/main.rs b/src/main.rs index 246b4002..50fd9fbc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -274,15 +274,34 @@ impl SockudoServer { debug_enabled ); + let cache_manager = CacheManagerFactory::create(&config.cache, &config.database.redis) + .await + .unwrap_or_else(|e| { + warn!( + "CacheManagerFactory creation failed: {}. Using a NoOp (Memory) Cache.", + e + ); + let fallback_cache_options = config.cache.memory.clone(); + Arc::new(Mutex::new(MemoryCacheManager::new( + "fallback_cache".to_string(), + fallback_cache_options, + ))) + }); + info!( + "CacheManager initialized with driver: {:?}", + config.cache.driver + ); + let app_manager = AppManagerFactory::create( &config.app_manager, &config.database, &config.database_pooling, + cache_manager.clone(), ) .await?; info!( - "AppManager initialized with driver: {:?}", - config.app_manager.driver + "AppManager initialized with driver: {:?} (cache: {})", + config.app_manager.driver, config.app_manager.cache.enabled ); let connection_manager = AdapterFactory::create(&config.adapter, &config.database).await?; @@ -315,24 +334,6 @@ impl SockudoServer { None }; - let cache_manager = CacheManagerFactory::create(&config.cache, &config.database.redis) - .await - .unwrap_or_else(|e| { - warn!( - "CacheManagerFactory creation failed: {}. Using a NoOp (Memory) Cache.", - e - ); - let fallback_cache_options = config.cache.memory.clone(); - Arc::new(Mutex::new(MemoryCacheManager::new( - "fallback_cache".to_string(), - fallback_cache_options, - ))) - }); - info!( - "CacheManager initialized with driver: {:?}", - config.cache.driver - ); - let auth_validator = Arc::new(AuthValidator::new(app_manager.clone())); let metrics = if config.metrics.enabled { diff --git a/src/options.rs b/src/options.rs index d8e69f99..432889ad 100644 --- a/src/options.rs +++ b/src/options.rs @@ -93,8 +93,6 @@ pub struct DynamoDbSettings { pub aws_access_key_id: Option, pub aws_secret_access_key: Option, pub aws_profile_name: Option, - pub cache_ttl: u64, - pub cache_max_capacity: u64, } impl Default for DynamoDbSettings { @@ -106,8 +104,6 @@ impl Default for DynamoDbSettings { aws_access_key_id: None, aws_secret_access_key: None, aws_profile_name: None, - cache_ttl: 3600, - cache_max_capacity: 10000, } } } @@ -120,8 +116,6 @@ pub struct ScyllaDbSettings { pub table_name: String, pub username: Option, pub password: Option, - pub cache_ttl: u64, - pub cache_max_capacity: u64, pub replication_class: String, pub replication_factor: u32, } @@ -134,8 +128,6 @@ impl Default for ScyllaDbSettings { table_name: "applications".to_string(), username: None, password: None, - cache_ttl: 3600, - cache_max_capacity: 10000, replication_class: "SimpleStrategy".to_string(), replication_factor: 3, } @@ -1589,25 +1581,19 @@ impl ServerOptions { self.database.mysql.connection_pool_size = pool_size; self.database.postgres.connection_pool_size = pool_size; } + self.app_manager.cache.enabled = + parse_bool_env("APP_MANAGER_CACHE_ENABLED", self.app_manager.cache.enabled); + self.app_manager.cache.ttl = + parse_env::("APP_MANAGER_CACHE_TTL", self.app_manager.cache.ttl); + if let Some(cache_ttl) = parse_env_optional::("CACHE_TTL_SECONDS") { self.app_manager.cache.ttl = cache_ttl; - self.channel_limits.cache_ttl = cache_ttl; - self.database.mysql.cache_ttl = cache_ttl; - self.database.postgres.cache_ttl = cache_ttl; - self.database.dynamodb.cache_ttl = cache_ttl; - self.database.scylladb.cache_ttl = cache_ttl; self.cache.memory.ttl = cache_ttl; } if let Some(cleanup_interval) = parse_env_optional::("CACHE_CLEANUP_INTERVAL") { - self.database.mysql.cache_cleanup_interval = cleanup_interval; - self.database.postgres.cache_cleanup_interval = cleanup_interval; self.cache.memory.cleanup_interval = cleanup_interval; } if let Some(max_capacity) = parse_env_optional::("CACHE_MAX_CAPACITY") { - self.database.mysql.cache_max_capacity = max_capacity; - self.database.postgres.cache_max_capacity = max_capacity; - self.database.dynamodb.cache_max_capacity = max_capacity; - self.database.scylladb.cache_max_capacity = max_capacity; self.cache.memory.max_capacity = max_capacity; }