-
Notifications
You must be signed in to change notification settings - Fork 3
fix(websocket): use thiserror replace box err fix thread safety #533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes introduced in this pull request primarily focus on enhancing error handling across various components of the websocket project. A new dependency, Changes
Possibly related PRs
Suggested labels
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
✅ Deploy Preview for reearth-flow canceled.
|
22c1c83
to
0f4848b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 11
🧹 Outside diff range and nitpick comments (11)
websocket/crates/infra/src/persistence/gcs/gcs_client.rs (2)
8-18
: LGTM: Well-structured custom error typeThe
GcsError
enum is well-designed and covers the main error scenarios for GCS operations. The use ofthiserror
attributes for error messages and automatic conversions is excellent.Consider adding a catch-all variant for unexpected errors:
#[error("Unexpected error: {0}")] Other(#[from] Box<dyn std::error::Error + Send + Sync>),This would allow for handling of any unforeseen error types that might occur during GCS operations.
33-33
: LGTM: Consistent error handling improvementsThe updates to the
upload
anddownload
methods' return types are good improvements. They now use the customGcsError
type, providing more specific error information and enhancing the overall error handling capabilities of theGcsClient
struct.For the
upload
method, consider adding a content type to theUploadObjectRequest
. This can be useful for clients consuming the uploaded data. For example:UploadObjectRequest { bucket: self.bucket.clone(), content_type: Some("application/json".to_string()), ..Default::default() }Also applies to: 53-53
websocket/crates/infra/src/persistence/redis/redis_client.rs (1)
14-20
: LGTM: Well-structured custom error typeThe
RedisClientError
enum is well-defined and properly encapsulates the potential errors that can occur in theRedisClient
. The use of#[from]
attributes allows for easy error conversion, which will simplify error handling throughout the code.Consider adding a generic
Other
variant to handle any unexpected errors:#[error("Unexpected error: {0}")] Other(String),This would make the error type more flexible for future extensions.
websocket/crates/infra/src/persistence/redis/flow_project_lock.rs (1)
32-32
: Good update to error handling inwith_lock
method.The changes to use
GlobalLockError
are consistent with the new error handling approach. The use ofmap_err(GlobalLockError::from)
is a clean way to convert the error type.A minor suggestion for improvement:
Consider using the
?
operator instead ofmap_err
for slightly more concise code:let lock = self .lock_manager .lock(&resource_bytes, duration_ms as usize) .await?;This works because of the
From<LockError> for GlobalLockError
implementation.Also applies to: 41-42
websocket/crates/domain/src/project.rs (2)
121-125
: Handle potential errors frommerge_updates
before creating a snapshotIn the
create_snapshot_internal
method,self.merge_updates().await?;
is called before creating a snapshot. Ensure that any errors from merging updates are appropriately handled or logged to prevent silent failures.Consider adding error handling or logging:
// Existing code self.merge_updates().await?; // Possible enhancement if let Err(e) = self.merge_updates().await { // Log the error or handle it accordingly return Err(ProjectEditingSessionError::Other(format!("Failed to merge updates: {}", e))); }
Populate
changes_by
field appropriately inProjectSnapshot
The
changes_by
field inProjectSnapshot
is currently initialized as an empty vector. Ensure it is populated with the relevant data to track users who have made changes.🔗 Analysis chain
Line range hint
125-142
: Populatechanges_by
field appropriately inProjectSnapshot
In the
create_snapshot_internal
method, thechanges_by
field inProjectSnapshot
is currently initialized as an empty vector. If this field is intended to track users who have made changes, ensure it is populated with the relevant data.If you intend to populate
changes_by
, verify whether the necessary information is available:
Apologies for the error in the previous script. Let's run a corrected version to search for assignments to
changes_by
in the codebase.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for usage of `changes_by` data in related code. # Search for assignments to `changes_by` in the codebase. rg --type rust 'changes_by' websocket/crates/domain/Length of output: 237
Script:
#!/bin/bash # Description: Check for usage of `changes_by` data in related code. # Search for assignments to `changes_by` in the codebase. rg --type rust 'changes_by' websocket/crates/domain/Length of output: 370
websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (5)
Line range hint
89-91
: Avoid usingunwrap()
to prevent potential panicsThe methods
encode_state_data
anddecode_state_data
useunwrap()
, which can cause the application to panic if serialization or deserialization fails. It's safer to handle these errors using theResult
type to propagate errors appropriately.Refactor the methods to return
Result
types:fn encode_state_data(data: Vec<u8>) -> Result<String, serde_json::Error> { - serde_json::to_string(&data).unwrap() + serde_json::to_string(&data) } fn decode_state_data(data_string: String) -> Result<Vec<u8>, serde_json::Error> { - serde_json::from_str(&data_string).unwrap() + serde_json::from_str(&data_string) }And update the places where these methods are called to handle the
Result
.Also applies to: 93-95
Line range hint
261-266
: Handle Redis command errors separatelyIn
set_state_data_internal
, both Redisset
commands are executed sequentially. If the firstset
succeeds and the second fails, the state could be left inconsistent. Consider using a Redis transaction (e.g.,MULTI
/EXEC
) to ensure atomicity of the operations.Refactor to use a Redis transaction for atomic operations:
async fn set_state_data_internal( &self, encoded_state_update: &str, updated_by_json: &str, ) -> Result<(), FlowProjectRedisDataManagerError> { let connection = self.redis_client.connection(); let mut connection_guard = connection.lock().await; + let mut pipe = redis::pipe(); + pipe.set(self.state_key(), encoded_state_update) + .ignore() + .set(self.state_updated_by_key(), updated_by_json) + .ignore(); + pipe.query_async(&mut *connection_guard).await?; - let _: () = connection_guard - .set(self.state_key(), encoded_state_update) - .await?; - let _: () = connection_guard - .set(self.state_updated_by_key(), updated_by_json) - .await?; Ok(()) }
Line range hint
90-91
: Potential inconsistency withunwrap()
in production codeUsing
unwrap()
in production code can cause panics and crash the application if an error occurs. Inencode_state_data
andset_state_data
, consider handling errors gracefully instead of unwrapping.Ensure that errors are propagated or handled appropriately instead of using
unwrap()
. Refer to the previous suggestion on adjustingencode_state_data
to return aResult
, and propagate the error handling toset_state_data
and related methods.Also applies to: 278-296
Line range hint
116-132
: Simplify error handling when unknown format is encounteredIn
get_flow_updates_from_stream
, if theformat
is not"json"
, an emptyFlowEncodedUpdate
is created. This may hide issues if an unexpected format is received. Consider returning an error or at least logging a warning.Modify the code to handle unknown formats explicitly:
if format == "json" { serde_json::from_str(value)? } else { - FlowEncodedUpdate { - update: String::new(), - updated_by: None, - } + return Err(FlowProjectRedisDataManagerError::Unknown(format!("Unsupported format: {}", format))); }
291-296
: Unnecessary cloning of project IDIn
set_state_data
, theproject_id
is cloned unnecessarily. Sinceproject_id
is aString
, and&self
is available, you can borrow it instead.Adjust the code to use a reference:
- let project_id = self.project_id.clone(); self.global_lock - .lock_state(&project_id, 5000, move |_lock_guard| { + .lock_state(&self.project_id, 5000, move |_lock_guard| { Box::pin(async move { self.set_state_data_internal(&encoded_state_update, &updated_by_json) .await?; Ok::<(), FlowProjectRedisDataManagerError>(()) }) })
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
⛔ Files ignored due to path filters (1)
websocket/Cargo.lock
is excluded by!**/*.lock
,!**/*.lock
📒 Files selected for processing (10)
- websocket/Cargo.toml (1 hunks)
- websocket/crates/domain/Cargo.toml (1 hunks)
- websocket/crates/domain/src/project.rs (6 hunks)
- websocket/crates/domain/src/repository.rs (1 hunks)
- websocket/crates/infra/Cargo.toml (1 hunks)
- websocket/crates/infra/src/persistence/gcs/gcs_client.rs (3 hunks)
- websocket/crates/infra/src/persistence/project_repository.rs (5 hunks)
- websocket/crates/infra/src/persistence/redis/flow_project_lock.rs (7 hunks)
- websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (16 hunks)
- websocket/crates/infra/src/persistence/redis/redis_client.rs (7 hunks)
🔇 Additional comments (24)
websocket/crates/domain/Cargo.toml (1)
19-19
: Excellent addition of thethiserror
crate!The inclusion of
thiserror
as a workspace dependency is a positive change that aligns well with the PR objectives. This addition will enable more robust and maintainable error handling across the project, potentially addressing the mentioned thread safety issues.Benefits of using
thiserror
:
- Simplified creation of custom error types
- Improved error messages and debugging
- Better integration with Rust's error handling ecosystem
This change is minimal and straightforward, which reduces the risk of unintended side effects while providing significant improvements to the project's error handling capabilities.
websocket/crates/infra/Cargo.toml (1)
33-33
: Excellent addition of thethiserror
crate!Adding
thiserror
as a workspace dependency is a great step towards improving error handling across the project. This change aligns well with the PR objectives of replacing box errors and enhancing thread safety.Benefits of this addition include:
- More expressive and type-safe error handling
- Simplified error creation and management
- Improved consistency in error handling across the codebase
This change sets a good foundation for the error handling improvements mentioned in the PR objectives.
websocket/Cargo.toml (1)
62-62
: Addition ofthiserror
dependency looks good.The addition of the
thiserror
crate (version 1.0.64) is a positive change for improving error handling in the project. This crate allows for more ergonomic and type-safe error definitions, which aligns well with the PR objective of replacing box errors and improving thread safety.A few points to consider:
- The version
1.0.64
is not the latest (current latest is 1.0.56 as of September 2024). However, it's a stable version, and the 1.0.x releases are generally backwards compatible.- Make sure to update the relevant code to utilize
thiserror
for defining custom error types.To ensure this addition doesn't conflict with existing error handling mechanisms and to find potential areas for improvement, let's run the following script:
This script will help us identify areas where
thiserror
can be applied and ensure it's not conflicting with existing implementations.✅ Verification successful
Addition of
thiserror
dependency is verified and appropriate.The
thiserror
crate is already utilized across multiple modules in the project, and adding it as a dependency inCargo.toml
ensures consistent and standardized error handling. The specified version1.0.64
aligns with the project's requirements and maintains compatibility with existing implementations.Next Steps:
- Ensure that all error definitions leverage
thiserror
for consistency.- Verify that there are no version conflicts in
Cargo.lock
related tothiserror
.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for existing error handling patterns and potential areas to apply thiserror # Test 1: Check for existing error types that could be replaced with thiserror echo "Existing error types that could potentially use thiserror:" rg --type rust -e 'struct.*Error' -e 'enum.*Error' # Test 2: Check for usage of std::error::Error trait echo "\nUsage of std::error::Error trait:" rg --type rust -e 'impl.*std::error::Error' # Test 3: Check for Box<dyn Error> usage which could be replaced echo "\nUsage of Box<dyn Error> which could potentially be replaced:" rg --type rust -e 'Box<dyn.*Error>' # Test 4: Check if thiserror is already imported anywhere (shouldn't be if this is a new addition) echo "\nExisting imports of thiserror (should be none if this is a new addition):" rg --type rust -e 'use.*thiserror'Length of output: 12542
websocket/crates/infra/src/persistence/gcs/gcs_client.rs (2)
6-6
: LGTM: Appropriate use of thiserror crateThe addition of the
thiserror
crate is a good choice for implementing custom error types. This aligns well with the PR objective to improve error handling in the websocket implementation.
27-27
: LGTM: Improved error handling in new methodThe update to the
new
method's return type is a good improvement. It now uses the customGcsError
type, which provides more specific error information. This change enhances the error handling capabilities of theGcsClient
struct.websocket/crates/infra/src/persistence/redis/redis_client.rs (8)
5-5
: LGTM: Appropriate use of thiserror crateThe addition of
use thiserror::Error;
is appropriate for defining custom error types. This aligns well with the PR objective of improving error handling.
23-23
: LGTM: Consistent error handling in constructorThe
new
method signature has been appropriately updated to use the newRedisClientError
type. This change ensures consistent error handling throughout theRedisClient
struct.
36-36
: LGTM: Improved error handling in set methodThe
set
method now returns aResult
with theRedisClientError
type, which is consistent with the new error handling strategy. This change improves type safety and error clarity.
45-45
: LGTM: Consistent error handling in get methodThe
get
method signature has been updated to useRedisClientError
, maintaining consistency with the new error handling approach. This change enhances the method's error reporting capabilities.
54-54
: LGTM: Improved error handling in keys methodThe
keys
method now returns aResult
with theRedisClientError
type, aligning with the new error handling strategy. This change provides more specific error information.
65-65
: LGTM: Consistent error handling in xadd methodThe
xadd
method signature has been appropriately updated to use theRedisClientError
type, ensuring consistency with the new error handling approach across theRedisClient
struct.
75-75
: LGTM: Consistent error handling across Redis stream operationsThe
xread
,xtrim
, andxdel
methods have all been updated to useRedisClientError
in their return types. This change ensures consistent error handling across all Redis stream operations in theRedisClient
struct.Also applies to: 81-81, 87-87
Line range hint
1-96
: Overall assessment: Significant improvement in error handlingThe changes made to this file represent a substantial improvement in error handling for the
RedisClient
struct. By introducing a customRedisClientError
type and consistently applying it across all public methods, the code now provides more specific and informative error reporting. This aligns perfectly with the PR's objective of enhancing error handling and indirectly contributes to improved thread safety by providing clearer error boundaries.Key improvements:
- Introduction of a custom error type using
thiserror
.- Consistent use of
Result<T, RedisClientError>
in all public method signatures.- Proper error propagation using the
?
operator throughout the implementation.These changes will make it easier for consumers of the
RedisClient
to handle errors more effectively and will improve the overall robustness of the code.websocket/crates/infra/src/persistence/redis/flow_project_lock.rs (5)
2-3
: Excellent use ofthiserror
for custom error handling!The introduction of
GlobalLockError
using thethiserror
crate is a great improvement. It provides:
- More specific error types for the module.
- Automatic
Display
trait implementation for better error messages.- Easy conversion from
LockError
toGlobalLockError
with theFrom
implementation.This change enhances error handling and improves code readability.
Also applies to: 5-13
55-55
: Consistent error type update inlock_state
method.The change in return type to
Result<T, GlobalLockError>
is consistent with the new error handling approach and correctly propagates the new error type to the public API of the struct.
69-69
: Consistent error type update inlock_updates
method.The change in return type to
Result<T, GlobalLockError>
maintains consistency with the new error handling approach across the struct's public API.
89-89
: Consistent error type update inlock_snapshots
method.The change in return type to
Result<T, GlobalLockError>
continues the consistent application of the new error handling approach across the struct's public API.
Line range hint
1-124
: Overall, excellent improvements to error handling and type safety.The changes in this file demonstrate a consistent and well-thought-out approach to improving error handling:
- Introduction of
GlobalLockError
usingthiserror
provides more specific and informative error types.- Consistent update of all public method signatures to return
Result<T, GlobalLockError>
.- Updated error handling in the
with_lock
method to use the new error type.These changes bring several benefits:
- Improved type safety by using a more specific error type.
- Better error reporting and handling capabilities.
- Consistent error handling approach across the entire struct.
Great job on enhancing the robustness and maintainability of this code!
websocket/crates/infra/src/persistence/project_repository.rs (4)
1-2
: Appropriate inclusion of specific error typesThe addition of
GcsError
andRedisClientError
enhances error handling by allowing more granular error information from the GCS and Redis clients.
10-10
: Correct usage ofthiserror::Error
for custom error definitionsIncluding
thiserror::Error
is appropriate for deriving error types that implement theError
trait, simplifying custom error management.
12-20
: Well-defined custom error enumProjectRepositoryError
Defining
ProjectRepositoryError
with variants for Redis, GCS, and serialization errors improves error specificity and readability. This allows for more precise error handling in repository operations.
33-40
: Consistent error handling in repository implementationsThe method signatures have been correctly updated to return
ProjectRepositoryError
, enhancing error specificity and type safety across the repository implementations. Ensure that all internal error propagations use the updated error type for consistency.Also applies to: 85-110
websocket/crates/domain/src/project.rs (2)
34-43
: Good use ofthiserror
for defining custom error typesIntroducing
ProjectEditingSessionError
with thethiserror
crate enhances error handling by providing specific, descriptive error variants. This approach improves error management by making it clearer and more maintainable.
156-158
: Implement logic for ending the session inend_session
methodThe
end_session
method currently contains a placeholder comment// Logic to end the session
. Ensure that the necessary steps to properly end a session are implemented, such as releasing resources or updating session states.To ensure that the session-ending logic is implemented, check related code:
#!/bin/bash # Description: Verify implementation of session-ending logic. # Search for implementations related to ending sessions. rg --type rust 'end_session' websocket/crates/domain/
@@ -112,7 +107,7 @@ impl FlowProjectLock { | |||
project_id: &str, | |||
duration_ms: u64, | |||
callback: F, | |||
) -> Result<T, LockError> | |||
) -> Result<T, GlobalLockError> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consistent error type update in lock_session
method, but potential for refactoring.
The change in return type to Result<T, GlobalLockError>
maintains consistency with the new error handling approach across the struct's public API.
However, I noticed that the implementation of lock_session
is identical to lock_snapshots
.
Consider refactoring to eliminate this duplication. You could create a private method that both lock_snapshots
and lock_session
call, or use a macro to generate these methods if they need to remain separate for API reasons.
Example refactor:
async fn lock_all<F, T>(&self, project_id: &str, duration_ms: u64, callback: F) -> Result<T, GlobalLockError>
where
F: FnOnce(&LockGuard) -> T + Send,
T: Send,
{
let state_lock_key = format!("{}:locks:state", project_id);
let updates_lock_key = format!("{}:locks:updates", project_id);
let snapshots_lock_key = format!("{}:locks:snapshots", project_id);
self.with_lock(
vec![state_lock_key, updates_lock_key, snapshots_lock_key],
duration_ms,
callback,
)
.await
}
pub async fn lock_snapshots<F, T>(&self, project_id: &str, duration_ms: u64, callback: F) -> Result<T, GlobalLockError>
where
F: FnOnce(&LockGuard) -> T + Send,
T: Send,
{
self.lock_all(project_id, duration_ms, callback).await
}
pub async fn lock_session<F, T>(&self, project_id: &str, duration_ms: u64, callback: F) -> Result<T, GlobalLockError>
where
F: FnOnce(&LockGuard) -> T + Send,
T: Send,
{
self.lock_all(project_id, duration_ms, callback).await
}
This refactoring would reduce code duplication and make future changes easier to manage.
pub trait ProjectRepository<E: Error + Send + Sync> { | ||
async fn get_project(&self, project_id: &str) -> Result<Option<Project>, E>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider Defining a Concrete Error Type Instead of Using a Generic Type Parameter
Using a generic type parameter E
for errors in your trait definitions increases complexity and can make implementations more cumbersome, as every implementer must specify the error type. It might be more practical to define a concrete error type specific to your domain, such as DomainError
, leveraging the thiserror
crate to derive error implementations. This approach can simplify your trait definitions and make error handling more consistent across the codebase.
Also applies to: 11-17, 21-24
session: ProjectEditingSession, | ||
) -> Result<(), ProjectRepositoryError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid unwrapping Option
values to prevent potential panics
In the create_session
and update_session
methods, unwrapping session.session_id
can cause a panic if session_id
is None
. It's safer to handle the Option
properly to avoid runtime panics.
Apply this diff to handle the None
case gracefully:
async fn create_session(
&self,
session: ProjectEditingSession,
) -> Result<(), ProjectRepositoryError> {
- let key = format!("session:{}", session.session_id.as_ref().unwrap());
+ let session_id = match &session.session_id {
+ Some(id) => id,
+ None => {
+ return Err(ProjectRepositoryError::InvalidSessionId);
+ }
+ };
+ let key = format!("session:{}", session_id);
self.redis_client.set(key, &session).await?;
Ok(())
}
async fn update_session(
&self,
session: ProjectEditingSession,
) -> Result<(), ProjectRepositoryError> {
- let key = format!("session:{}", session.session_id.as_ref().unwrap());
+ let session_id = match &session.session_id {
+ Some(id) => id,
+ None => {
+ return Err(ProjectRepositoryError::InvalidSessionId);
+ }
+ };
+ let key = format!("session:{}", session_id);
self.redis_client.set(key, &session).await?;
Ok(())
}
You'll need to add a new variant to ProjectRepositoryError
:
#[derive(Error, Debug)]
pub enum ProjectRepositoryError {
#[error("Redis error: {0}")]
Redis(#[from] RedisClientError),
#[error("GCS error: {0}")]
Gcs(#[from] GcsError),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
+ #[error("Invalid session ID")]
+ InvalidSessionId,
}
Also applies to: 64-67
fn new(client: GcsClient) -> Self { | ||
Self { client } | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make the new
method public for consistent access
The new
method in ProjectGcsRepository
is currently private. If it's intended to be constructed outside of its module (as with ProjectRedisRepository
), consider making it public for consistency.
Apply this diff to make the constructor public:
- fn new(client: GcsClient) -> Self {
+ pub fn new(client: GcsClient) -> Self {
Self { client }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
fn new(client: GcsClient) -> Self { | |
Self { client } | |
} | |
pub fn new(client: GcsClient) -> Self { | |
Self { client } | |
} |
pub async fn start_or_join_session<E: Error + Send + Sync>( | ||
&mut self, | ||
snapshot_repo: &impl ProjectSnapshotRepository, | ||
) -> Result<String, Box<dyn Error>> { | ||
snapshot_repo: &impl ProjectSnapshotRepository<E>, | ||
) -> Result<String, ProjectEditingSessionError<E>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Simplify method signatures by parameterizing the impl
block
Currently, each method that requires E
is individually parameterized with <E: Error + Send + Sync>
. You can simplify the code by parameterizing the entire impl
block, reducing repetition and improving readability.
Implement this change by adjusting the impl
block:
-impl ProjectEditingSession {
+impl<E: Error + Send + Sync> ProjectEditingSession {
pub async fn start_or_join_session(
&mut self,
snapshot_repo: &impl ProjectSnapshotRepository<E>,
) -> Result<String, ProjectEditingSessionError<E>> {
// method body
}
// Update other methods similarly
}
This way, you declare the generic parameter E
once at the impl
level, and all methods within the block can use it without redeclaring.
Also applies to: 71-74, 80-82, 88-90, 96-100, 106-111, 121-125, 156-158
pub enum ProjectEditingSessionError<E: Error + Send + Sync> { | ||
#[error("Session not setup")] | ||
SessionNotSetup, | ||
#[error("Snapshot repository error: {0}")] | ||
SnapshotRepositoryError(#[from] E), | ||
#[error("Other error: {0}")] | ||
Other(String), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider removing the generic parameter E
from ProjectEditingSessionError
Using a generic parameter E
in ProjectEditingSessionError<E>
can complicate error handling, especially when matching on error variants or when the error type isn't needed for all variants. It may be more ergonomic to define ProjectEditingSessionError
without the generic parameter and box the inner errors as needed.
You can modify the error enum as follows:
#[derive(Error, Debug)]
-pub enum ProjectEditingSessionError<E: Error + Send + Sync> {
+pub enum ProjectEditingSessionError {
#[error("Session not setup")]
SessionNotSetup,
#[error("Snapshot repository error: {0}")]
- SnapshotRepositoryError(#[from] E),
+ SnapshotRepositoryError(#[from] Box<dyn Error + Send + Sync>),
#[error("Other error: {0}")]
Other(String),
}
This change simplifies the error type and avoids propagating the generic parameter throughout your code.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub enum ProjectEditingSessionError<E: Error + Send + Sync> { | |
#[error("Session not setup")] | |
SessionNotSetup, | |
#[error("Snapshot repository error: {0}")] | |
SnapshotRepositoryError(#[from] E), | |
#[error("Other error: {0}")] | |
Other(String), | |
} | |
#[derive(Error, Debug)] | |
pub enum ProjectEditingSessionError { | |
#[error("Session not setup")] | |
SessionNotSetup, | |
#[error("Snapshot repository error: {0}")] | |
SnapshotRepositoryError(#[from] Box<dyn Error + Send + Sync>), | |
#[error("Other error: {0}")] | |
Other(String), | |
} |
pub async fn get_state_update<E: Error + Send + Sync>( | ||
&self, | ||
) -> Result<Vec<u8>, ProjectEditingSessionError<E>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider returning a reference to avoid cloning in get_state_update
In the get_state_update
method, if the state data is large, returning a reference might be more efficient than returning a cloned Vec<u8>
.
Modify the method signature and return type:
- pub async fn get_state_update<E: Error + Send + Sync>(
+ pub async fn get_state_update<E: Error + Send + Sync>(
&self,
-) -> Result<Vec<u8>, ProjectEditingSessionError<E>> {
+) -> Result<&[u8], ProjectEditingSessionError<E>> {
self.check_session_setup()?;
// Logic to get the state update
- Ok(vec![])
+ Ok(&self.state_data)
}
Ensure that self.state_data
is defined appropriately and has the correct lifetime.
Committable suggestion was skipped due to low confidence.
pub async fn push_update<E: Error + Send + Sync>( | ||
&self, | ||
_update: Vec<u8>, | ||
_updated_by: Option<String>, | ||
) -> Result<(), Box<dyn Error>> { | ||
) -> Result<(), ProjectEditingSessionError<E>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Error Handling Not Implemented in push_update
Method
- The
push_update
method only propagates errors fromcheck_session_setup
but lacks error handling for the update logic.
🔗 Analysis chain
Ensure proper error handling in push_update
method
The push_update
method currently has a placeholder comment // Logic to push an update
. Ensure that any errors arising from this logic are correctly propagated using the ProjectEditingSessionError
variants.
To confirm that errors are handled appropriately, you can search for proper error handling in the method:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that errors in `push_update` are correctly propagated.
# Search for error handling in the `push_update` method.
rg --type rust -A5 -B5 'async fn push_update' websocket/crates/domain/src/project.rs
Length of output: 412
update: Vec<u8>, | ||
updated_by: String, | ||
) -> Result<(), FlowProjectRedisDataManagerError> { | ||
let update_data = FlowEncodedUpdate { | ||
update: Self::encode_state_data(update), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Possible performance improvement by batching Redis commands
In push_update
, multiple Redis commands (xadd
and set
) are sent separately. Consider batching these operations using a pipeline to reduce network overhead.
Use a Redis pipeline to batch commands:
let connection = self.redis_client.connection();
let mut connection_guard = connection.lock().await;
-let _: () = connection_guard
- .xadd(self.state_updates_key(), "*", fields)
- .await?;
-
-let timestamp = chrono::Utc::now().timestamp().to_string();
-let _: () = connection_guard
- .set(self.last_updated_at_key(), ×tamp)
- .await?;
+let timestamp = chrono::Utc::now().timestamp().to_string();
+
+let mut pipe = redis::pipe();
+pipe.cmd("XADD")
+ .arg(self.state_updates_key())
+ .arg("*")
+ .arg(fields)
+ .ignore()
+ .set(self.last_updated_at_key(), ×tamp)
+ .ignore();
+pipe.query_async(&mut *connection_guard).await?;
Ok(())
Committable suggestion was skipped due to low confidence.
async fn lock_and_execute_merge_updates( | ||
&self, | ||
) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> { | ||
self.global_lock | ||
.lock_updates(&self.project_id, 5000, |_| { | ||
Box::pin(async move { Ok::<(), BoxError>(()) }) | ||
Box::pin(async move { Ok::<(), FlowProjectRedisDataManagerError>(()) }) | ||
}) | ||
.await | ||
.map_err(|e| Box::new(GlobalLockError(e)) as BoxError)? | ||
.map_err(FlowProjectRedisDataManagerError::from)? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing execution of execute_merge_updates
within the lock
In lock_and_execute_merge_updates
, the method acquires a lock using global_lock.lock_updates
but does not execute the execute_merge_updates
function within the lock. This means that execute_merge_updates
is not being called, which likely causes the merge operations to be skipped when locking is required.
To fix this issue, ensure that execute_merge_updates
is called after acquiring the lock.
Apply this diff to correct the issue:
async fn lock_and_execute_merge_updates(
&self,
) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> {
self.global_lock
.lock_updates(&self.project_id, 5000, |_| {
- Box::pin(async move { Ok::<(), FlowProjectRedisDataManagerError>(()) })
+ let this = self.clone();
+ Box::pin(async move { this.execute_merge_updates().await })
})
.await
.map_err(FlowProjectRedisDataManagerError::from)?
- .await?;
-
- self.execute_merge_updates().await
+ .await
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn lock_and_execute_merge_updates( | |
&self, | |
) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> { | |
self.global_lock | |
.lock_updates(&self.project_id, 5000, |_| { | |
Box::pin(async move { Ok::<(), BoxError>(()) }) | |
Box::pin(async move { Ok::<(), FlowProjectRedisDataManagerError>(()) }) | |
}) | |
.await | |
.map_err(|e| Box::new(GlobalLockError(e)) as BoxError)? | |
.map_err(FlowProjectRedisDataManagerError::from)? | |
async fn lock_and_execute_merge_updates( | |
&self, | |
) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> { | |
self.global_lock | |
.lock_updates(&self.project_id, 5000, |_| { | |
let this = self.clone(); | |
Box::pin(async move { this.execute_merge_updates().await }) | |
}) | |
.await | |
.map_err(FlowProjectRedisDataManagerError::from)? | |
.await | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (2)
34-50
: Approval: Well-structured custom error typeThe
FlowProjectRedisDataManagerError
enum is well-defined and covers a good range of potential error scenarios. The use ofthiserror
attributes for custom error messages is excellent.Consider adding a
#[from]
attribute to theUnknown
variant to allow for easy conversion fromString
errors:#[error("Unknown error: {0}")] -Unknown(String), +Unknown(#[from] String),This would allow you to use
?
operator withString
errors as well.
438-446
: Approval: Improved error handling with a suggestionThe
lock_and_execute_merge_updates
method has been correctly updated to use the newFlowProjectRedisDataManagerError
type. The error handling is consistent with the new approach.Consider simplifying the closure by removing the unnecessary
Result
wrapping:self.global_lock .lock_updates(&self.project_id, 5000, |_| { - Box::pin(async move { Ok::<(), FlowProjectRedisDataManagerError>(()) }) + Box::pin(async move { Ok(()) }) }) .await .map_err(FlowProjectRedisDataManagerError::from)? .await?;This change maintains the same behavior while reducing unnecessary code.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
- websocket/crates/infra/src/persistence/gcs/gcs_client.rs (3 hunks)
- websocket/crates/infra/src/persistence/project_repository.rs (7 hunks)
- websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (16 hunks)
- websocket/crates/infra/src/persistence/redis/redis_client.rs (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- websocket/crates/infra/src/persistence/gcs/gcs_client.rs
- websocket/crates/infra/src/persistence/project_repository.rs
- websocket/crates/infra/src/persistence/redis/redis_client.rs
🔇 Additional comments (10)
websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs (10)
5-5
: Approval: Good choice for error handlingThe addition of the
thiserror
crate is a positive change. It will allow for more expressive and maintainable custom error types, which aligns well with Rust's strong error handling practices.
108-113
: Approval: Consistent error type updateThe method signature has been correctly updated to use the new
FlowProjectRedisDataManagerError
type. This change is consistent with the improved error handling approach and maintains the existing logic.
116-118
: Approval: Consistent error type updateThe method signature for
get_flow_updates_from_stream
has been correctly updated to use the newFlowProjectRedisDataManagerError
type. This change aligns with the improved error handling strategy while preserving the existing functionality.
145-145
: Approval: Consistent error type updateThe method signature for
get_merged_update_from_stream
has been appropriately updated to use the newFlowProjectRedisDataManagerError
type. This change is in line with the improved error handling approach and maintains the existing functionality.
169-171
: Approval: Consistent error type updateThe method signature for
get_state_update_in_redis
has been correctly updated to use the newFlowProjectRedisDataManagerError
type. This change aligns with the improved error handling strategy while preserving the existing functionality.
180-186
: Approval: Improved error handlingThe
active_editing_session_id
method has been updated correctly to use the newFlowProjectRedisDataManagerError
type. The use ofmap_err
to convert the Redis error to the custom error type is a good practice and aligns well with Rust's error handling patterns.
189-191
: Approval: Consistent error type updateThe method signature for
get_current_state_update
has been appropriately updated to use the newFlowProjectRedisDataManagerError
type. This change is in line with the improved error handling approach and maintains the existing functionality.
208-210
: Approval: Consistent error type updateThe method signature for
get_current_state_updated_by
has been correctly updated to use the newFlowProjectRedisDataManagerError
type. This change aligns with the improved error handling strategy while preserving the existing functionality.
Line range hint
225-237
: Approval: Improved error handling and clarityThe
start_editing_session
method has been updated correctly to use the newFlowProjectRedisDataManagerError
type. The use of the specificEditingSessionInProgress
error variant enhances error clarity and makes it easier for callers to handle this particular error case.
Line range hint
1-450
: Summary: Comprehensive improvement in error handlingThe changes in this file represent a significant improvement in error handling for the
FlowProjectRedisDataManager
. The introduction of theFlowProjectRedisDataManagerError
enum usingthiserror
provides more expressive and context-rich error types. The consistent update of method signatures and error handling across the file enhances the overall robustness of the code.These changes will make it easier for callers to handle specific error cases and should improve the maintainability of the codebase. The refactoring maintains the existing logic while focusing on enhancing the error handling capabilities.
Great job on improving the error handling in this module!
This reverts commit 6908273.
Overview
What I've done
What I haven't done
How I tested
Screenshot
Which point I want you to review particularly
Memo
Summary by CodeRabbit
New Features
thiserror
crate to improve error management capabilities.Bug Fixes
Documentation
GcsClient
,ProjectRepository
, andRedisClient
.