From 05a459654590de488d271626a90401903db9593a Mon Sep 17 00:00:00 2001 From: Akshola00 Date: Fri, 7 Nov 2025 01:58:26 +0100 Subject: [PATCH 1/4] redis_impl --- server/Cargo.lock | 103 ++++++++++++++++++ server/Cargo.toml | 3 + server/docker-compose.yml | 7 ++ server/src/lib.rs | 61 ++++++++++- server/src/libs/redis.rs | 78 +++++++++++++ server/src/main.rs | 16 +-- .../crowd_funding/crowd_funding_routes.rs | 7 +- server/src/routes/groups/group.rs | 10 +- server/src/routes/groups/pay_group.rs | 2 +- 9 files changed, 263 insertions(+), 24 deletions(-) create mode 100644 server/src/libs/redis.rs diff --git a/server/Cargo.lock b/server/Cargo.lock index c25ae8e..b473032 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -242,6 +242,27 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +[[package]] +name = "bb8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d8b8e1a22743d9241575c6ba822cf9c8fef34771c86ab7e477a4fbfd254e5" +dependencies = [ + "futures-util", + "parking_lot", + "tokio", +] + +[[package]] +name = "bb8-redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5143936af5e1eea1a881e3e3d21b6777da6315e5e307bc3d0c2301c44fa37da9" +dependencies = [ + "bb8", + "redis", +] + [[package]] name = "bigdecimal" version = "0.4.8" @@ -363,6 +384,20 @@ dependencies = [ "inout", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2084,6 +2119,52 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "redis" +version = "0.32.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "014cc767fefab6a3e798ca45112bccad9c6e0e218fbd49720042716c73cfef44" +dependencies = [ + "bytes", + "cfg-if", + "combine", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.6.0", + "tokio", + "tokio-util", + "url", +] + +[[package]] +name = "redis-macros" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ac3609707f8c68aac6f3871c9a9596a7a9e30e3cc42ebba741d13b39e27c6d8" +dependencies = [ + "redis", + "redis-macros-derive", + "serde", + "serde_json", +] + +[[package]] +name = "redis-macros-derive" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "923db076e23f014fd5c5504fba397d3dd06b29724a3dd8e21aa7a3423c00b129" +dependencies = [ + "proc-macro2", + "quote", + "redis", + "syn", +] + [[package]] name = "redox_syscall" version = "0.5.17" @@ -2536,9 +2617,12 @@ dependencies = [ "argon2", "axum", "axum-extra", + "bb8-redis", "bigdecimal", "dotenvy", "jsonwebtoken", + "redis", + "redis-macros", "serde", "serde_json", "sha2", @@ -2569,6 +2653,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -3314,6 +3404,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.6.11" diff --git a/server/Cargo.toml b/server/Cargo.toml index 5081027..966ddae 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -42,3 +42,6 @@ utoipa-axum = "0.2.0" utoipa-scalar = { version = "0.3.0", features = ["axum"] } sha2 = "0.10.9" uuid = { version = "1.18.1", features = ["v4"] } +bb8-redis = "0.24" +redis-macros = "0.5.6" +redis = "0.32.7" \ No newline at end of file diff --git a/server/docker-compose.yml b/server/docker-compose.yml index 97171b0..7bc637c 100644 --- a/server/docker-compose.yml +++ b/server/docker-compose.yml @@ -16,10 +16,12 @@ services: JWT_SECRET: "${JWT_SECRET}" JWT_EXPIRED_IN: "${JWT_EXPIRED_IN}" JWT_MAXAGE: "${JWT_MAXAGE}" + REDIS_URL: "${REDIS_URL}" ports: - "8080:8080" depends_on: - db + - redis db: image: postgres:16 @@ -31,5 +33,10 @@ services: volumes: - pgdata:/var/lib/postgresql/data + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: pgdata: diff --git a/server/src/lib.rs b/server/src/lib.rs index ef8fe58..f424681 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,11 +1,11 @@ pub mod libs { pub mod auth; - pub mod cache; pub mod config; pub mod db; pub mod error; pub mod logging; pub mod middleware; + pub mod redis; pub mod router; pub mod utopia; } @@ -28,13 +28,68 @@ pub mod util { pub mod validate_address; } -use crate::libs::{cache::Cache, config::Env}; +use crate::libs::{config::Env, error::ApiError, redis::RedisPool}; +use redis::AsyncCommands; use sqlx::PgPool; #[derive(Clone)] pub struct AppState { pub db: PgPool, - pub cache: Cache, + pub redis: RedisPool, pub env: Env, } + +impl AppState { + async fn add_group_address(&self, addr: &str) -> Result<(), ApiError> { + let mut conn = self.redis.get().await.expect("Error connecting to redis"); + conn.sadd("group_funding_address", addr).await.map_err(|e| { + tracing::error!("Error adding group address to redis: {:?}", e); + ApiError::Internal("Error adding group address to redis") + }) + } + async fn has_group_address(&self, addr: &str) -> Result { + let mut conn = self.redis.get().await.expect("Error connecting to redis"); + conn.sismember("group_funding_address", addr) + .await + .map_err(|e| { + tracing::error!("Error checking group address in redis: {:?}", e); + ApiError::Internal("Error checking group address in redis") + }) + } + + async fn get_all_group_addresses(&self) -> Result, ApiError> { + let mut conn = self.redis.get().await.expect("Error connecting to redis"); + let addresses: Vec = conn.smembers("group_funding_address").await.map_err(|e| { + tracing::error!("Error getting group addresses from redis: {:?}", e); + ApiError::Internal("Error getting group addresses from redis") + })?; + Ok(addresses) + } + + async fn add_crowd_funding_address(&self, addr: &str) -> Result<(), ApiError> { + let mut conn = self.redis.get().await.expect("Error connecting to redis"); + conn.sadd("crowd_funding_address", addr).await.map_err(|e| { + tracing::error!("Error adding crowd funding address to redis: {:?}", e); + ApiError::Internal("Error adding crowd funding address to redis") + }) + } + async fn _has_crowd_funding_address(&self, addr: &str) -> Result { + let mut conn = self.redis.get().await.expect("Error connecting to redis"); + conn.sismember("crowd_funding_address", addr) + .await + .map_err(|e| { + tracing::error!("Error checking crowd funding address in redis: {:?}", e); + ApiError::Internal("Error checking crowd funding address in redis") + }) + } + + async fn get_all_crowd_funding_addresses(&self) -> Result, ApiError> { + let mut conn = self.redis.get().await.expect("Error connecting to redis"); + let addresses: Vec = conn.smembers("crowd_funding_address").await.map_err(|e| { + tracing::error!("Error getting crowd funding addresses from redis: {:?}", e); + ApiError::Internal("Error getting crowd funding addresses from redis") + })?; + Ok(addresses) + } +} diff --git a/server/src/libs/redis.rs b/server/src/libs/redis.rs new file mode 100644 index 0000000..f4cb1a1 --- /dev/null +++ b/server/src/libs/redis.rs @@ -0,0 +1,78 @@ +use std::env; + +use bb8_redis::{RedisConnectionManager, bb8}; +use redis::AsyncCommands; +use sqlx::PgPool; + +pub type RedisPool = bb8::Pool; + +pub async fn init_redis(pool: &PgPool) -> RedisPool { + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string()); + let manager = RedisConnectionManager::new(redis_url).expect("init redis failed"); + + let redis_pool = bb8::Pool::builder() + .max_size(100) + .build(manager) + .await + .expect("Failed to build Redis pool"); + + { + let crowd_funding_addresses: Vec = + sqlx::query_scalar("SELECT pool_address FROM crowd_funding") + .fetch_all(pool) + .await + .expect("Failed to get crowd funding addresses"); + + let group_funding_addresses: Vec = + sqlx::query_scalar("SELECT group_address FROM groups") + .fetch_all(pool) + .await + .expect("Failed to get group addresses"); + + let mut conn = redis_pool + .get() + .await + .expect("Failed to get Redis connection"); + + let _: () = conn + .sadd("crowd_funding_addresses", &crowd_funding_addresses) + .await + .expect("Failed to store crowd funding addresses"); + + let _: () = conn + .sadd("group_funding_addresses", &group_funding_addresses) + .await + .expect("Failed to store group addresses"); + } + + redis_pool +} + +pub async fn refresh_cache(pool: &PgPool, redis_pool: &RedisPool) { + let crowd_funding_addresses: Vec = + sqlx::query_scalar("SELECT pool_address FROM crowd_funding") + .fetch_all(pool) + .await + .expect("Failed to get crowd funding addresses"); + + let group_funding_addresses: Vec = + sqlx::query_scalar("SELECT group_address FROM groups") + .fetch_all(pool) + .await + .expect("Failed to get group addresses"); + + let mut conn = redis_pool + .get() + .await + .expect("Failed to get Redis connection"); + + let _: () = conn + .sadd("crowd_funding_addresses", &crowd_funding_addresses) + .await + .expect("Failed to store crowd funding addresses"); + + let _: () = conn + .sadd("group_funding_addresses", &group_funding_addresses) + .await + .expect("Failed to store group addresses"); +} diff --git a/server/src/main.rs b/server/src/main.rs index b0b9dde..5acf238 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,6 @@ use server::{ AppState, - libs::{cache::init_cache, config::Env, db::Db, logging::init_tracing, router::router}, + libs::{config::Env, db::Db, logging::init_tracing, redis, router::router}, }; use tokio::net::TcpListener; @@ -16,27 +16,27 @@ async fn main() { tracing::debug!("Initializing db"); let db = Db::new().await.expect("Failed to initialize DB"); - tracing::debug!("Initializing cache"); - let cache = init_cache(&db.pool.clone()).await; + tracing::debug!("Initializing redis"); + let redis_pool = redis::init_redis(&db.pool.clone()).await; let config = AppState { db: db.pool.clone(), - cache, + redis: redis_pool, env, }; { - let cache = config.cache.clone(); + let redis_pool = config.redis.clone(); let db = config.db.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); loop { - let new_cache = init_cache(&db).await; - *cache.write().await = new_cache.read().await.clone(); + tracing::info!("Refreshing redis cache"); + redis::refresh_cache(&db, &redis_pool).await; interval.tick().await; } }); - tracing::info!("Cache Refreshed"); + tracing::info!("Started cache refresher task"); } tracing::debug!("Running Migrations"); diff --git a/server/src/routes/crowd_funding/crowd_funding_routes.rs b/server/src/routes/crowd_funding/crowd_funding_routes.rs index 6d9c5dc..000ef3c 100644 --- a/server/src/routes/crowd_funding/crowd_funding_routes.rs +++ b/server/src/routes/crowd_funding/crowd_funding_routes.rs @@ -141,6 +141,8 @@ pub async fn create_crowd_funding( ApiError::Internal("Failed to commit transaction") })?; + state.add_crowd_funding_address(&pool_address).await?; + tracing::info!("Crowd funding created: {}", pool_address); Ok(StatusCode::CREATED) @@ -473,10 +475,7 @@ pub async fn resolve_crowd_funding( pub async fn get_all_crowd_funding_addresses( State(state): State, ) -> Result { - let crowd_funding_addresses = sqlx::query_scalar!(r#"SELECT pool_address FROM crowd_funding"#,) - .fetch_all(&state.db) - .await - .map_err(|e| map_sqlx_error(&e))?; + let crowd_funding_addresses = state.get_all_crowd_funding_addresses().await?; Ok(Json(crowd_funding_addresses)) } diff --git a/server/src/routes/groups/group.rs b/server/src/routes/groups/group.rs index 5f56e2a..b2b8898 100644 --- a/server/src/routes/groups/group.rs +++ b/server/src/routes/groups/group.rs @@ -65,11 +65,7 @@ pub async fn create_group( map_sqlx_error(&e) })?; - { - let mut cache = state.cache.write().await; - cache.insert(group_address.to_string()); - } - + state.add_group_address(group_address).await?; for group_member in payload.members { let member_percentage: BigDecimal = group_member.percentage.into(); sqlx::query!( @@ -314,8 +310,6 @@ pub async fn get_groups(State(state): State) -> Result, ) -> Result { - let cache = RwLockReadGuard::map(state.cache.read().await, |f| f).clone(); - let vec_cache: Vec = cache.into_iter().collect(); - + let vec_cache = state.get_all_group_addresses().await?; Ok((StatusCode::OK, Json(vec_cache))) } diff --git a/server/src/routes/groups/pay_group.rs b/server/src/routes/groups/pay_group.rs index 69a38c9..2349ce2 100644 --- a/server/src/routes/groups/pay_group.rs +++ b/server/src/routes/groups/pay_group.rs @@ -49,7 +49,7 @@ pub async fn pay_group( ApiError::BadRequest("Invalid token amount") })?; - if !state.cache.read().await.contains(&group_address) { + if !state.has_group_address(&group_address).await? { return Err(ApiError::NotFound("Group doesnt exist")); } From bd1664a26b1f3a18d9591b83415bf40c3247d817 Mon Sep 17 00:00:00 2001 From: Akshola00 Date: Mon, 10 Nov 2025 11:33:03 +0100 Subject: [PATCH 2/4] update redis container to restart always --- server/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/server/docker-compose.yml b/server/docker-compose.yml index 7bc637c..01d766f 100644 --- a/server/docker-compose.yml +++ b/server/docker-compose.yml @@ -35,6 +35,7 @@ services: redis: image: redis:7-alpine + restart: always ports: - "6379:6379" From 9f355e27ede0298142fd6e3e72ea9da660cd3ef6 Mon Sep 17 00:00:00 2001 From: Akshola00 Date: Mon, 10 Nov 2025 11:43:19 +0100 Subject: [PATCH 3/4] fix: redis typo bug --- server/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index f424681..7d89076 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -60,7 +60,7 @@ impl AppState { async fn get_all_group_addresses(&self) -> Result, ApiError> { let mut conn = self.redis.get().await.expect("Error connecting to redis"); - let addresses: Vec = conn.smembers("group_funding_address").await.map_err(|e| { + let addresses: Vec = conn.smembers("group_funding_addresses").await.map_err(|e| { tracing::error!("Error getting group addresses from redis: {:?}", e); ApiError::Internal("Error getting group addresses from redis") })?; @@ -86,7 +86,7 @@ impl AppState { async fn get_all_crowd_funding_addresses(&self) -> Result, ApiError> { let mut conn = self.redis.get().await.expect("Error connecting to redis"); - let addresses: Vec = conn.smembers("crowd_funding_address").await.map_err(|e| { + let addresses: Vec = conn.smembers("crowd_funding_addresses").await.map_err(|e| { tracing::error!("Error getting crowd funding addresses from redis: {:?}", e); ApiError::Internal("Error getting crowd funding addresses from redis") })?; From b8d9ddf43508befa52892866f85d54e60fda19f4 Mon Sep 17 00:00:00 2001 From: Akshola00 Date: Mon, 10 Nov 2025 11:43:27 +0100 Subject: [PATCH 4/4] fmt --- server/src/lib.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index 7d89076..cff1fc6 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -60,10 +60,13 @@ impl AppState { async fn get_all_group_addresses(&self) -> Result, ApiError> { let mut conn = self.redis.get().await.expect("Error connecting to redis"); - let addresses: Vec = conn.smembers("group_funding_addresses").await.map_err(|e| { - tracing::error!("Error getting group addresses from redis: {:?}", e); - ApiError::Internal("Error getting group addresses from redis") - })?; + let addresses: Vec = + conn.smembers("group_funding_addresses") + .await + .map_err(|e| { + tracing::error!("Error getting group addresses from redis: {:?}", e); + ApiError::Internal("Error getting group addresses from redis") + })?; Ok(addresses) } @@ -86,10 +89,13 @@ impl AppState { async fn get_all_crowd_funding_addresses(&self) -> Result, ApiError> { let mut conn = self.redis.get().await.expect("Error connecting to redis"); - let addresses: Vec = conn.smembers("crowd_funding_addresses").await.map_err(|e| { - tracing::error!("Error getting crowd funding addresses from redis: {:?}", e); - ApiError::Internal("Error getting crowd funding addresses from redis") - })?; + let addresses: Vec = + conn.smembers("crowd_funding_addresses") + .await + .map_err(|e| { + tracing::error!("Error getting crowd funding addresses from redis: {:?}", e); + ApiError::Internal("Error getting crowd funding addresses from redis") + })?; Ok(addresses) } }