-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(websocket): add local storage support for snapshots #505
Conversation
WalkthroughThe changes introduced in this pull request include updates to the Changes
Possibly related PRs
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
✅ Deploy Preview for reearth-flow canceled.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
Outside diff range and nitpick comments (3)
websocket/crates/infra/src/persistence/local_storage/mod.rs (1)
1-2
: Consider adding module-level documentation for clarity.Including Rustdoc comments for the
local_storage
module and the re-exportedLocalClient
will enhance readability and help other developers understand the purpose and usage of this component.websocket/crates/infra/src/persistence/local_storage/local_client.rs (2)
128-169
: Improve Test Reliability by Eliminating Fixed DelaysThe
test_concurrent_access
test usestokio::time::sleep
to simulate concurrent access, which can lead to flaky tests due to timing issues. Relying on fixed delays may not guarantee that the upload and download operations overlap as intended.Use synchronization primitives like
tokio::sync::Barrier
to coordinate the concurrent tasks more reliably:use tokio::sync::Barrier; #[tokio::test] async fn test_concurrent_access() -> io::Result<()> { // ... let barrier = Arc::new(Barrier::new(2)); let upload_task = { let client = client.clone(); let barrier = barrier.clone(); tokio::spawn(async move { barrier.wait().await; client .upload("concurrent_test.json".to_string(), &test_data) .await }) }; let download_task = { let client = client.clone(); let barrier = barrier.clone(); tokio::spawn(async move { barrier.wait().await; client .download::<TestData>("concurrent_test.json".to_string()) .await }) }; upload_task.await??; let downloaded_data = download_task.await??; assert_eq!(test_data, downloaded_data); Ok(()) }This approach ensures both tasks start concurrently without relying on arbitrary delays.
85-89
: Simplify Test Code by DerivingClone
forTestData
In the tests,
TestData
instances are manually recreated instead of cloned, leading to code duplication.Derive the
Clone
trait forTestData
to enable easy cloning:#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[derive(Clone)] struct TestData { field1: String, field2: i32, }
Then, you can clone the existing instance:
let test_data_clone = test_data.clone();This reduces code repetition and makes the test code cleaner.
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files ignored due to path filters (1)
websocket/Cargo.lock
is excluded by!**/*.lock
,!**/*.lock
Files selected for processing (7)
- websocket/Cargo.toml (2 hunks)
- websocket/crates/domain/src/project.rs (1 hunks)
- websocket/crates/infra/Cargo.toml (1 hunks)
- websocket/crates/infra/src/persistence/local_storage/local_client.rs (1 hunks)
- websocket/crates/infra/src/persistence/local_storage/mod.rs (1 hunks)
- websocket/crates/infra/src/persistence/mod.rs (1 hunks)
- websocket/crates/infra/src/persistence/project_repository.rs (2 hunks)
Files skipped from review due to trivial changes (4)
- websocket/Cargo.toml
- websocket/crates/domain/src/project.rs
- websocket/crates/infra/Cargo.toml
- websocket/crates/infra/src/persistence/mod.rs
Additional comments not posted (1)
websocket/crates/infra/src/persistence/project_repository.rs (1)
92-93
: Question on Necessity ofArc
aroundLocalClient
In
ProjectLocalRepository
, theclient
field is wrapped in anArc
:client: Arc<LocalClient>,If
ProjectLocalRepository
is not being shared across threads or does not require multiple ownership, theArc
may be unnecessary. Removing it can simplify the code and reduce overhead.Please verify if
Arc
is needed in this context. If not, consider refactoring to useLocalClient
directly.
fn get_full_path(&self, path: &str) -> PathBuf { | ||
self.base_path.join(path) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential Security Risk: Path Traversal Vulnerability
The get_full_path
method directly joins user-provided paths without sanitization. This could allow for path traversal attacks, where an attacker provides input like "../../sensitive_file"
to access or modify files outside the intended directory.
To mitigate this risk, sanitize the input path to prevent directory traversal:
fn get_full_path(&self, path: &str) -> PathBuf {
let sanitized_path = Path::new(path)
.components()
.filter(|c| matches!(c, std::path::Component::Normal(_)))
.collect::<PathBuf>();
self.base_path.join(sanitized_path)
}
This code filters out any .
or ..
components, ensuring that the path remains within the base_path
directory.
async fn lock_file(&self, path: &Path) { | ||
let mut locks = self.file_locks.lock().await; | ||
locks.entry(path.to_path_buf()).or_insert(()); | ||
} | ||
|
||
async fn unlock_file(&self, path: &PathBuf) { | ||
let mut locks = self.file_locks.lock().await; | ||
locks.remove(path); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improper File Locking Mechanism Leading to Race Conditions
The current implementation of lock_file
and unlock_file
does not provide actual synchronization for file access. Merely inserting and removing entries in a HashMap
does not prevent concurrent access to the same file, potentially causing data races and inconsistent data states.
To properly synchronize file access, consider using per-file mutexes:
pub struct LocalClient {
base_path: PathBuf,
- file_locks: Mutex<HashMap<PathBuf, ()>>,
+ file_locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
}
impl LocalClient {
// ...
- async fn lock_file(&self, path: &Path) {
+ async fn lock_file(&self, path: &Path) -> Arc<Mutex<()>> {
let mut locks = self.file_locks.lock().await;
- locks.entry(path.to_path_buf()).or_insert(());
+ let lock = locks
+ .entry(path.to_path_buf())
+ .or_insert_with(|| Arc::new(Mutex::new(())))
+ .clone();
+ drop(locks); // Release the lock on the HashMap before acquiring the file lock
+ lock.lock().await;
+ lock
}
- async fn unlock_file(&self, path: &PathBuf) {
- let mut locks = self.file_locks.lock().await;
- locks.remove(path);
- }
// ...
}
// Update the `upload` method:
pub async fn upload<T: Serialize + Send + Sync>(
&self,
path: String,
data: &T,
) -> io::Result<()> {
let full_path = self.get_full_path(&path);
- self.lock_file(&full_path).await;
+ let file_lock = self.lock_file(&full_path).await;
let result = async {
// Existing code...
}
.await;
// The file lock will be released when `file_lock` goes out of scope
- self.unlock_file(&full_path).await;
result
}
// Similarly update the `download` method
This ensures that each file access is protected by its own mutex, preventing simultaneous reads or writes to the same file.
Committable suggestion was skipped due to low confidence.
use tokio::test; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect Test Annotations for Asynchronous Tests
The test functions are asynchronous but are annotated with #[test]
. To run async tests with Tokio, you should use #[tokio::test]
instead. Additionally, the import use tokio::test;
is unnecessary.
Apply this diff to fix the test annotations and remove the unnecessary import:
- use tokio::test;
+ // The above import is unnecessary and can be removed.
#[test]
- async fn test_get_project_non_existent() -> Result<(), Box<dyn std::error::Error>> {
+ #[tokio::test]
+ async fn test_get_project_non_existent() -> Result<(), Box<dyn std::error::Error>> {
#[test]
- async fn test_get_project_existing() -> Result<(), Box<dyn std::error::Error>> {
+ #[tokio::test]
+ async fn test_get_project_existing() -> Result<(), Box<dyn std::error::Error>> {
#[test]
- async fn test_create_and_get_snapshot() -> Result<(), Box<dyn std::error::Error>> {
+ #[tokio::test]
+ async fn test_create_and_get_snapshot() -> Result<(), Box<dyn std::error::Error>> {
#[test]
- async fn test_get_latest_snapshot_state() -> Result<(), Box<dyn std::error::Error>> {
+ #[tokio::test]
+ async fn test_get_latest_snapshot_state() -> Result<(), Box<dyn std::error::Error>> {
Ensure that all async test functions are annotated with #[tokio::test]
to properly utilize the Tokio runtime.
Also applies to: 172-172, 180-180, 199-199, 216-216
#[async_trait] | ||
impl ProjectRepository for ProjectLocalRepository { | ||
async fn get_project( | ||
&self, | ||
project_id: &str, | ||
) -> Result<Option<Project>, Box<dyn std::error::Error>> { | ||
let path = format!("projects/{}", project_id); | ||
match self.client.download::<Project>(path).await { | ||
Ok(project) => Ok(Some(project)), | ||
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), | ||
Err(e) => Err(Box::new(e)), | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use More Specific Error Types Instead of Box<dyn std::error::Error>
Currently, the methods return Result<..., Box<dyn std::error::Error>>
, which can make error handling less precise. Using a concrete error type or a custom error enum can improve error clarity and make handling errors more manageable.
Consider defining a custom error type for your repository operations. This can encapsulate different kinds of errors (e.g., I/O errors, serialization errors) and provide more context.
Example:
#[derive(Debug)]
pub enum RepositoryError {
IoError(io::Error),
SerializationError(serde_json::Error),
// Add other variants as needed
}
impl std::fmt::Display for RepositoryError {
// Implement display formatting
}
impl Error for RepositoryError {
// Implement source error retrieval if necessary
}
Then, adjust your method signatures:
- ) -> Result<Option<Project>, Box<dyn std::error::Error>> {
+ ) -> Result<Option<Project>, RepositoryError> {
And handle errors accordingly within your methods.
|
||
pub struct ProjectLocalRepository { | ||
client: Arc<LocalClient>, | ||
} | ||
|
||
impl ProjectLocalRepository { | ||
pub async fn new(base_path: PathBuf) -> io::Result<Self> { | ||
Ok(Self { | ||
client: Arc::new(LocalClient::new(base_path).await?), | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider Implementing Clone
for ProjectLocalRepository
If instances of ProjectLocalRepository
need to be cloned, especially when shared across different parts of your application, consider deriving the Clone
trait. This is particularly useful if LocalClient
implements Clone
.
Apply this diff to derive Clone
:
+#[derive(Clone)]
pub struct ProjectLocalRepository {
client: Arc<LocalClient>,
}
This change allows for easier duplication of the repository instance when needed.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub struct ProjectLocalRepository { | |
client: Arc<LocalClient>, | |
} | |
impl ProjectLocalRepository { | |
pub async fn new(base_path: PathBuf) -> io::Result<Self> { | |
Ok(Self { | |
client: Arc::new(LocalClient::new(base_path).await?), | |
}) | |
} | |
} | |
#[derive(Clone)] | |
pub struct ProjectLocalRepository { | |
client: Arc<LocalClient>, | |
} | |
impl ProjectLocalRepository { | |
pub async fn new(base_path: PathBuf) -> io::Result<Self> { | |
Ok(Self { | |
client: Arc::new(LocalClient::new(base_path).await?), | |
}) | |
} | |
} |
#[async_trait] | ||
impl ProjectSnapshotRepository for ProjectLocalRepository { | ||
async fn create_snapshot( | ||
&self, | ||
snapshot: ProjectSnapshot, | ||
) -> Result<(), Box<dyn std::error::Error>> { | ||
let path = format!("snapshots/{}", snapshot.metadata.id); | ||
self.client.upload(path, &snapshot).await?; | ||
|
||
// Update latest snapshot | ||
let latest_path = format!("latest_snapshots/{}", snapshot.metadata.project_id); | ||
self.client.upload(latest_path, &snapshot).await?; | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn get_latest_snapshot( | ||
&self, | ||
project_id: &str, | ||
) -> Result<Option<ProjectSnapshot>, Box<dyn std::error::Error>> { | ||
let path = format!("latest_snapshots/{}", project_id); | ||
match self.client.download::<ProjectSnapshot>(path).await { | ||
Ok(snapshot) => Ok(Some(snapshot)), | ||
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), | ||
Err(e) => Err(Box::new(e)), | ||
} | ||
} | ||
|
||
async fn get_latest_snapshot_state( | ||
&self, | ||
project_id: &str, | ||
) -> Result<Vec<u8>, Box<dyn std::error::Error>> { | ||
let path = format!("latest_snapshots/{}", project_id); | ||
match self.client.download::<ProjectSnapshot>(path).await { | ||
Ok(snapshot) => Ok(serde_json::to_vec(&snapshot)?), | ||
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(vec![]), | ||
Err(e) => Err(Box::new(e)), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent Handling of Non-existent Snapshots
In the get_latest_snapshot_state
method, when a snapshot is not found, you're returning an empty Vec<u8>
(Ok(vec![])
), whereas in get_latest_snapshot
, you return Ok(None)
for the same scenario. This inconsistency can lead to confusion and bugs, as callers might interpret an empty vector differently than a None
value.
Consider modifying get_latest_snapshot_state
to return Result<Option<Vec<u8>>, Box<dyn std::error::Error>>
and returning Ok(None)
when the snapshot is not found. This aligns the behavior with get_latest_snapshot
and provides a clear indication of the absence of data.
Apply this diff to adjust the method signature and return value:
- async fn get_latest_snapshot_state(
+ async fn get_latest_snapshot_state(
&self,
project_id: &str,
- ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+ ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
let path = format!("latest_snapshots/{}", project_id);
match self.client.download::<ProjectSnapshot>(path).await {
- Ok(snapshot) => Ok(serde_json::to_vec(&snapshot)?),
- Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(vec![]),
+ Ok(snapshot) => Ok(Some(serde_json::to_vec(&snapshot)?)),
+ Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(Box::new(e)),
}
}
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
#[async_trait] | |
impl ProjectSnapshotRepository for ProjectLocalRepository { | |
async fn create_snapshot( | |
&self, | |
snapshot: ProjectSnapshot, | |
) -> Result<(), Box<dyn std::error::Error>> { | |
let path = format!("snapshots/{}", snapshot.metadata.id); | |
self.client.upload(path, &snapshot).await?; | |
// Update latest snapshot | |
let latest_path = format!("latest_snapshots/{}", snapshot.metadata.project_id); | |
self.client.upload(latest_path, &snapshot).await?; | |
Ok(()) | |
} | |
async fn get_latest_snapshot( | |
&self, | |
project_id: &str, | |
) -> Result<Option<ProjectSnapshot>, Box<dyn std::error::Error>> { | |
let path = format!("latest_snapshots/{}", project_id); | |
match self.client.download::<ProjectSnapshot>(path).await { | |
Ok(snapshot) => Ok(Some(snapshot)), | |
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), | |
Err(e) => Err(Box::new(e)), | |
} | |
} | |
async fn get_latest_snapshot_state( | |
&self, | |
project_id: &str, | |
) -> Result<Vec<u8>, Box<dyn std::error::Error>> { | |
let path = format!("latest_snapshots/{}", project_id); | |
match self.client.download::<ProjectSnapshot>(path).await { | |
Ok(snapshot) => Ok(serde_json::to_vec(&snapshot)?), | |
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(vec![]), | |
Err(e) => Err(Box::new(e)), | |
} | |
} | |
#[async_trait] | |
impl ProjectSnapshotRepository for ProjectLocalRepository { | |
async fn create_snapshot( | |
&self, | |
snapshot: ProjectSnapshot, | |
) -> Result<(), Box<dyn std::error::Error>> { | |
let path = format!("snapshots/{}", snapshot.metadata.id); | |
self.client.upload(path, &snapshot).await?; | |
// Update latest snapshot | |
let latest_path = format!("latest_snapshots/{}", snapshot.metadata.project_id); | |
self.client.upload(latest_path, &snapshot).await?; | |
Ok(()) | |
} | |
async fn get_latest_snapshot( | |
&self, | |
project_id: &str, | |
) -> Result<Option<ProjectSnapshot>, Box<dyn std::error::Error>> { | |
let path = format!("latest_snapshots/{}", project_id); | |
match self.client.download::<ProjectSnapshot>(path).await { | |
Ok(snapshot) => Ok(Some(snapshot)), | |
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), | |
Err(e) => Err(Box::new(e)), | |
} | |
} | |
async fn get_latest_snapshot_state( | |
&self, | |
project_id: &str, | |
) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> { | |
let path = format!("latest_snapshots/{}", project_id); | |
match self.client.download::<ProjectSnapshot>(path).await { | |
Ok(snapshot) => Ok(Some(serde_json::to_vec(&snapshot)?)), | |
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), | |
Err(e) => Err(Box::new(e)), | |
} | |
} |
409536a
to
3033b4c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Outside diff range and nitpick comments (2)
websocket/crates/infra/src/persistence/project_repository.rs (2)
109-114
: Add logging for error casesIn methods like
get_project
andget_latest_snapshot
, when an error occurs (other thanNotFound
), the error is returned but not logged. Adding logging statements in the error handling branches can aid in debugging and provide valuable information during runtime.Also applies to: 138-143
229-251
: Enhance test utility by externalizingcreate_test_snapshot
functionThe
create_test_snapshot
function is currently defined within the test module. If this function is or will be used across multiple test files, consider moving it to a common test utilities module to promote code reuse and improve maintainability.
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files ignored due to path filters (1)
websocket/Cargo.lock
is excluded by!**/*.lock
,!**/*.lock
Files selected for processing (6)
- websocket/Cargo.toml (2 hunks)
- websocket/crates/infra/Cargo.toml (1 hunks)
- websocket/crates/infra/src/persistence/local_storage/local_client.rs (1 hunks)
- websocket/crates/infra/src/persistence/local_storage/mod.rs (1 hunks)
- websocket/crates/infra/src/persistence/mod.rs (1 hunks)
- websocket/crates/infra/src/persistence/project_repository.rs (2 hunks)
Files skipped from review due to trivial changes (4)
- websocket/Cargo.toml
- websocket/crates/infra/Cargo.toml
- websocket/crates/infra/src/persistence/local_storage/local_client.rs
- websocket/crates/infra/src/persistence/mod.rs
Files skipped from review as they are similar to previous changes (1)
- websocket/crates/infra/src/persistence/local_storage/mod.rs
pub async fn new(base_path: PathBuf) -> io::Result<Self> { | ||
Ok(Self { | ||
client: Arc::new(LocalClient::new(base_path).await?), | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider making ProjectLocalRepository::new
synchronous
The new
method is currently asynchronous due to the call to LocalClient::new(base_path).await?
. If possible, consider making new
synchronous to align with Rust conventions, as constructors are typically synchronous. This can simplify the API and usage of ProjectLocalRepository
.
let path = format!("snapshots/{}", snapshot.metadata.id); | ||
self.client.upload(path, &snapshot).await?; | ||
|
||
// Update latest snapshot | ||
let latest_path = format!("latest_snapshots/{}", snapshot.metadata.project_id); | ||
self.client.upload(latest_path, &snapshot).await?; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure atomicity when updating snapshots
In the create_snapshot
method, if the first upload succeeds but the second one fails, the system might end up in an inconsistent state where the snapshot is stored but the latest snapshot reference isn't updated. Consider implementing a transactional mechanism or error handling strategy to ensure both operations succeed together or handle failures appropriately.
3033b4c
to
28cfb13
Compare
Overview
What I've done
What I haven't done
How I tested
Screenshot
Which point I want you to review particularly
Memo
Summary by CodeRabbit
New Features
LocalClient
for asynchronous file storage operations, allowing users to upload and download files safely.ProjectLocalRepository
for managing project data and snapshots locally.Chores
tempfile
andlru
, to enhance file management and caching capabilities.Tests