fix(websocket): update Redis lock handler for better concurrency control #618


Merged
merged 11 commits into from Nov 11, 2024

Conversation

@kasugamirai (Member) commented Nov 7, 2024

Overview

What I've done

What I haven't done

How I tested

Screenshot

Which point I want you to review particularly

Memo

Summary by CodeRabbit

  • New Features

    • Introduced a Redis connection pool for improved connection management.
    • Added new error handling variants to enhance error reporting.
    • Streamlined methods for managing project editing sessions and Y.js document updates.
    • Implemented a new macro for generating unique identifiers.
  • Bug Fixes

    • Improved error propagation and handling in various components.
  • Refactor

    • Restructured imports to centralize domain logic under the flow_websocket_infra namespace.
    • Simplified method signatures and error handling in the ProjectEditingSession and related services.
    • Updated data structures to accommodate binary data handling.
    • Removed unnecessary functions and modules to enhance code clarity.
  • Chores

    • Removed deprecated dependencies and modules to clean up the codebase.

coderabbitai bot (Contributor) commented Nov 7, 2024

Walkthrough

The pull request introduces significant changes to the websocket project, primarily focusing on dependency management, error handling, and the restructuring of various components. New dependencies bb8 and bb8-redis are added, while flow-websocket-domain is removed. The WsError enum is updated to handle additional error scenarios. The AppState structure is modified to utilize a Redis connection pool instead of a direct client. Various modules and files are reorganized or removed, streamlining the codebase and enhancing error handling throughout the application.

Changes

File Change Summary
websocket/app/Cargo.toml Added dependencies: bb8, bb8-redis; removed dependency: flow-websocket-domain.
websocket/app/src/errors.rs Added enum variants: Pool, LocalStorage to WsError.
websocket/app/src/state.rs Updated AppState to use bb8::Pool instead of FlowRedisClient; modified initialization logic.
websocket/crates/infra/src/persistence/project_repository.rs Updated ProjectRedisRepository to use bb8::Pool; removed RedisClientTrait.
websocket/crates/infra/src/persistence/redis/errors.rs Removed GlobalLockError; added FlowProjectLock variant.
websocket/crates/infra/src/persistence/redis/flow_project_lock.rs Simplified locking methods; added macro for generating lock methods.
websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs Introduced new methods for processing Redis stream entries; modified existing methods.
websocket/crates/infra/src/persistence/redis/mod.rs Removed modules: redis_client, utils, default_key_manager.
websocket/crates/infra/src/persistence/redis/redis_client.rs Deleted file containing RedisClient struct and related methods.
websocket/crates/infra/src/persistence/redis/updates.rs Updated type alias; removed decode_state_data method.
websocket/crates/services/examples/edit_session_service.rs Updated to use Redis connection pool; refined Y.js document management.
websocket/crates/infra/src/persistence/redis/types.rs Updated FlowUpdate struct to include Vec<u8>; renamed FlowEncodedUpdate.
websocket/crates/infra/src/persistence/redis/utils.rs Deleted utility functions for encoding/decoding state data.
websocket/crates/services/Cargo.toml Added dependency: yrs; removed dependency: flow-websocket-domain.
websocket/crates/services/src/manage_project_edit_session.rs Removed MAX_SNAPSHOT_DELTA; refined command handling logic.
websocket/Cargo.toml Removed dependency: flow-websocket-domain.
websocket/app/src/handler.rs Updated import path for User type.
websocket/crates/domain/Cargo.toml Deleted file defining the flow-websocket-domain package.
websocket/crates/domain/src/lib.rs Deleted file containing module declarations for editing_session, repository, and types.
websocket/crates/domain/src/utils.rs Deleted file containing generate_id macro and calculate_diff function.
websocket/crates/infra/Cargo.toml Removed dependency: flow-websocket-domain.
websocket/crates/infra/src/lib.rs Added new modules: types, utils.
websocket/crates/infra/src/persistence/editing_session.rs Streamlined error handling in ProjectEditingSession methods.
websocket/crates/infra/src/persistence/mod.rs Added new modules: editing_session, repository.
websocket/crates/infra/src/persistence/repository.rs Updated import paths for ProjectEditingSession and Project.
websocket/crates/infra/src/utils.rs Added new macro: generate_id.
websocket/crates/services/src/error.rs Updated EditingSession variant to reference new error type location.
websocket/crates/services/src/project.rs Restructured import statements to reference flow_websocket_infra.
websocket/crates/services/examples/project_service.rs Overhauled main function to streamline Redis connection management and service operations.

Possibly related PRs

Suggested labels

worker, api

Suggested reviewers

  • pyshx

Poem

In the burrow deep and wide,
New dependencies now abide.
With Redis pools, we hop and play,
Streamlined paths, come what may.
Error handling, crisp and neat,
In our code, we feel the beat! 🐇✨



netlify bot commented Nov 7, 2024

Deploy Preview for reearth-flow canceled.

  • 🔨 Latest commit: 7d9de01
  • 🔍 Latest deploy log: https://app.netlify.com/sites/reearth-flow/deploys/6731b093bee74f0008188dda

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (23)
websocket/app/Cargo.toml (1)

29-30: LGTM! Good choice of connection pooling implementation.

The addition of bb8 and bb8-redis for connection pooling is a solid architectural improvement that will help manage Redis connections more efficiently and handle concurrent requests better.

This change will provide several benefits:

  • Better resource management through connection reuse
  • Improved handling of concurrent websocket connections
  • Prevention of connection leaks
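To make the reuse benefit concrete, here is a toy, standard-library-only illustration of the idea behind a connection pool (this is not bb8's API; `Conn`, `ToyPool`, and the counter are purely illustrative): connections are checked out of an idle queue and returned after use, instead of being re-established per request.

```rust
use std::collections::VecDeque;

// Stand-in for a Redis connection; a real pool would hold live sockets.
struct Conn {
    id: u32,
}

struct ToyPool {
    idle: VecDeque<Conn>,
    created: u32,
}

impl ToyPool {
    fn new() -> Self {
        ToyPool { idle: VecDeque::new(), created: 0 }
    }

    fn get(&mut self) -> Conn {
        // Reuse an idle connection if one exists; otherwise create a new one.
        self.idle.pop_front().unwrap_or_else(|| {
            self.created += 1; // a real pool would dial Redis here
            Conn { id: self.created }
        })
    }

    fn put(&mut self, conn: Conn) {
        // Return the connection for later reuse instead of dropping it.
        self.idle.push_back(conn);
    }
}

fn main() {
    let mut pool = ToyPool::new();
    let c1 = pool.get();
    pool.put(c1);
    let c2 = pool.get(); // reuses the first connection, no new dial
    assert_eq!(c2.id, 1);
    assert_eq!(pool.created, 1);
    println!("connections created: {}", pool.created);
}
```

A real pool like bb8 additionally bounds the number of concurrent checkouts (`max_size`), which is what gives the concurrency control this PR is after.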
websocket/app/src/errors.rs (1)

Line range hint 1-28: Consider adding documentation for error variants.

While the implementation is correct, adding documentation comments for the error variants would improve maintainability, especially for the new Pool and LocalStorage variants. This would help other developers understand when these errors occur and how to handle them.

Example documentation:

 #[derive(Debug, Error)]
 pub enum WsError {
+    /// Error occurred when a room could not be found with the given identifier
     #[error("Room not found: {0}")]
     RoomNotFound(String),
     // ... other variants ...
+    /// Error occurred during Redis connection pool operations
     #[error(transparent)]
     Pool(#[from] FlowProjectRedisDataManagerError),
+    /// Error occurred during local storage operations
     #[error(transparent)]
     LocalStorage(#[from] std::io::Error),
 }
websocket/crates/infra/src/persistence/redis/errors.rs (1)

31-33: Consider using Display formatting for lock errors.

The current error message uses debug formatting ({:?}). While this provides detailed information, it might not be the most user-friendly format. Consider using Display formatting if rslock::LockError implements std::fmt::Display.

-    #[error("Global Lock Error: {0:?}")]
+    #[error("Global Lock Error: {0}")]
     FlowProjectLock(rslock::LockError),
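For readers unfamiliar with the distinction, a minimal self-contained sketch (with a hypothetical `LockError` standing in for `rslock::LockError`) shows how the two format specifiers differ:

```rust
use std::fmt;

// Hypothetical error type, used only to contrast Debug ({:?}) and Display ({}).
#[derive(Debug)]
struct LockError {
    resource: String,
}

impl fmt::Display for LockError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not acquire lock on {}", self.resource)
    }
}

fn main() {
    let e = LockError { resource: "project:42".to_string() };
    // Debug exposes the struct layout; Display reads as a sentence.
    println!("{:?}", e); // LockError { resource: "project:42" }
    println!("{}", e);   // could not acquire lock on project:42
}
```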
websocket/crates/infra/src/persistence/redis/flow_project_lock.rs (3)

3-18: Good use of macro for DRY implementation, but consider some improvements.

The macro effectively reduces code duplication and ensures consistency across lock methods. However, there are a few potential improvements:

  1. The duration_ms parameter is converted from u64 to usize which could potentially overflow on 32-bit systems.
  2. The lock key separator (":") is hardcoded in the macro.

Consider these improvements:

 macro_rules! define_lock_method {
-    ($name:ident, $($lock_key:expr),+) => {
+    ($name:ident, $separator:expr, $($lock_key:expr),+) => {
         pub async fn $name<F, T>(
             &self,
             project_id: &str,
-            duration_ms: u64,
+            duration_ms: usize,
             callback: F,
         ) -> Result<T, LockError>
         where
             F: FnOnce(&LockGuard) -> T + Send,
             T: Send,
         {
-            let lock_keys = vec![$(format!("{}:locks:{}", project_id, $lock_key)),+];
+            let lock_keys = vec![$(format!("{}{}locks{}{}", project_id, $separator, $separator, $lock_key)),+];
             self.with_lock(lock_keys, duration_ms, callback).await
         }
     };
 }

Line range hint 36-46: Consider improving error handling for unlock operation.

While the lock acquisition error handling is good, the unlock operation at line 50 silently ignores potential errors. In a distributed system, failed unlock operations could lead to deadlocks.

Consider handling unlock errors:

-        self.lock_manager.unlock(&guard.lock).await;
+        if let Err(e) = self.lock_manager.unlock(&guard.lock).await {
+            tracing::error!("Failed to unlock resource: {}", e);
+            // Optionally: return Err(e) if you want to make unlock failures visible
+        }

54-57: Document locking strategy and consider lock ordering.

The lock methods show a hierarchical pattern, but several concerns need to be addressed:

  1. lock_session and lock_snapshots use identical locks, which might indicate redundancy
  2. No documentation explains the locking strategy or resource hierarchy
  3. No explicit lock ordering strategy to prevent deadlocks

Consider:

  1. Adding comprehensive documentation explaining the locking strategy
  2. Implementing a consistent lock ordering strategy
  3. Differentiating between lock_session and lock_snapshots if they serve different purposes

Example documentation:

/// Locks follow a hierarchical pattern to prevent deadlocks:
/// 1. state: Base lock for all project operations
/// 2. updates: Additional lock for modification operations
/// 3. snapshots: Highest level lock for snapshot operations
/// 
/// Lock ordering is enforced by the macro to prevent deadlocks.
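As an illustration of the key layout the macro produces, here is the key-construction logic extracted into a plain function (the function name is an assumption; the `:` separator and `locks` segment mirror the macro shown above; this is a sketch, not the actual implementation):

```rust
// Sketch of the key construction that define_lock_method! expands to.
fn lock_keys(project_id: &str, lock_names: &[&str]) -> Vec<String> {
    lock_names
        .iter()
        .map(|name| format!("{}:locks:{}", project_id, name))
        .collect()
}

fn main() {
    // A hierarchical method like lock_snapshots would acquire all three keys
    // in this fixed order, which is what prevents lock-ordering deadlocks.
    let keys = lock_keys("project-1", &["state", "updates", "snapshots"]);
    assert_eq!(keys[0], "project-1:locks:state");
    assert_eq!(keys[2], "project-1:locks:snapshots");
    println!("{:?}", keys);
}
```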
websocket/crates/services/examples/edit_session_service.rs (2)

27-30: Consider configuring pool parameters for production use

While the pool initialization is correct, consider making the pool configuration parameters configurable for production use. Parameters like max_size, min_idle, and connection timeout can significantly impact performance under load.

Example improvement:

-    let redis_pool = Pool::builder().build(manager).await?;
+    let redis_pool = Pool::builder()
+        .max_size(15)        // Adjust based on your needs
+        .min_idle(Some(5))   // Keep minimum connections ready
+        .connection_timeout(Duration::from_secs(10))
+        .build(manager)
+        .await?;

Line range hint 91-103: Clean up commented code sections

The example file contains commented-out code sections that demonstrate task removal and session completion. Consider either:

  1. Removing these sections if they're no longer relevant, or
  2. Un-commenting and including them in the example flow with proper documentation
websocket/crates/infra/src/persistence/redis/updates.rs (1)

Line range hint 83-92: Consider documenting concurrent behavior.

This method is crucial for handling Redis streams in a concurrent environment. Consider adding documentation that explains:

  • The concurrent access patterns
  • Any potential race conditions
  • The atomicity guarantees of the XREAD operation

Example documentation:

/// Retrieves stream items for a project using Redis XREAD.
/// 
/// # Concurrency
/// This operation is atomic at Redis level. Multiple concurrent readers
/// will receive consistent stream data. The connection pool handles
/// concurrent connection requests safely.
/// 
/// # Returns
/// Returns stream items containing updates, ordered by their stream IDs.
websocket/app/src/state.rs (2)

38-40: Consider logging when default Redis URL is used

If the REDIS_URL environment variable is not set and no redis_url is provided, the code defaults to "redis://localhost:6379/0". It might be helpful to log a warning or info message indicating that the default Redis URL is being used, to aid in debugging and configuration.
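A minimal sketch of the suggested fallback-with-warning, using only the standard library (the real code would presumably log via `tracing::warn!` rather than `eprintln!`; the function name is illustrative):

```rust
// Resolve the Redis URL from an explicit argument, then the environment,
// warning when the hardcoded default is used.
fn resolve_redis_url(explicit: Option<String>) -> String {
    explicit
        .or_else(|| std::env::var("REDIS_URL").ok())
        .unwrap_or_else(|| {
            eprintln!("warning: REDIS_URL not set, falling back to default");
            "redis://localhost:6379/0".to_string()
        })
}

fn main() {
    std::env::remove_var("REDIS_URL");
    assert_eq!(resolve_redis_url(None), "redis://localhost:6379/0");
    assert_eq!(
        resolve_redis_url(Some("redis://cache:6379/1".to_string())),
        "redis://cache:6379/1"
    );
}
```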


41-42: Configure the Redis connection pool size for optimal performance

Currently, the Redis connection pool is initialized with default settings. To ensure optimal performance and resource utilization, consider configuring the pool with appropriate parameters like max_size, which sets the maximum number of connections in the pool.

You can apply the following changes to set the maximum pool size:

 let manager = RedisConnectionManager::new(&*redis_url)?;
+let pool_builder = Pool::builder().max_size(15); // Example: Set max connections to 15
-let redis_pool = Pool::builder().build(manager).await?;
+let redis_pool = pool_builder.build(manager).await?;

Adjust the max_size value based on your application's requirements and expected load.

websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (6)

11-11: Confirm the necessity of importing serde::de.

The import statement use serde::de; is added but it doesn't appear to be used within the file. Unused imports can clutter the codebase and potentially lead to confusion.

Please verify if serde::de is required. If not, consider removing it to keep the code clean.


41-54: Handle unexpected data types in parse_entry_fields.

The parse_entry_fields function processes Redis values assuming they are bulk strings. If the Redis stream contains values of different types, the function silently skips them.

Consider adding explicit logging for non-bulk-string types to improve robustness and debuggability. Note that filter_map should be kept as-is: switching it to map would make the closure yield Option items, so the subsequent collect::<Vec<_>>() would no longer type-check.

 fn parse_entry_fields(fields: &[redis::Value]) -> Option<Vec<(String, String)>> {
     fields
         .chunks(2)
         .filter(|chunk| chunk.len() == 2)
         .filter_map(|chunk| match (&chunk[0], &chunk[1]) {
             (redis::Value::BulkString(key), redis::Value::BulkString(val)) => Some((
                 String::from_utf8_lossy(key).into_owned(),
                 String::from_utf8_lossy(val).into_owned(),
             )),
+            (key, val) => {
+                // Log unexpected types; filter_map still skips them
+                debug!("Unexpected Redis value types: key={:?}, val={:?}", key, val);
+                None
+            }
         })
         .collect::<Vec<_>>()
         .into()
 }

56-73: Simplify control flow in parse_stream_entry.

The function parse_stream_entry contains multiple nested matches and early returns, which can make it harder to read and maintain.

Refactor the function to streamline the control flow and improve readability.

 fn parse_stream_entry(
     entry_fields: &[redis::Value],
 ) -> Option<(String, Vec<(String, String)>)> {
     if entry_fields.len() < 2 {
         return None;
     }

-    let entry_id = match &entry_fields[0] {
+    let entry_id = if let redis::Value::BulkString(bytes) = &entry_fields[0] {
         String::from_utf8_lossy(bytes).into_owned()
-    };
+    } else {
+        return None;
+    };

-    match &entry_fields[1] {
-        redis::Value::Array(fields) => Some((entry_id, Self::parse_entry_fields(fields)?)),
-        _ => None,
-    }
+    let fields = if let redis::Value::Array(fields) = &entry_fields[1] {
+        Self::parse_entry_fields(fields)?
+    } else {
+        return None;
+    };

+    Some((entry_id, fields))
 }

74-106: Add error logging for unexpected Redis response formats in xread_map.

The xread_map function assumes a specific structure of the Redis response. If the response format deviates, the current implementation may fail silently.

Consider adding logging to capture unexpected response formats, which can aid in debugging.

 // Inside the loop where you process 'stream_data'
 if let redis::Value::Array(entries) = &stream_data[1] {
     mapped_results.extend(entries.iter().filter_map(|entry| {
         if let redis::Value::Array(entry_fields) = entry {
             Self::parse_stream_entry(entry_fields)
         } else {
+            debug!("Unexpected entry format: {:?}", entry);
             None
         }
     }));
 } else {
+    debug!("Unexpected stream data format: {:?}", stream_data[1]);
 }

113-117: Ensure consistent debug logging in get_state_update_in_redis.

The method provides debug logs when fetching state updates. Consistent logging across methods enhances traceability and eases debugging.

Ensure that similar debug statements are present in other methods interacting with Redis streams or critical operations.


128-131: Enhance error context information in MissingStateUpdate error.

The current error context includes the project ID and session ID. Additional details, such as the Redis key and any relevant parameters, can provide more insight during troubleshooting.

Consider expanding the context field to include more diagnostic information.

 Err(FlowProjectRedisDataManagerError::MissingStateUpdate {
     key: state_update_key,
-    context: format!("Project: {}, Session: {:?}", project_id, session_id),
+    context: format!(
+        "Project: {}, Session: {:?}, Key: {}",
+        project_id, session_id, state_update_key
+    ),
 })
websocket/crates/infra/src/persistence/project_repository.rs (6)

2-2: Remove unused imports instead of commenting them out

The imports for RedisClientError and RedisClientTrait are commented out. If these imports are no longer needed due to the refactoring, it's cleaner to remove them entirely to maintain code cleanliness.

Also applies to: 20-20


62-65: Consider logging errors for better observability

In the get_project method, errors from Redis operations are propagated using the ? operator. While this is correct, adding logging for errors can aid in debugging and monitoring system health.


Line range hint 77-89: Ensure consistency in error handling and logging

Across methods like create_session, consider consistently logging errors from Redis operations. This uniformity helps in maintaining observability throughout the application.


113-125: Validate session existence before updating

In update_session, ensure that the session exists before attempting to update it. This prevents potential errors if an invalid session ID is provided.


Line range hint 58-133: Update and re-enable unit tests for ProjectRedisRepository

With the transition to a Redis connection pool, the existing unit tests need to be updated to reflect the new implementation. Re-enabling and adapting these tests is essential to ensure the repository functions correctly.

Would you like assistance in updating the unit tests to align with the new implementation?


Line range hint 2-133: Remove commented-out code to maintain codebase cleanliness

There are multiple sections of commented-out code, including imports and previous implementations. Removing obsolete code keeps the codebase clean and reduces confusion.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between c454b86 and 69a2ee0.

⛔ Files ignored due to path filters (1)
  • websocket/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
  • websocket/app/Cargo.toml (1 hunks)
  • websocket/app/src/errors.rs (2 hunks)
  • websocket/app/src/state.rs (4 hunks)
  • websocket/crates/infra/src/persistence/project_repository.rs (5 hunks)
  • websocket/crates/infra/src/persistence/redis/errors.rs (1 hunks)
  • websocket/crates/infra/src/persistence/redis/flow_project_lock.rs (3 hunks)
  • websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (2 hunks)
  • websocket/crates/infra/src/persistence/redis/mod.rs (0 hunks)
  • websocket/crates/infra/src/persistence/redis/redis_client.rs (0 hunks)
  • websocket/crates/infra/src/persistence/redis/updates.rs (1 hunks)
  • websocket/crates/services/examples/edit_session_service.rs (2 hunks)
💤 Files with no reviewable changes (2)
  • websocket/crates/infra/src/persistence/redis/mod.rs
  • websocket/crates/infra/src/persistence/redis/redis_client.rs
🔇 Additional comments (10)
websocket/app/Cargo.toml (1)

29-30: Verify workspace-level dependency configuration.

Both dependencies are configured with workspace = true, which is correct for version consistency.

Let's verify the workspace-level configuration:

✅ Verification successful

Dependencies are correctly configured at workspace level

The verification confirms that:

  • bb8 (v0.8) and bb8-redis (v0.17.0) are properly defined in the workspace root Cargo.toml
  • All workspace members (app, infra, and services) correctly reference these dependencies using workspace = true
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that bb8 and bb8-redis are properly configured in the workspace
# Expected: Find workspace-level version definitions for these dependencies

# Check the workspace root Cargo.toml for these dependencies
rg -A 1 "^(bb8|bb8-redis)" $(fd -t f "^Cargo.toml$" --max-depth 1)

Length of output: 679

websocket/app/src/errors.rs (1)

25-28: LGTM! Error handling implementation follows best practices.

The new error variants are well-implemented:

  • Uses #[error(transparent)] appropriately for error propagation
  • Follows the pattern established by other variants
  • Integrates cleanly with the Redis connection pooling changes

Let's verify the usage of these new error variants:

✅ Verification successful

Let me gather more context about the error handling implementation to ensure these variants are properly used.


Based on the comprehensive results, I can now generate the final response:

Error handling implementation is correctly integrated and used throughout the codebase

The new error variants in WsError are properly integrated:

  • Pool(FlowProjectRedisDataManagerError) is used extensively in Redis-related operations across the codebase
  • LocalStorage(std::io::Error) is appropriately used for I/O error handling
  • Both variants follow the established error handling patterns with proper error propagation
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for proper error handling of the new variants
# Expected: Find error handling for Pool and LocalStorage variants

# Check for Pool error handling
rg -A 3 "Pool\(" websocket/
# Check for LocalStorage error handling
rg -A 3 "LocalStorage\(" websocket/

Length of output: 720


Script:

#!/bin/bash
# Check for actual usage of these error variants in the codebase
rg -A 5 "-> Result<.+, WsError>" websocket/

# Check the full content of errors.rs to understand the complete error handling context
cat websocket/app/src/errors.rs

# Look for any Redis-related error handling patterns
rg -A 3 "FlowProjectRedisDataManagerError" websocket/

Length of output: 24155

websocket/crates/infra/src/persistence/redis/errors.rs (1)

35-38: LGTM! Clean error conversion implementation.

The From trait implementation follows Rust's error handling best practices and enables ergonomic error propagation using the ? operator.

websocket/crates/infra/src/persistence/redis/flow_project_lock.rs (1)

Line range hint 1-57: Verify lock usage patterns across the codebase.

Let's ensure the lock methods are used consistently and that there are no potential deadlock scenarios.

✅ Verification successful

Lock implementation is safe and properly structured

The verification of the lock implementation shows:

  1. The lock hierarchy is well-defined and consistent:

    • lock_state (single lock)
    • lock_updates (includes state)
    • lock_snapshots and lock_session (includes state and updates)
  2. Lock usage is safe:

    • No nested locks detected in the codebase
    • Locks are automatically released through RAII (LockGuard)
    • Timeouts are consistently set (5000ms)
    • Skip-lock mechanism available when needed
  3. Error handling is properly implemented:

    • LockError is properly propagated
    • Lock acquisition failures are handled gracefully

The implementation follows best practices for distributed locking and shows no signs of potential deadlocks or race conditions.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for lock method usage patterns to verify consistent usage
echo "Checking lock method usage patterns..."
rg -A 2 "lock_(state|updates|snapshots|session)" --type rust

# Look for potential nested locks that might cause deadlocks
echo "Checking for potential nested locks..."
rg -B 2 -A 2 "lock_\w+.*?await.*?lock_\w+" --type rust

Length of output: 1659


Script:

#!/bin/bash
# Let's search for the actual usage of these locks in the codebase
echo "Searching for lock usage patterns..."
rg -B 5 -A 10 "\.lock_(state|updates|snapshots|session)" --type rust

# Let's also check the implementation of FlowProjectRedisDataManager
echo "Checking FlowProjectRedisDataManager implementation..."
ast-grep --pattern 'impl FlowProjectRedisDataManager {
  $$$
}'

# Let's verify if there are any other lock-related patterns
echo "Checking other lock patterns..."
rg "with_lock|LockGuard|LockError" --type rust

Length of output: 29812

websocket/crates/services/examples/edit_session_service.rs (2)

1-2: LGTM: Appropriate imports for Redis connection pooling

The addition of bb8::Pool and bb8_redis::RedisConnectionManager imports is correct for implementing connection pooling, which will help improve concurrency control.


33-34: Consider unifying Redis connection handling

While ProjectRedisRepository now uses the connection pool, FlowProjectRedisDataManager still creates its own Redis connection. Consider refactoring FlowProjectRedisDataManager to use the same connection pool for consistent connection management and better resource utilization.

Let's verify if FlowProjectRedisDataManager creates its own Redis connection:

websocket/crates/infra/src/persistence/redis/updates.rs (1)

83-83: Verify error type conversion implementation.

The simplified error handling suggests that bb8::RunError can be converted to FlowProjectRedisDataManagerError. Please ensure that the appropriate From implementation exists.

✅ Verification successful

Error handling is correctly implemented

The error handling simplification is valid. The FlowProjectRedisDataManagerError enum has a PoolRunError variant that derives From<bb8::RunError<redis::RedisError>> using the #[from] attribute, which automatically implements the conversion from bb8::RunError to FlowProjectRedisDataManagerError. This makes the direct use of the ? operator safe and correct.
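To make the mechanism concrete, here is roughly what the #[from] attribute generates, written by hand with stand-in types so the sketch is self-contained (`RunError` stands in for `bb8::RunError<redis::RedisError>`; the real code derives this via thiserror):

```rust
#[derive(Debug)]
struct RunError(String); // stand-in for bb8::RunError<redis::RedisError>

#[derive(Debug)]
enum FlowProjectRedisDataManagerError {
    PoolRunError(RunError),
}

// This impl is what `PoolRunError(#[from] bb8::RunError<...>)` expands to;
// it is what lets the ? operator convert the pool error automatically.
impl From<RunError> for FlowProjectRedisDataManagerError {
    fn from(e: RunError) -> Self {
        FlowProjectRedisDataManagerError::PoolRunError(e)
    }
}

fn get_connection() -> Result<(), FlowProjectRedisDataManagerError> {
    // Simulate a failed pool checkout; ? converts via the From impl above.
    let pool_result: Result<(), RunError> = Err(RunError("timed out".to_string()));
    pool_result?;
    Ok(())
}

fn main() {
    assert!(matches!(
        get_connection(),
        Err(FlowProjectRedisDataManagerError::PoolRunError(_))
    ));
}
```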

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for From implementation for the error types
rg "impl From<(bb8::RunError|redis::RedisError)> for FlowProjectRedisDataManagerError"

Length of output: 88


Script:

#!/bin/bash
# Let's first check the error type definition and its implementations
rg -A 10 "FlowProjectRedisDataManagerError" websocket/crates/infra/src/persistence/redis/

# Also check for any error conversion implementations in the module
ast-grep --pattern 'impl From<$_> for FlowProjectRedisDataManagerError { $$ }'

# Check the actual error type returned by the redis pool
rg "redis_pool.*get\(\)" -A 3 websocket/crates/infra/src/persistence/redis/

Length of output: 34085


Script:

#!/bin/bash
# Let's check the error module for PoolRunError and its implementations
rg "PoolRunError" websocket/crates/infra/src/persistence/redis/errors.rs -A 5

# Also check for any From implementations for bb8::RunError
ast-grep --pattern 'impl From<bb8::RunError> for FlowProjectRedisDataManagerError { $$ }'

Length of output: 381

websocket/app/src/state.rs (1)

26-28: Verify all references to redis_client have been updated to redis_pool

Ensure that all instances of redis_client have been replaced with redis_pool throughout the codebase to prevent compilation errors or runtime issues.

Run the following script to check for any remaining references to redis_client:

✅ Verification successful

Let me verify if these occurrences are in example/test files or actual implementation code by checking the file contents.


All references to redis_client have been properly updated to redis_pool

The occurrences of redis_client found in the codebase are either:

  • In commented-out code in project_repository.rs (old test code and imports)
  • In an example file project_service.rs which is not part of the actual implementation

The actual implementation in ProjectRedisRepository correctly uses redis_pool with the bb8 connection pool, and there are no active references to the old redis_client in the production code.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find any occurrences of 'redis_client' in the codebase.

# Test: Search for 'redis_client'. Expect: No matches.
rg 'redis_client'

Length of output: 794


Script:

#!/bin/bash
# Check content of the files containing redis_client references
cat websocket/crates/services/examples/project_service.rs
cat websocket/crates/infra/src/persistence/project_repository.rs

# Also search for RedisClient to ensure we catch all variants
rg "RedisClient" -A 2

Length of output: 23704

websocket/crates/infra/src/persistence/project_repository.rs (2)

40-43: Appropriate addition of error variants for Redis operations

Adding Redis and Pool variants to ProjectRepositoryError enhances error handling by capturing specific errors related to Redis operations and connection pooling. This improves the robustness of error reporting.


47-53: Struct and constructor updated correctly for connection pooling

The ProjectRedisRepository struct now holds a redis_pool, and the constructor is updated accordingly. This aligns with the shift to using a connection pool for better resource management.

@kasugamirai kasugamirai enabled auto-merge (squash) November 7, 2024 19:35
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (13)
websocket/crates/infra/src/persistence/redis/types.rs (1)

3-7: Add documentation for the encoded data format and consider validation.

The struct handles encoded binary data but lacks documentation about the expected encoding format. This could lead to confusion for other developers working with this type.

Consider:

  1. Adding documentation about the encoding format and data structure
  2. Adding validation for the encoded data format
  3. Including examples in the documentation
+/// Represents an encoded flow update with binary data.
+/// The update field contains binary data encoded in [specify format] format.
+/// Example:
+/// ```
+/// let update = FlowEncodedUpdate {
+///     update: vec![1, 2, 3],  // encoded data
+///     updated_by: Some("user123".to_string()),
+/// };
+/// ```
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct FlowEncodedUpdate {
+    /// Binary encoded update data in [specify format] format
     pub update: Vec<u8>,
+    /// Optional identifier of the user who made the update
     pub updated_by: Option<String>,
 }
websocket/crates/services/examples/edit_session_service.rs (2)

28-31: Add detailed error handling for connection pool initialization

While the connection pool setup is correct, consider adding more robust error handling and logging for production use.

 // Initialize Redis connection pool
 let manager = RedisConnectionManager::new(&*redis_url)?;
-let redis_pool = Pool::builder().build(manager).await?;
+let redis_pool = Pool::builder()
+    .max_size(15) // Adjust based on your needs
+    .build(manager)
+    .await
+    .map_err(|e| {
+        error!("Failed to create Redis connection pool: {}", e);
+        e
+    })?;
+info!("Redis connection pool initialized successfully");

Also applies to: 34-34


92-99: Consider Y.js document cleanup

The Y.js document handling is implemented correctly, but consider adding cleanup code to free resources when they're no longer needed.

+    // Clean up Y.js resources
+    drop(doc);
+    info!("Y.js resources cleaned up");
+
     info!("Edit session service example completed");
     Ok(())

Also applies to: 109-114

websocket/crates/infra/src/persistence/redis/updates.rs (3)

12-12: Consider documenting the binary data format.

The change from String to Vec<u8> in RedisStreamResult suggests handling raw binary data. This could improve performance but makes the data format less obvious.

Consider adding a comment explaining the expected binary format and why it's preferred over string representation.


63-73: LGTM! Consider enhancing debug logging.

The changes improve code readability and add helpful debug logging. The error handling is more straightforward.

Consider adding more context to the debug logs:

-debug!("Processing update: {:?}", u);
+debug!("Processing update for project {}: {:?}", project_id, u);
-debug!("Last stream id: {}", last_stream_id);
+debug!("Completed processing updates. Last stream id: {}", last_stream_id);

108-113: Consider optimizing string cloning.

The code looks good, but there's a potential performance optimization opportunity.

Consider moving the update_id.clone() outside the inner loop since it's the same for all items:

 for (update_id, items) in stream_items {
+    let stream_id = Some(update_id.clone());
     for (updated_by, update_data) in items {
         updates.push(FlowUpdate {
-            stream_id: Some(update_id.clone()),
+            stream_id: stream_id.clone(),
             update: update_data,
             updated_by: Some(updated_by),
         });
     }
 }
websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (3)

104-131: Add debug logging for state update processing.

The method handles empty entries well, but could benefit from additional debug logging to help with troubleshooting.

 async fn get_state_update_in_redis(
     &self,
     project_id: &str,
 ) -> Result<Option<Vec<Vec<u8>>>, FlowProjectRedisDataManagerError> {
     let state_update_key = self.key_manager.state_updates_key(project_id)?;
     debug!(
         "Getting state update from Redis stream: {}",
         state_update_key
     );

     let entries = self.xread_map(&state_update_key, "0").await?;
     debug!("State update entries: {:?}", entries);

     if entries.is_empty() {
+        debug!("No state updates found for project: {}", project_id);
         return Ok(None);
     }
     let mut updates = Vec::new();
     for (_, fields) in entries {
         if let Some((_, update_data)) = fields.first() {
+            debug!("Processing state update of size: {}", update_data.len());
             updates.push(update_data.clone());
         }
     }

     if updates.is_empty() {
+        debug!("No valid updates found in entries for project: {}", project_id);
         Ok(None)
     } else {
+        debug!("Found {} valid updates for project: {}", updates.len(), project_id);
         Ok(Some(updates))
     }
 }

185-198: Add validation for merged updates.

The merge updates logic should validate the merged state to ensure consistency.

 async fn execute_merge_updates(
     &self,
     project_id: &str,
 ) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> {
     let state_updates = self.get_state_update_in_redis(project_id).await?;
     let mut merged_update: Vec<u8> = Vec::new();
     let mut updates_by: Vec<String> = Vec::new();
     if let Some(state_updates) = state_updates {
         for state_update in state_updates {
+            // Validate update before merging
+            if let Err(e) = Update::decode_v2(&state_update) {
+                debug!("Invalid update found: {:?}", e);
+                continue;
+            }
             let (new_merged_update, new_updates_by) = self
                 .update_manager
                 .merge_updates(project_id, Some(state_update))
                 .await?;
             merged_update = new_merged_update;
             updates_by = new_updates_by;
         }
     }

Line range hint 275-284: Add size validation for update data.

Consider adding validation for the update data size to prevent memory issues with large updates.

+const MAX_UPDATE_SIZE: usize = 1024 * 1024; // 1MB limit

 let fields = &[(updated_by.as_deref().unwrap_or(""), &update_data)];
+if update_data.len() > MAX_UPDATE_SIZE {
+    debug!("Update size {} exceeds limit of {}", update_data.len(), MAX_UPDATE_SIZE);
+    return Err(FlowProjectRedisDataManagerError::InvalidInput(
+        "Update size exceeds maximum allowed size".to_string(),
+    ));
+}
 debug!("Pushing update to Redis stream: {:?}", fields);
websocket/crates/infra/src/persistence/project_repository.rs (3)

45-51: LGTM: Enhanced concurrency with connection pooling

The transition to a connection pool improves concurrency control by efficiently managing Redis connections. The removal of the generic type parameter also simplifies the implementation.

The connection pooling approach is more scalable as it:

  • Reduces connection overhead
  • Prevents connection exhaustion under high load
  • Provides automatic connection management
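The checkout/return discipline behind those benefits can be modeled in a few lines of std-only Rust; this is a deliberately simplified stand-in for bb8, which additionally provides async waiting, health checks, and timeouts:

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug)]
struct Conn(u32);

#[derive(Clone)]
struct Pool {
    idle: Arc<Mutex<Vec<Conn>>>,
}

impl Pool {
    fn with_capacity(n: u32) -> Self {
        Pool { idle: Arc::new(Mutex::new((0..n).map(Conn).collect())) }
    }
    // Checking out removes a connection from the idle set...
    fn get(&self) -> Option<Conn> {
        self.idle.lock().unwrap().pop()
    }
    // ...and returning it makes it reusable without a reconnect.
    fn put(&self, c: Conn) {
        self.idle.lock().unwrap().push(c);
    }
}

fn main() {
    let pool = Pool::with_capacity(2);
    let a = pool.get().unwrap();
    let _b = pool.get().unwrap();
    assert!(pool.get().is_none()); // exhausted: no new connections created
    pool.put(a);
    assert!(pool.get().is_some()); // returned connection is reused
}
```

The bounded idle set is what prevents connection exhaustion under load: callers wait for (or fail to get) a returned connection rather than opening unbounded new ones.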

56-64: Consider optimizing connection usage

While the implementation is correct, consider using a scoped block to ensure the connection is released back to the pool as soon as possible:

 async fn get_project(&self, project_id: &str) -> Result<Option<Project>, Self::Error> {
-    let mut conn = self.redis_pool.get().await?;
-    let key = format!("project:{}", project_id);
-    let project: Option<String> = conn.get(&key).await?;
-    Ok(project.map(|p| serde_json::from_str(&p)).transpose()?)
+    let key = format!("project:{}", project_id);
+    let project: Option<String> = {
+        let mut conn = self.redis_pool.get().await?;
+        conn.get(&key).await?
+    };
+    Ok(project.map(|p| serde_json::from_str(&p)).transpose()?)
 }

Line range hint 74-87: Consider using Redis transactions for atomic operations

The session creation process involves multiple Redis operations. Consider using transactions to ensure atomicity:

 async fn create_session(
     &self,
     mut session: ProjectEditingSession,
 ) -> Result<String, Self::Error> {
     let mut conn = self.redis_pool.get().await?;

     let session_id = session
         .session_id
         .get_or_insert_with(|| generate_id!("editor-session:"))
         .clone();

     let session_key = format!("session:{}", session_id);
     let active_session_key = format!("project:{}:active_session", session.project_id);

     let session_json = serde_json::to_string(&session)?;
-    let _: () = conn.set(&session_key, session_json).await?;
-    let _: () = conn.set(&active_session_key, &session_id).await?;
+    redis::pipe()
+        .atomic()
+        .set(&session_key, session_json)
+        .set(&active_session_key, &session_id)
+        .query_async(&mut conn)
+        .await?;

     Ok(session_id)
 }

Similar transaction-based approaches should be applied to update_session for atomic operations.

websocket/crates/domain/src/editing_session.rs (1)

Line range hint 291-334: Enhance error handling with contextual information

While the error handling is functional, adding context about failed operations would improve debugging capabilities.

Consider wrapping errors with additional context:

 let (state, edits) = redis_data_manager
     .merge_updates(&self.project_id, true)
     .await
-    .map_err(ProjectEditingSessionError::redis)?;
+    .map_err(|e| {
+        debug!("[end_session] Failed to merge updates: {}", e);
+        ProjectEditingSessionError::redis(format!("Failed to merge updates: {}", e))
+    })?;

 if save_changes {
     let snapshot = snapshot_repo
         .get_latest_snapshot(&self.project_id)
         .await
-        .map_err(ProjectEditingSessionError::snapshot)?;
+        .map_err(|e| {
+            debug!("[end_session] Failed to get latest snapshot: {}", e);
+            ProjectEditingSessionError::snapshot(format!("Failed to get latest snapshot: {}", e))
+        })?;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 69a2ee0 and 7ed1f34.

⛔ Files ignored due to path filters (1)
  • websocket/Cargo.lock is excluded by !**/*.lock, !**/*.lock
📒 Files selected for processing (10)
  • websocket/crates/domain/src/editing_session.rs (1 hunks)
  • websocket/crates/infra/src/persistence/project_repository.rs (4 hunks)
  • websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (7 hunks)
  • websocket/crates/infra/src/persistence/redis/mod.rs (1 hunks)
  • websocket/crates/infra/src/persistence/redis/types.rs (1 hunks)
  • websocket/crates/infra/src/persistence/redis/updates.rs (5 hunks)
  • websocket/crates/infra/src/persistence/redis/utils.rs (0 hunks)
  • websocket/crates/services/Cargo.toml (1 hunks)
  • websocket/crates/services/examples/edit_session_service.rs (3 hunks)
  • websocket/crates/services/src/manage_project_edit_session.rs (0 hunks)
💤 Files with no reviewable changes (2)
  • websocket/crates/infra/src/persistence/redis/utils.rs
  • websocket/crates/services/src/manage_project_edit_session.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • websocket/crates/infra/src/persistence/redis/mod.rs
🔇 Additional comments (8)
websocket/crates/infra/src/persistence/redis/types.rs (1)

16-19: Verify compatibility with existing stream consumers.

The change from String to Vec<u8> in StreamEntry is a breaking change that requires careful verification of all consumers.

Let's check for potential compatibility issues:

✅ Verification successful

Let me gather more information about the actual usage and data handling to better understand the impact.


Based on the gathered evidence, I can now generate the final response:

Breaking change is properly handled with appropriate data conversion

The transition from String to Vec<u8> in StreamEntry is safely managed through proper data conversion mechanisms:

  • The codebase uses String::from_utf8_lossy for reading binary data
  • JSON serialization/deserialization is handled appropriately
  • Stream operations maintain compatibility through proper value handling

The implementation shows careful consideration of binary data handling while maintaining string compatibility where needed. No compatibility issues are apparent.
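The from_utf8_lossy behavior relied on above is worth seeing concretely: valid UTF-8 round-trips unchanged, while invalid bytes become U+FFFD instead of causing a panic, so binary stream payloads can always be rendered as strings:

```rust
fn main() {
    // Valid UTF-8 passes through untouched.
    let valid = b"hello".to_vec();
    assert_eq!(String::from_utf8_lossy(&valid), "hello");

    // Invalid bytes are replaced with U+FFFD rather than erroring,
    // which is why readers of binary stream entries never panic.
    let invalid = vec![0x68, 0x69, 0xFF]; // "hi" + invalid byte
    assert_eq!(String::from_utf8_lossy(&invalid), "hi\u{FFFD}");
}
```

The trade-off is that the replacement is lossy: code that needs to round-trip the original bytes must keep the Vec<u8> and only use the lossy view for logging or display.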

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for StreamEntry usage to identify affected code
echo "Searching for StreamEntry usage..."
rg "StreamEntry" -A 3

# Search for direct Redis stream operations that might expect string data
echo "Checking Redis stream operations..."
rg "XADD|XREAD|XRANGE|XREVRANGE" -A 3

Length of output: 1879


Script:

#!/bin/bash
# Check for string-to-bytes conversion patterns and data handling
echo "Checking data conversion patterns..."
rg "as_bytes|String::from_utf8|from_utf8|to_string" -A 3 websocket/crates/infra/src/persistence/redis/

# Look for Redis value handling
echo "Checking Redis value handling..."
ast-grep --pattern 'impl $_ {
  $$$
  fn parse_stream_entry($$$) {
    $$$
  }
  $$$
}'

Length of output: 2666

websocket/crates/services/Cargo.toml (1)

34-34: Verify the architectural impact of introducing CRDT capabilities.

The addition of the yrs crate (a CRDT framework) follows the correct workspace dependency pattern. However, this seems to introduce significant real-time collaboration capabilities that weren't mentioned in the PR objectives focused on Redis lock handling.

Let's verify the scope and impact of this change:

Consider:

  1. Documenting the CRDT strategy in the architecture documentation
  2. Explaining how this relates to the Redis lock handler improvements
  3. Adding migration guides if this affects existing real-time collaboration features
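To ground what "CRDT capabilities" means here, a G-Counter (one of the simplest CRDTs) demonstrates the conflict-free merge property that yrs generalizes to rich-text documents; this is an illustration of the concept, not the yrs API:

```rust
use std::collections::HashMap;

// A grow-only counter CRDT: each replica tracks its own increment count.
#[derive(Clone, Default)]
struct GCounter {
    counts: HashMap<String, u64>,
}

impl GCounter {
    fn incr(&mut self, replica: &str) {
        *self.counts.entry(replica.to_string()).or_insert(0) += 1;
    }
    fn value(&self) -> u64 {
        self.counts.values().sum()
    }
    // Merge is commutative, associative, and idempotent, so concurrent
    // replicas converge regardless of the order updates arrive in.
    fn merge(&mut self, other: &GCounter) {
        for (r, n) in &other.counts {
            let e = self.counts.entry(r.clone()).or_insert(0);
            *e = (*e).max(*n);
        }
    }
}

fn main() {
    let mut a = GCounter::default();
    let mut b = GCounter::default();
    a.incr("a");
    a.incr("a");
    b.incr("b");
    let mut ab = a.clone();
    ab.merge(&b);
    let mut ba = b.clone();
    ba.merge(&a);
    assert_eq!(ab.value(), 3);
    assert_eq!(ab.value(), ba.value()); // merge order does not matter
}
```

This order-independence is what lets a CRDT-based editor accept updates from a Redis stream without a global ordering guarantee, which is the relevant interaction with the lock-handler changes in this PR.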
websocket/crates/services/examples/edit_session_service.rs (2)

1-2: LGTM: Dependencies properly imported

The new imports for Redis connection pooling (bb8::Pool, bb8_redis::RedisConnectionManager) and Y.js document handling (yrs::{Doc, Text, Transact}) are correctly specified and align with the PR's objectives.

Also applies to: 14-14


102-107: Verify Y.js update size limits

While the update commands are correctly implemented, consider adding size checks for the Y.js encoded updates to prevent potential issues with very large documents.

Also applies to: 116-121

websocket/crates/infra/src/persistence/redis/updates.rs (1)

127-127: Review debug logging security implications.

The debug log for merged stream updates might expose sensitive data in the logs.

Consider either:

  1. Redacting sensitive information
  2. Using a custom Debug implementation that only shows metadata
websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (1)

41-53: Consider adding validation for field names in parse_entry_fields.

The method successfully parses Redis bulk strings into key-value pairs but doesn't validate field names. Consider adding validation to ensure critical fields like "value" are present and properly formatted.

 fn parse_entry_fields(fields: &[redis::Value]) -> Option<Vec<(String, Vec<u8>)>> {
     fields
         .chunks(2)
         .filter(|chunk| chunk.len() == 2)
         .filter_map(|chunk| match (&chunk[0], &chunk[1]) {
             (redis::Value::BulkString(key), redis::Value::BulkString(val)) => {
+                let key_str = String::from_utf8_lossy(key);
+                if !["value", "updated_by"].contains(&key_str.as_ref()) {
+                    return None;
+                }
-                Some((String::from_utf8_lossy(key).into_owned(), val.clone()))
+                Some((key_str.into_owned(), val.clone()))
             }
             _ => None,
         })
         .collect::<Vec<_>>()
         .into()
 }
websocket/crates/infra/src/persistence/project_repository.rs (2)

20-22: LGTM: Improved error handling with connection pooling

The addition of Redis connection pooling and corresponding error variants enhances the error handling capabilities, providing better granularity for debugging Redis-related issues.

Also applies to: 38-41


96-104: Previous review comment about session data inconsistency is still applicable

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (15)
websocket/crates/services/src/error.rs (1)

14-16: LGTM! Good architectural decision moving to infrastructure layer.

The migration of ProjectEditingSessionError from domain to infrastructure layer (flow_websocket_infra::persistence::editing_session) is a good architectural decision. This change:

  • Properly separates persistence-related errors into the infrastructure layer
  • Maintains consistent error handling structure
  • Aligns with the PR's goal of improving Redis-related functionality
websocket/crates/infra/src/persistence/repository.rs (1)

Line range hint 1-89: Positive architectural direction with module reorganization and Redis improvements.

The trait definitions in this file provide a solid foundation for the project's data access layer. The module reorganization, combined with the broader changes to Redis connection management (using bb8 connection pooling), demonstrates good architectural practices:

  • Clear separation of concerns with domain types and persistence logic
  • Well-defined trait boundaries for different operations
  • Proper error handling with associated types
  • Improved connection management with pooling

Consider documenting these architectural decisions in a README or architecture.md file to help future contributors understand the design choices.

websocket/crates/services/examples/edit_session_service.rs (1)

32-39: Consider documenting the concurrent editing architecture.

The transition to connection pooling and Y.js integration significantly improves the concurrent editing capabilities. Consider adding documentation that explains:

  1. How the Redis connection pool handles concurrent sessions
  2. The role of Y.js in managing concurrent document updates
  3. The conflict resolution strategy

Would you like me to help create a detailed architecture document?

Also applies to: 96-119

websocket/app/src/handler.rs (3)

14-14: Consider architectural implications of moving User type to infrastructure layer.

Moving the User type from flow_websocket_domain to flow_websocket_infra appears to violate clean architecture principles, where domain entities should be independent of infrastructure concerns. This could make the codebase harder to test and maintain.

Consider keeping the User type in the domain layer and using an adapter pattern if infrastructure-specific user functionality is needed.


Line range hint 252-321: Implement Redis-based distributed locking for room operations.

The current implementation uses in-memory mutex (try_lock()) for room operations, which doesn't provide distributed concurrency control in a multi-instance environment. This could lead to race conditions when:

  • Multiple instances try to modify the same room
  • Room state becomes inconsistent across instances
  • User sessions conflict across different servers

Consider implementing Redis-based distributed locking for room operations:

  1. Use the new Redis connection pool from AppState
  2. Implement distributed locks for critical sections
  3. Add proper error handling for lock acquisition failures

Example pattern:

async fn join(&self, room_id: &str, user_id: &str) -> Result<(), WsError> {
    let lock_key = format!("room_lock:{}", room_id);
    let _lock = self.redis_pool.get()
        .await?
        .set_nx_ex(&lock_key, "1", LOCK_TIMEOUT)
        .await?;
    
    // Existing room operations with proper error handling
    let mut rooms = self.rooms.try_lock()?;
    // ...
}
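The locking pattern sketched above hinges on two Redis invariants: SET with NX succeeds only for the first caller, and release must be token-checked so a client whose lock has expired cannot delete a newer holder's lock. A std-only model of those invariants (in-memory stand-in, not real Redis calls; a real implementation uses SET ... NX EX and an atomic Lua check-and-delete):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct FakeRedis {
    keys: HashMap<String, String>,
}

impl FakeRedis {
    // Models `SET key token NX`: succeeds only if the key is absent.
    fn set_nx(&mut self, key: &str, token: &str) -> bool {
        if self.keys.contains_key(key) {
            false
        } else {
            self.keys.insert(key.to_string(), token.to_string());
            true
        }
    }
    // Release only if the stored token matches ours, so an expired
    // holder cannot delete a lock someone else has since acquired.
    fn release(&mut self, key: &str, token: &str) -> bool {
        if self.keys.get(key).map(String::as_str) == Some(token) {
            self.keys.remove(key);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut r = FakeRedis::default();
    assert!(r.set_nx("room_lock:42", "owner-a"));   // first acquire wins
    assert!(!r.set_nx("room_lock:42", "owner-b"));  // contender is blocked
    assert!(!r.release("room_lock:42", "owner-b")); // wrong token: no-op
    assert!(r.release("room_lock:42", "owner-a"));  // holder releases
}
```

Without the token check, a lock that expired mid-operation could be released by its former holder after another instance re-acquired it, silently reintroducing the race this PR aims to close.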

Line range hint 142-251: Improve error handling and add timeouts for distributed operations.

The current error handling has several potential issues:

  1. try_lock() calls could lead to deadlocks
  2. Missing timeout handling for async operations
  3. Inconsistent error propagation

Consider these improvements:

async fn handle_message(
    msg: Message,
    addr: SocketAddr,
    room_id: &str,
    project_id: Option<String>,
    state: Arc<AppState>,
    user: User,
) -> Result<Option<Message>, WsError> {
    // Add timeout for all async operations
    let timeout = Duration::from_secs(5);
    
    match msg {
        Message::Binary(d) => {
            trace!("{} sent {} bytes: {:?}", addr, d.len(), d);
            if d.len() < 3 {
                return Ok(None);
            };

            // Use timeout for lock acquisition
            let rooms = tokio::time::timeout(
                timeout,
                async { state.rooms.try_lock().map_err(WsError::from) }
            ).await??;

            let room = rooms
                .get(room_id)
                .ok_or_else(|| WsError::RoomNotFound(room_id.to_string()))?;

            // Add timeout for command channel
            if let Some(project_id) = project_id {
                tokio::time::timeout(
                    timeout,
                    state.command_tx.send(SessionCommand::PushUpdate {
                        project_id,
                        update: d,
                        updated_by: Some(user.name.clone()),
                    })
                ).await??;
            }

            Ok(None)
        }
        // ... rest of the match arms
    }
}
websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (3)

42-54: Add debug logging for parsing failures.

While the parsing logic is correct, adding debug logging when field parsing fails would help with troubleshooting.

 fn parse_entry_fields(fields: &[redis::Value]) -> Option<Vec<(String, Vec<u8>)>> {
+    debug!("Parsing Redis entry fields: {:?}", fields);
     fields
         .chunks(2)
         .filter(|chunk| chunk.len() == 2)
         .filter_map(|chunk| match (&chunk[0], &chunk[1]) {
             (redis::Value::BulkString(key), redis::Value::BulkString(val)) => {
                 Some((String::from_utf8_lossy(key).into_owned(), val.clone()))
             }
-            _ => None,
+            _ => {
+                debug!("Skipping invalid field pair: {:?}", chunk);
+                None
+            },
         })
         .collect::<Vec<_>>()
         .into()
 }

169-175: Make lock timeout configurable.

The lock timeout is hardcoded to 5000ms. In high-latency scenarios or when processing large updates, this might not be sufficient. Consider making this value configurable through the constructor.

 pub struct FlowProjectRedisDataManager {
     redis_pool: Pool<RedisConnectionManager>,
     global_lock: FlowProjectLock,
     update_manager: Arc<UpdateManager>,
     key_manager: Arc<DefaultKeyManager>,
+    lock_timeout_ms: u64,
 }

 impl FlowProjectRedisDataManager {
-    pub async fn new(redis_url: &str) -> Result<Self, FlowProjectRedisDataManagerError> {
+    pub async fn new(redis_url: &str, lock_timeout_ms: Option<u64>) -> Result<Self, FlowProjectRedisDataManagerError> {
         let manager = RedisConnectionManager::new(redis_url)?;
         let redis_pool = Pool::builder().build(manager).await?;
         let global_lock = FlowProjectLock::new(redis_url);
         let key_manager = Arc::new(DefaultKeyManager);

         let instance = Self {
             redis_pool: redis_pool.clone(),
             global_lock,
             update_manager: Arc::new(UpdateManager::new(redis_pool.clone(), key_manager.clone())),
             key_manager,
+            lock_timeout_ms: lock_timeout_ms.unwrap_or(5000),
         };

         Ok(instance)
     }
 }

Then update the lock calls to use the configured timeout:

-                .lock_state(project_id, 5000, move |_lock_guard| {
+                .lock_state(project_id, self.lock_timeout_ms, move |_lock_guard| {

276-276: Use a more descriptive default value for updated_by.

The current default value is an empty string when updated_by is None. Consider using a more descriptive default value to indicate that the update was made by an anonymous user.

-        let fields = &[(updated_by.as_deref().unwrap_or(""), &update_data)];
+        let fields = &[(updated_by.as_deref().unwrap_or("anonymous"), &update_data)];
websocket/crates/infra/src/persistence/project_repository.rs (1)

57-60: Optimize connection usage across multiple Redis operations

Consider using redis::pipe() to batch multiple Redis commands into a single round trip where possible. This would reduce network overhead and improve performance.

Example for the create_session method:

 async fn create_session(
     &self,
     mut session: ProjectEditingSession,
 ) -> Result<String, Self::Error> {
     let mut conn = self.redis_pool.get().await?;

     let session_id = session
         .session_id
         .get_or_insert_with(|| generate_id!("editor-session:"))
         .clone();

     let session_key = format!("session:{}", session_id);
     let active_session_key = format!("project:{}:active_session", session.project_id);

     let session_json = serde_json::to_string(&session)?;
-    let _: () = conn.set(&session_key, session_json).await?;
-    let _: () = conn.set(&active_session_key, &session_id).await?;
+    redis::pipe()
+        .set(&session_key, session_json)
+        .set(&active_session_key, &session_id)
+        .query_async(&mut conn)
+        .await?;

     Ok(session_id)
 }

Also applies to: 72-84, 93-101, 108-122, 126-129

websocket/crates/services/src/project.rs (3)

2-10: Architectural change: Domain types moved to infrastructure layer

The migration of types from flow_websocket_domain to flow_websocket_infra suggests a significant architectural change. While this consolidates the types, it might blur the boundary between domain and infrastructure concerns, potentially affecting the separation of concerns.

Consider maintaining a clear separation between domain and infrastructure layers by:

  1. Keeping domain types in a separate module
  2. Using trait abstractions in the domain layer
  3. Implementing these traits in the infrastructure layer

Line range hint 14-24: Enhance Redis operations with explicit locking mechanisms

The current implementation lacks explicit concurrency control for Redis operations. Methods like push_update_to_redis_stream and get_current_state could benefit from proper locking mechanisms to prevent race conditions in concurrent scenarios.

Consider implementing the following improvements:

  1. Add distributed locking using Redis's SETNX command
  2. Implement retry mechanisms with exponential backoff
  3. Add timeouts to prevent deadlocks

Example implementation for push_update_to_redis_stream:

 pub async fn push_update_to_redis_stream(
     &self,
     project_id: &str,
     update: Vec<u8>,
     updated_by: Option<String>,
 ) -> Result<(), ProjectServiceError> {
+    // Acquire lock with timeout
+    let lock_key = format!("lock:project:{}", project_id);
+    let lock = self.redis_data_manager.acquire_lock(&lock_key, 30).await?;
+
+    // Execute update with the lock
     let result = self
         .redis_data_manager
         .push_update(project_id, update, updated_by)
         .await;
+
+    // Release lock
+    self.redis_data_manager.release_lock(lock).await?;
+
     Ok(result?)
 }

Also applies to: 71-75


Line range hint 153-155: Uncomment and enhance test coverage

The comprehensive test suite is currently commented out. Given the focus on concurrency control improvements, these tests should be uncommented and enhanced to verify proper locking behavior.

Consider adding the following test scenarios:

  1. Concurrent access to shared resources
  2. Lock acquisition and release
  3. Lock timeout handling
  4. Race condition prevention

Example test case:

#[tokio::test]
async fn test_concurrent_push_updates() {
    let service = setup_service();
    let handles: Vec<_> = (0..10).map(|i| {
        let service = service.clone();
        tokio::spawn(async move {
            service
                .push_update_to_redis_stream(
                    "project_123",
                    vec![i as u8],
                    Some("user1".to_string()),
                )
                .await
        })
    }).collect();

    let results = futures::future::join_all(handles).await;
    assert!(results.iter().all(|r| r.as_ref().unwrap().is_ok()));
}
websocket/crates/infra/src/persistence/editing_session.rs (2)

250-255: Remove unnecessary debug statements

The debug statements at lines 250 and 255:

debug!("state: {:?}", state);
debug!("3-------------");

may be residual from development or debugging sessions. Consider removing or adjusting these logs to prevent cluttering the output and potentially exposing sensitive information.


250-250: Consider logging levels for sensitive or large data

Logging the entire state object with debug!("state: {:?}", state); may not be ideal if state is large or contains sensitive information. Consider adjusting the logging level or summarizing the output.

Apply this diff to modify the debug statement:

-debug!("state: {:?}", state);
+debug!("Merged state for project {}", self.project_id);
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 7ed1f34 and 06b2c74.

⛔ Files ignored due to path filters (1)
  • websocket/Cargo.lock is excluded by !**/*.lock, !**/*.lock
📒 Files selected for processing (19)
  • websocket/Cargo.toml (0 hunks)
  • websocket/app/Cargo.toml (1 hunks)
  • websocket/app/src/handler.rs (1 hunks)
  • websocket/crates/domain/Cargo.toml (0 hunks)
  • websocket/crates/domain/src/lib.rs (0 hunks)
  • websocket/crates/domain/src/utils.rs (0 hunks)
  • websocket/crates/infra/Cargo.toml (0 hunks)
  • websocket/crates/infra/src/lib.rs (1 hunks)
  • websocket/crates/infra/src/persistence/editing_session.rs (10 hunks)
  • websocket/crates/infra/src/persistence/mod.rs (1 hunks)
  • websocket/crates/infra/src/persistence/project_repository.rs (4 hunks)
  • websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (6 hunks)
  • websocket/crates/infra/src/persistence/repository.rs (1 hunks)
  • websocket/crates/infra/src/utils.rs (1 hunks)
  • websocket/crates/services/Cargo.toml (1 hunks)
  • websocket/crates/services/examples/edit_session_service.rs (3 hunks)
  • websocket/crates/services/src/error.rs (2 hunks)
  • websocket/crates/services/src/manage_project_edit_session.rs (1 hunks)
  • websocket/crates/services/src/project.rs (1 hunks)
💤 Files with no reviewable changes (5)
  • websocket/Cargo.toml
  • websocket/crates/domain/Cargo.toml
  • websocket/crates/domain/src/lib.rs
  • websocket/crates/domain/src/utils.rs
  • websocket/crates/infra/Cargo.toml
✅ Files skipped from review due to trivial changes (3)
  • websocket/crates/infra/src/lib.rs
  • websocket/crates/infra/src/persistence/mod.rs
  • websocket/crates/infra/src/utils.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • websocket/app/Cargo.toml
  • websocket/crates/services/Cargo.toml
  • websocket/crates/services/src/manage_project_edit_session.rs
🔇 Additional comments (15)
websocket/crates/services/src/error.rs (1)

2-5: LGTM! Well-organized imports for persistence-related errors.

The imports are properly organized and clearly show the dependency on infrastructure layer components for persistence-related errors.

websocket/crates/services/examples/edit_session_service.rs (2)

1-17: LGTM! Dependencies are properly structured for the new architecture.

The transition from direct Redis client to connection pooling is reflected correctly in the imports. The addition of Y.js imports supports the new document handling functionality.

✅ Verification successful

All imported types are actively used in the codebase

The verification confirms that all imported types from the new dependencies are properly utilized in the code:

  • Pool and RedisConnectionManager are used for Redis connection pooling setup
  • Doc, Text, and Transact from Y.js are used for document operations

The imports are clean with no unused dependencies.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify all imports are used
rg --type rust -w "Pool|RedisConnectionManager|Doc|Text|Transact" websocket/crates/services/examples/edit_session_service.rs

Length of output: 359


120-125: Verify the complete session lifecycle and clean up commented code.

The update handling looks correct, but there are commented-out sections for task removal and session completion. These should either be implemented or removed to maintain code clarity.

Let's verify the session lifecycle implementation:

websocket/app/src/handler.rs (1)

Line range hint 1-321: Verify completeness of Redis integration changes.

The PR objectives mention updating the Redis lock handler, but the changes in this file are minimal. Let's verify if related changes exist in other files.

✅ Verification successful

Redis integration appears complete and properly implemented

The verification shows that Redis integration is properly implemented across the codebase:

  • AppState includes Redis connection pool and session repository
  • Redis lock implementation exists in flow_project_lock.rs
  • Redis connection is properly initialized in AppState::new
  • Redis-related components are present in multiple layers (infra, services)

The changes in the reviewed file are consistent with the broader Redis integration in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for Redis-related changes in the codebase

echo "Searching for Redis connection pool configuration..."
rg -l "bb8::Pool|RedisConnectionManager" 

echo "Searching for Redis lock implementation..."
rg -l "Redis.*Lock|distributed.*lock"

echo "Searching for AppState Redis integration..."
ast-grep --pattern 'struct AppState {
  $$$
  redis_pool: $_,
  $$$
}'

Length of output: 939


Script:

#!/bin/bash
# Let's check the Redis integration in state.rs and the lock implementation

echo "Checking AppState structure in state.rs..."
rg -A 10 "struct AppState" websocket/app/src/state.rs

echo -e "\nChecking Redis lock implementation..."
rg -A 10 "Redis.*Lock" websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs

echo -e "\nChecking Redis connection pool usage in state.rs..."
rg "redis_pool" websocket/app/src/state.rs

Length of output: 1486


Script:

#!/bin/bash
# Let's check the actual Redis lock implementation and its usage

echo "Checking FlowProjectLock implementation..."
rg -A 20 "struct FlowProjectLock" websocket/crates/infra/src/persistence/redis/flow_project_lock.rs

echo -e "\nChecking lock usage in project repository..."
rg -B 5 -A 10 "lock.*project" websocket/crates/infra/src/persistence/project_repository.rs

echo -e "\nChecking Redis connection initialization..."
rg -A 10 "impl AppState" websocket/app/src/state.rs

Length of output: 1610

websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (1)

15-15: LGTM! Good use of type alias.

The StreamEntry type alias improves code readability and makes the Redis stream entry structure more explicit.

websocket/crates/infra/src/persistence/project_repository.rs (5)

35-38: LGTM: Comprehensive error handling for Redis connection pooling

The error handling for Redis operations has been properly updated to handle both Redis errors and connection pool errors using transparent error forwarding.


42-43: LGTM: Improved connection management with bb8 pool

The switch to connection pooling is a significant improvement that will:

  • Better handle concurrent Redis operations
  • Automatically manage connection lifecycle
  • Prevent connection leaks

47-48: LGTM: Clean constructor with externalized pool configuration

The constructor is well-designed, accepting a pre-configured connection pool which allows for flexible pool configuration at the application level.


93-101: Previous review comment is still applicable

The session consistency issue mentioned in the previous review remains valid. The code should handle cases where session data might be missing despite having a valid session ID.


108-122: Previous review comment is still applicable

The suggestion to use atomic transactions for session updates remains valid and should be implemented.

websocket/crates/infra/src/persistence/editing_session.rs (5)

35-38: Simplify error handling using transparent error variants

The use of #[error(transparent)] and #[from] attributes in the ProjectEditingSessionError enum effectively simplifies error propagation by automatically converting underlying errors into the enum variants. This reduces boilerplate code and enhances readability.


72-74: Consistent error type specifications in trait bounds

Specifying the Error associated type in the trait bounds for E, S, and R ensures that each implementation aligns with the expected error types. This promotes consistency across different implementations and simplifies error handling throughout the methods.


137-139: Proper use of asynchronous mutex for synchronization

The use of tokio::sync::Mutex to synchronize access within asynchronous contexts is appropriate. Acquiring the lock with let _lock = self.session_lock.lock().await; ensures thread-safe operations when merging updates.


7-9: Ensure all necessary imports are included

The added imports for ObjectTenant and User are appropriate and necessary for the functionality. They contribute to better code organization and readability.


240-241: Consistent error type specifications in trait bounds

In the end_session method, specifying the Error associated type for R and S aligns with the error handling strategy used throughout the code, ensuring consistency and clarity.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 06b2c74 and cece791.

📒 Files selected for processing (1)
  • websocket/crates/infra/src/persistence/project_repository.rs (6 hunks)
🔇 Additional comments (5)
websocket/crates/infra/src/persistence/project_repository.rs (5)

35-38: LGTM: Error handling properly updated for connection pooling

The error enum has been correctly updated to handle both Redis errors and connection pool errors, maintaining proper error propagation.


42-48: LGTM: Clean transition to connection pooling

The repository structure has been simplified by removing the generic type parameter and properly implementing connection pooling.


57-60: LGTM: Proper connection pool usage for project retrieval

The implementation correctly acquires a connection from the pool and performs the read operation.


93-101: Previous review comment about session data inconsistency still applies


108-122: Previous review comment about atomic transactions still applies

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (3)
websocket/crates/services/examples/project_service.rs (3)

14-14: Enhance the example documentation.

While the command to run the example is provided, it would be more helpful to include additional context about prerequisites (e.g., Redis setup) and expected behavior.

-///RUST_LOG=debug cargo run --example project_service
+/// Example demonstrating ProjectService functionality with Redis integration
+/// 
+/// Prerequisites:
+/// - Redis server running locally (default: redis://localhost:6379)
+/// 
+/// Usage:
+/// ```bash
+/// RUST_LOG=debug cargo run --example project_service
+/// ```
+/// 
+/// This example demonstrates:
+/// - Redis connection pooling
+/// - Project session management
+/// - State and snapshot operations

44-52: Improve test data management.

Consider extracting test data to a configuration file or environment variables, especially for sensitive information like email and tenant_id. This would make the example more flexible and secure.

-let project_id = "test_project_123";
+let project_id = std::env::var("TEST_PROJECT_ID")
+    .unwrap_or_else(|_| "test_project_123".to_string());

-let test_user = User {
-    id: "user_123".to_string(),
-    email: "test@example.com".to_string(),
-    name: "Test User".to_string(),
-    tenant_id: "tenant_123".to_string(),
-};
+let test_user = User {
+    id: std::env::var("TEST_USER_ID")
+        .unwrap_or_else(|_| "user_123".to_string()),
+    email: std::env::var("TEST_USER_EMAIL")
+        .unwrap_or_else(|_| "test@example.com".to_string()),
+    name: std::env::var("TEST_USER_NAME")
+        .unwrap_or_else(|_| "Test User".to_string()),
+    tenant_id: std::env::var("TEST_TENANT_ID")
+        .unwrap_or_else(|_| "tenant_123".to_string()),
+};

16-104: Consider separating example configurations and adding integration tests.

While this example effectively demonstrates the functionality, consider:

  1. Moving configuration to a separate module for reuse across examples
  2. Adding integration tests based on this example workflow
  3. Implementing proper cleanup of Redis resources after the example runs
  4. Adding error recovery scenarios and reconnection logic for Redis failures

This would make the example more valuable for both documentation and testing purposes.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between cece791 and dc80b3b.

📒 Files selected for processing (1)
  • websocket/crates/services/examples/project_service.rs (1 hunks)
🔇 Additional comments (1)
websocket/crates/services/examples/project_service.rs (1)

98-102: Validate session state before ending.

The session ending operation should validate the session state and handle cleanup properly.

 debug!("Ending session...");
+debug!("Validating session state before ending...");
+if session.session_id.is_none() {
+    return Err("Invalid session state".into());
+}
 service
     .end_session("test_snapshot".to_string(), session)
-    .await?;
+    .await
+    .map_err(|e| format!("Failed to end session: {}", e))?;
+debug!("Session ended successfully");

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (5)
websocket/crates/services/examples/project_service.rs (5)

27-28: Enhance Redis URL validation.

The Redis URL fallback is good, but consider validating the URL format to prevent runtime connection errors.

-let redis_url =
-    std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379/0".to_string());
+let redis_url = std::env::var("REDIS_URL")
+    .unwrap_or_else(|_| "redis://localhost:6379/0".to_string());
+if !redis_url.starts_with("redis://") {
+    return Err("Invalid Redis URL format. Must start with 'redis://'".into());
+}

32-42: Consider making the local storage path configurable.

The local storage path is hardcoded, which might cause issues when running the example in different environments.

-let local_storage = ProjectLocalRepository::new("./local_storage".into()).await?;
+let storage_path = std::env::var("LOCAL_STORAGE_PATH")
+    .unwrap_or_else(|_| "./local_storage".to_string());
+let local_storage = ProjectLocalRepository::new(storage_path.into()).await?;

48-53: Use clearly marked test data.

For example code, use obviously fake data to prevent confusion with real credentials.

 let test_user = User {
-    id: "user_123".to_string(),
-    email: "test@example.com".to_string(),
-    name: "Test User".to_string(),
-    tenant_id: "tenant_123".to_string(),
+    id: "example_user_id".to_string(),
+    email: "example@test.local".to_string(),
+    name: "Example User".to_string(),
+    tenant_id: "example_tenant_id".to_string(),
 };

56-93: Enhance error handling with operation-specific contexts.

Consider wrapping errors with operation-specific context to make debugging easier.

-if let Some(project) = service.get_project(project_id).await? {
+if let Some(project) = service.get_project(project_id)
+    .await
+    .map_err(|e| format!("Failed to get project {}: {}", project_id, e))? {

107-132: Consider extracting state handling logic.

The state handling logic is complex and could benefit from being extracted into a separate function for better readability and reusability.

+async fn handle_current_state(
+    state: Vec<u8>,
+) -> Result<Option<String>, Box<dyn std::error::Error>> {
+    if state.is_empty() {
+        debug!("Received empty state update, skipping...");
+        return Ok(None);
+    }
+
+    debug!("---------------------");
+    debug!("state: {:?}", state);
+    debug!("------------");
+
+    let doc = Doc::new();
+    let update = Update::decode_v2(&state)?;
+    doc.transact_mut().apply_update(update);
+
+    let text = doc.get_or_insert_text("content");
+    let content = {
+        let txn = doc.transact();
+        text.get_string(&txn)
+    };
+
+    info!("Current document content: {}", content);
+    info!("Current state size: {} bytes", state.len());
+    
+    Ok(Some(content))
+}
+
 if let Some(state) = service
     .get_current_state(project_id, session.session_id.as_deref())
     .await?
 {
-    if !state.is_empty() {
-        debug!("---------------------");
-        debug!("state: {:?}", state);
-        debug!("------------");
-
-        // Create a new doc to apply the state
-        let doc = Doc::new();
-        let update = Update::decode_v2(&state).map_err(Box::new)?;
-        doc.transact_mut().apply_update(update);
-
-        let text = doc.get_or_insert_text("content");
-        let content = {
-            let txn = doc.transact();
-            text.get_string(&txn)
-        };
-
-        info!("Current document content: {}", content);
-        info!("Current state size: {} bytes", state.len());
-    } else {
-        debug!("Received empty state update, skipping...");
-    }
+    if let Some(content) = handle_current_state(state).await? {
+        debug!("Successfully processed state update");
+    }
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between dc80b3b and 4a1115f.

📒 Files selected for processing (3)
  • websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (6 hunks)
  • websocket/crates/infra/src/persistence/redis/updates.rs (6 hunks)
  • websocket/crates/services/examples/project_service.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • websocket/crates/infra/src/persistence/redis/updates.rs
🔇 Additional comments (6)
websocket/crates/services/examples/project_service.rs (1)

1-14: LGTM! Well-organized imports.

The imports are logically grouped and align with the new architecture using bb8 for Redis connection pooling.

websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (5)

42-54: LGTM! Well-structured entry field parsing.

The implementation safely handles Redis bulk strings and properly filters invalid entries.


56-70: LGTM! Robust stream entry parsing.

Good validation and error handling for Redis stream entries.


109-135: LGTM! Improved state update handling with proper error cases.

The implementation properly handles empty entries and updates, with good error handling and logging.


190-202: LGTM! Well-structured merge updates implementation.

Good separation of concerns between locking and update execution, with proper handling of multiple updates.


Line range hint 280-290: LGTM! Well-implemented update pushing with proper logging.

Good use of debug logging and proper handling of optional fields.

@kasugamirai kasugamirai requested a review from pyshx November 8, 2024 18:51
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (4)
websocket/crates/services/examples/edit_session_service.rs (1)

Line range hint 1-139: Consider implementing connection pool metrics.

For better observability of the concurrency improvements, consider adding metrics collection for the connection pool:

  • Pool utilization
  • Wait times
  • Connection errors
  • Pool exhaustion events

This will help monitor the effectiveness of the concurrency control improvements.

websocket/crates/infra/src/persistence/redis/updates.rs (2)

43-47: LGTM! Debug logging enhances observability.

The added debug logging and empty update check improve troubleshooting capabilities and efficiency.

Consider using structured logging fields for better log aggregation:

-debug!("update: {:?}", update);
+debug!(update_id = ?update.stream_id, size = update.update.len(), "Processing update");

Line range hint 14-19: Consider Redis cluster scenarios in the connection management.

While the connection pooling is implemented well, consider adding support for Redis cluster scenarios:

  1. Handle cluster redirections
  2. Add retry policies for temporary failures
  3. Consider implementing circuit breakers for downstream Redis calls

Would you like me to provide example implementations for these patterns?

websocket/crates/infra/src/persistence/editing_session.rs (1)

254-255: Improve debug log messages.

The current debug messages like "3-------------" are not descriptive. Consider using more meaningful messages that provide context about what's being logged.

Replace with more descriptive messages:

-    debug!("3-------------");
-    debug!("snapshot: {:?}", snapshot);
+    debug!("Retrieved latest snapshot for project {}: {:?}", self.project_id, snapshot);
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 4a1115f and b81cb35.

📒 Files selected for processing (3)
  • websocket/crates/infra/src/persistence/editing_session.rs (10 hunks)
  • websocket/crates/infra/src/persistence/redis/updates.rs (6 hunks)
  • websocket/crates/services/examples/edit_session_service.rs (4 hunks)
🔇 Additional comments (9)
websocket/crates/services/examples/edit_session_service.rs (4)

1-17: LGTM: Dependencies align with architectural goals.

The addition of bb8 for connection pooling and yrs for collaborative editing supports the PR's objective of improving concurrency control.


32-39: ⚠️ Potential issue

Add error handling and pool configuration.

The Redis connection pool setup needs improvement in two areas:

  1. Error handling for pool creation
  2. Pool configuration parameters (as mentioned in previous review)

Apply this diff to add error handling:

 // Initialize Redis connection pool
-let manager = RedisConnectionManager::new(&*redis_url)?;
-let redis_pool = Pool::builder().build(manager).await?;
+let manager = RedisConnectionManager::new(&*redis_url)
+    .map_err(|e| {
+        error!("Failed to create Redis connection manager: {:?}", e);
+        e
+    })?;
+let redis_pool = Pool::builder()
+    .build(manager)
+    .await
+    .map_err(|e| {
+        error!("Failed to create Redis connection pool: {:?}", e);
+        e
+    })?;

96-119: ⚠️ Potential issue

Add resource cleanup for Y.js document.

While the Y.js implementation follows the previous error handling suggestion, it's missing proper resource cleanup.

Consider wrapping the document in a drop guard:

+struct DocGuard(Doc);
+
+impl Drop for DocGuard {
+    fn drop(&mut self) {
+        // Ensure all resources are properly freed
+        self.0.gc();
+    }
+}
+
-let doc = Doc::new();
+let doc = DocGuard(Doc::new());

135-139: Verify successful task removal.

The task removal should verify that all associated resources are properly cleaned up to prevent resource leaks.

✅ Verification successful

Let me analyze the implementation of RemoveTask further to understand the cleanup process better.


Let me check one more aspect to understand the task data structure and any associated resources.


Task removal implementation is properly handled with appropriate cleanup

The task removal process is correctly implemented with proper resource management:

  • Tasks are stored in a thread-safe HashMap using Arc<Mutex>
  • The RemoveTask command properly removes the task data from the map
  • Associated resources (client count, timestamps, etc.) are automatically cleaned up as they're part of the task data structure
  • The service has a background task that automatically ends editing sessions when conditions are met (no clients, timeout reached)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any cleanup or verification mechanisms in task removal
rg -A 5 "RemoveTask|cleanup|verify" --type rust

Length of output: 2794


Script:

#!/bin/bash
# Search for task-related data structures and their usage
ast-grep --pattern 'struct Task {
  $$$
}'

# Look for any task-related cleanup or resource management
rg -B 3 -A 5 "tasks\." --type rust

Length of output: 3674


Script:

#!/bin/bash
# Search for ManageProjectEditSessionTaskData structure and its implementation
rg -B 2 -A 10 "ManageProjectEditSessionTaskData" --type rust

# Look for any session cleanup related code
rg -B 2 -A 10 "end_editing_session" --type rust

Length of output: 15212

websocket/crates/infra/src/persistence/redis/updates.rs (2)

133-134: LGTM! Good use of spawn_blocking for CPU-intensive work.

The debug logging addition and proper use of spawn_blocking for CPU-intensive merge operations show good consideration for performance and observability.


12-12: Verify binary data handling across consumers.

The change from String to Vec<u8> in RedisStreamResult suggests a shift to binary data handling. While this is more efficient, we should ensure all consumers are updated to handle binary data correctly.

✅ Verification successful

Binary data handling is consistently implemented across the codebase

The type change from String to Vec<u8> in RedisStreamResult is properly aligned with the rest of the codebase. All related interfaces and implementations consistently use Vec<u8> for update data:

  • Service interfaces in project.rs and manage_project_edit_session.rs use Vec<u8> for update parameters
  • Redis stream processing in updates.rs correctly handles binary data with RedisStreamResult
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any remaining String-based update handling that might need conversion
rg -A 3 "String.*update.*redis" --type rust

Length of output: 45


Script:

#!/bin/bash
# Let's search for usages of RedisStreamResult and Vec<u8> in update handling
rg -A 5 "RedisStreamResult|Vec<u8>.*update" --type rust

# Also search for any Redis stream processing code
ast-grep --pattern 'fn $_ ($$$) -> Result<$_, $_> {
  $$$
  redis $$$
  $$$
}'

Length of output: 3170

websocket/crates/infra/src/persistence/editing_session.rs (3)

Line range hint 4-38: LGTM! Improved error handling with transparent error propagation.

The error handling improvements using #[error(transparent)] and From trait implementations provide better error context while simplifying error propagation.


72-74: LGTM! Improved type safety with explicit error handling in trait bounds.

The updated trait bounds properly constrain the error types and ensure type-safe error propagation throughout the implementation.

Also applies to: 107-108, 132-132, 148-148, 169-169, 188-188, 203-203


253-261: ⚠️ Potential issue

Critical: Handle missing snapshot case as suggested in previous review.

The code still assumes a snapshot exists when save_changes is true. While debug logs have been added, they don't address the core issue. As suggested in the previous review, we should handle the case where no snapshot exists.

Apply this fix as suggested in the previous review:

 if save_changes {
-    let snapshot = snapshot_repo.get_latest_snapshot(&self.project_id).await?;
-    debug!("3-------------");
-    debug!("snapshot: {:?}", snapshot);
+    let snapshot = snapshot_repo.get_latest_snapshot(&self.project_id).await?;
+
+    if snapshot.is_none() {
+        return Err(ProjectEditingSessionError::SnapshotNotFound(self.project_id.clone()));
+    }
+
     if let Some(mut snapshot) = snapshot {
         snapshot.data = state;
         snapshot.info.changes_by = edits;
         snapshot.metadata.name = Some(snapshot_name);
         snapshot_repo.update_latest_snapshot(snapshot).await?;
     }
 }

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (4)
websocket/crates/services/examples/project_service.rs (3)

27-28: Enhance environment variable handling.

The Redis URL fallback to localhost is good for development, but consider validating the URL format and providing more context in error messages for production use.

-let redis_url =
-    std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379/0".to_string());
+let redis_url = std::env::var("REDIS_URL")
+    .unwrap_or_else(|e| {
+        debug!("REDIS_URL not set ({}), using default", e);
+        "redis://localhost:6379/0".to_string()
+    });
+if !redis_url.starts_with("redis://") {
+    return Err("Invalid Redis URL format. Must start with 'redis://'".into());
+}

48-53: Use non-PII data in example code.

While this is example code, it's best practice to avoid using email-like patterns in examples as they might be mistaken for real PII in logs or documentation.

 let test_user = User {
     id: "user_123".to_string(),
-    email: "test@example.com".to_string(),
+    email: "user-123-email".to_string(),
     name: "Test User".to_string(),
     tenant_id: "tenant_123".to_string(),
 };

117-126: Consider implementing Drop for resource cleanup.

Multiple Doc instances are created but not explicitly cleaned up. While Rust's drop semantics will handle this, larger applications should consider explicit cleanup or a resource pool.

websocket/crates/infra/src/persistence/editing_session.rs (1)

19-21: Add documentation for SessionLockGuard

The RAII-style lock implementation is good, but it would benefit from documentation explaining:

  • The purpose of the guard
  • When and how it should be used
  • The guarantees it provides for concurrent access

Add documentation like this:

+/// A RAII guard that automatically releases the session lock when dropped.
+/// Used to ensure exclusive access during critical operations.
 struct SessionLockGuard<'a> {
     _lock: tokio::sync::MutexGuard<'a, ()>,
 }

+/// Acquires the session lock, returning a guard that automatically releases the lock when dropped.
 async fn acquire_lock(&self) -> SessionLockGuard<'_> {

Also applies to: 56-60

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b81cb35 and 7d9de01.

📒 Files selected for processing (4)
  • websocket/crates/infra/src/persistence/editing_session.rs (11 hunks)
  • websocket/crates/infra/src/persistence/redis/connection.rs (1 hunks)
  • websocket/crates/infra/src/persistence/redis/errors.rs (1 hunks)
  • websocket/crates/services/examples/project_service.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • websocket/crates/infra/src/persistence/redis/errors.rs
🔇 Additional comments (6)
websocket/crates/infra/src/persistence/redis/connection.rs (2)

12-12: LGTM! Clean and idiomatic implementation.

The simplified implementation using the ? operator is more idiomatic Rust and correctly propagates connection pool errors.


10-10: LGTM! Verify error handling in dependent code.

The switch to using bb8::RunError directly is a good choice as it leverages the standard error type from the connection pooling library.

Let's verify that all consumers properly handle the new error type:

✅ Verification successful

Error handling changes are properly propagated

The switch to bb8::RunError<redis::RedisError> is properly handled throughout the codebase:

  • ProjectRepositoryError includes Pool(#[from] bb8::RunError<redis::RedisError>)
  • FlowProjectRedisDataManagerError includes PoolRunError(#[from] bb8::RunError<redis::RedisError>)
  • Application errors in app/src/errors.rs handle Redis errors appropriately

The error type change is well-integrated into the error handling hierarchy with proper error conversion implementations.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any remaining references to the old error type
rg "FlowProjectRedisDataManagerError" 

# Find all places where get_connection is called to verify error handling
ast-grep --pattern 'get_connection().await'

Length of output: 9021


Script:

#!/bin/bash
# Let's find actual usages of get_connection to verify error handling
ast-grep --pattern 'get_connection('

# Also search for bb8::RunError to see if it's properly handled
rg "bb8::RunError"

# Look for Redis error handling patterns
rg "RedisError"

Length of output: 1097

websocket/crates/services/examples/project_service.rs (1)

1-14: LGTM! Well-organized imports.

The imports are properly structured and include all necessary dependencies for Redis connection pooling and websocket functionality.

websocket/crates/infra/src/persistence/editing_session.rs (3)

4-6: LGTM: Improved error handling with transparent error propagation

The error handling improvements using #[error(transparent)] and #[from] traits provide better error context while simplifying error propagation. This change aligns with Rust best practices for error handling.

Also applies to: 16-17, 39-42


232-256: Enhance end_session implementation

The snapshot handling issue from the previous review is still valid. Additionally:

  1. Consider adding more context to debug logs
  2. The lock acquisition is good, but ensure proper error handling if the lock acquisition fails

Improve the debug logging:

-            debug!("state: {:?}", state);
+            debug!("Merged state for project {}: {:?}", self.project_id, state);
-            debug!("snapshot: {:?}", snapshot);
+            debug!("Latest snapshot for project {}: {:?}", self.project_id, snapshot);

Line range hint 147-165: Consider adding lock protection to get_state_update

The get_state_update method reads the current state without holding the session lock. Individual Redis operations are atomic, but this read is not serialized against concurrent merges or updates, so it can observe state mid-transition. Acquire the lock to keep the read consistent with the other session operations.

Add lock protection:

 pub async fn get_state_update<R>(
     &self,
     redis_data_manager: &R,
 ) -> Result<Vec<u8>, ProjectEditingSessionError>
 where
     R: RedisDataManagerImpl<Error = FlowProjectRedisDataManagerError>,
 {
     self.check_session_setup()?;
+    let _guard = self.acquire_lock().await;

     let current_state = redis_data_manager
         .get_current_state(&self.project_id, self.session_id.as_deref())
         .await?;

Comment on lines +107 to +110
if let Some(state) = service
.get_current_state(project_id, session.session_id.as_deref())
.await?
{

🛠️ Refactor suggestion

Add error context for state operations.

The state retrieval and application operations could benefit from better error context and validation.

 if let Some(state) = service
     .get_current_state(project_id, session.session_id.as_deref())
-    .await?
+    .await
+    .map_err(|e| format!("Failed to get current state: {}", e))?
 {
     if !state.is_empty() {
         // Create a new doc to apply the state
         let doc = Doc::new();
-        let update = Update::decode_v2(&state).map_err(Box::new)?;
+        let update = Update::decode_v2(&state)
+            .map_err(|e| format!("Failed to decode state update: {}", e))?;
         doc.transact_mut().apply_update(update);

Also applies to: 116-120

Comment on lines +81 to +93
let doc = Doc::new();
let text = doc.get_or_insert_text("content");
let update = {
let mut txn = doc.transact_mut();
text.push(&mut txn, "Initial content");
txn.encode_update_v2()
};

debug!("Pushing initial update...");
service
.push_update_to_redis_stream(project_id, update, Some(test_user.name.clone()))
.await?;


⚠️ Potential issue

Add size validation for document updates.

The updates are pushed to Redis without size validation, which could lead to memory issues with large documents.

+const MAX_UPDATE_SIZE: usize = 1024 * 1024; // 1MB limit
+
+fn validate_update_size(update: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
+    if update.len() > MAX_UPDATE_SIZE {
+        return Err(format!("Update size {} exceeds limit of {} bytes", 
+            update.len(), MAX_UPDATE_SIZE).into());
+    }
+    Ok(())
+}
+
 let update = {
     let mut txn = doc.transact_mut();
     text.push(&mut txn, "Initial content");
     txn.encode_update_v2()
 };
 
+validate_update_size(&update)?;
 debug!("Pushing initial update...");
 service
     .push_update_to_redis_stream(project_id, update, Some(test_user.name.clone()))
     .await?;

Also applies to: 95-104
