Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug fix]Use global sync dag store #4347

Merged
merged 2 commits into from
Dec 13, 2024
Merged

[Bug fix]Use global sync dag store #4347

merged 2 commits into from
Dec 13, 2024

Conversation

jackzhhuang
Copy link
Collaborator

@jackzhhuang jackzhhuang commented Dec 13, 2024

Pull request type

Please check the type of change your PR introduces:

  • Bugfix
  • Feature
  • Code style update (formatting, renaming)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • Documentation content changes
  • Other (please describe):

What is the current behavior?

Issue Number: N/A

What is the new behavior?

Other information

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced concurrency support by introducing shared ownership of the synchronization DAG store across various components using Arc<SyncDagStore>.
  • Bug Fixes

    • Improved error handling in block fetching and processing methods to ensure robust synchronization.
    • Introduced a timeout mechanism for waiting on parent blocks, enhancing error handling in scenarios where waiting time is exceeded.
  • Documentation

    • Updated method signatures and internal documentation to reflect changes in ownership and concurrency management for the synchronization DAG store.

Copy link

coderabbitai bot commented Dec 13, 2024

Walkthrough

The pull request introduces changes across multiple files to update the handling of the sync_dag_store field, transitioning its type from SyncDagStore to Arc<SyncDagStore>. This modification enhances thread safety and shared ownership capabilities in various components, including DagBlockSender, SyncService, BlockCollector, and others. The constructor methods and relevant method signatures have been adjusted accordingly to accommodate the new type, ensuring consistent usage throughout the synchronization process while maintaining existing logic.

Changes

File Path Change Summary
sync/src/parallel/sender.rs Updated sync_dag_store field type to Arc<SyncDagStore> in DagBlockSender and modified constructor. Cloned in process_absent_blocks method.
sync/src/sync.rs Added sync_dag_store: Arc<SyncDagStore> to SyncService. Updated new and check_and_start_sync methods to use the new field.
sync/src/tasks/block_sync_task.rs Changed sync_dag_store field type to Arc<SyncDagStore> in BlockCollector. Updated new_with_handle and execute_absent_block methods.
sync/src/tasks/continue_execute_absent_block.rs Updated sync_dag_store field type to Arc<SyncDagStore> in ContinueExecuteAbsentBlock. Adjusted constructor and method signatures.
sync/src/tasks/inner_sync_task.rs Changed sync_dag_store field type to Arc<SyncDagStore> in InnerSyncTask. Updated constructor accordingly.
sync/src/tasks/mock.rs Updated sync_dag_store field type to Arc<SyncDagStore> in SyncNodeMocker. Modified constructors to reflect this change.
sync/src/tasks/mod.rs Changed sync_dag_store parameter type to Arc<SyncDagStore> in full_sync_task function.
sync/src/tasks/tests.rs Updated sync_dag_store variable to Arc<SyncDagStore> in test_failed_block and test_sync_target functions.
sync/src/parallel/executor.rs Added constant MAX_TOTAL_WAITING_TIME for timeout in start_to_execute method of DagBlockExecutor.

Possibly related PRs

Suggested reviewers

  • sanlee42
  • yourmoonlight
  • nkysg
  • simonjiao

Poem

🐇 In the land of code where the bunnies play,
We wrapped our stores in an Arc today.
With threads now safe, our blocks will flow,
Sharing the load, watch our system grow!
Hopping along, with each little change,
Together we sync, it's never too strange! 🐰


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Experiment)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@jackzhhuang jackzhhuang self-assigned this Dec 13, 2024
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
sync/src/tasks/tests.rs (1)

920-920: Consider consistent error handling pattern

For consistency with line 57, consider using the ? operator instead of expect.

-Arc::new(SyncDagStore::create_for_testing().expect("failed to create the sync dag store")),
+Arc::new(SyncDagStore::create_for_testing()?),
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d69e48b and b734204.

📒 Files selected for processing (8)
  • sync/src/parallel/sender.rs (2 hunks)
  • sync/src/sync.rs (4 hunks)
  • sync/src/tasks/block_sync_task.rs (2 hunks)
  • sync/src/tasks/continue_execute_absent_block.rs (1 hunks)
  • sync/src/tasks/inner_sync_task.rs (2 hunks)
  • sync/src/tasks/mock.rs (6 hunks)
  • sync/src/tasks/mod.rs (1 hunks)
  • sync/src/tasks/tests.rs (2 hunks)
🔇 Additional comments (12)
sync/src/tasks/block_sync_task.rs (1)

217-217: Update sync_dag_store to Arc<SyncDagStore> in BlockCollector

Changing sync_dag_store to Arc<SyncDagStore> allows for shared ownership and thread-safe access, which is appropriate for concurrent environments.

Also applies to: 267-267

sync/src/tasks/inner_sync_task.rs (1)

41-41: Update sync_dag_store to Arc<SyncDagStore> in InnerSyncTask

Using Arc<SyncDagStore> in both the struct and constructor facilitates thread-safe shared ownership, aligning with concurrent usage patterns.

Also applies to: 61-61

sync/src/tasks/continue_execute_absent_block.rs (1)

22-22: Update sync_dag_store to Arc<SyncDagStore> in ContinueExecuteAbsentBlock

Transitioning sync_dag_store to Arc<SyncDagStore> enables shared ownership and thread-safe access, which is beneficial for concurrent execution.

Also applies to: 29-29

sync/src/parallel/sender.rs (1)

29-29: Update sync_dag_store to Arc<SyncDagStore> in DagBlockSender

Modifying sync_dag_store to use Arc<SyncDagStore> facilitates thread-safe shared ownership, enhancing concurrency management within DagBlockSender.

Also applies to: 41-41

sync/src/tasks/mock.rs (3)

138-138: LGTM! Thread-safe field declaration.

The change from SyncDagStore to Arc<SyncDagStore> enhances thread safety by enabling shared ownership of the store across multiple threads.


158-161: LGTM! Proper initialization of Arc.

The initialization properly wraps the SyncDagStore in an Arc and includes appropriate error handling.


191-191: LGTM! Consistent Arc usage across constructors.

The new_with_storage and new_with_strategy methods consistently initialize the sync_dag_store as Arc<SyncDagStore>.

Also applies to: 211-211

sync/src/sync.rs (3)

65-65: LGTM! Thread-safe service field.

The addition of sync_dag_store as Arc<SyncDagStore> to the service struct enables proper shared ownership.


87-93: LGTM! Proper initialization with configuration.

The initialization properly uses configuration parameters for the sync directory and cache size, ensuring flexibility and configurability.


238-238: LGTM! Proper cloning of Arc in check_and_start_sync.

The sync_dag_store is properly cloned before being passed to the sync task, maintaining the shared ownership semantics.

sync/src/tasks/mod.rs (1)

625-625: LGTM! Thread-safe parameter type.

The change to accept Arc<SyncDagStore> aligns with the broader effort to ensure thread-safe access to the sync DAG store across the codebase.

sync/src/tasks/tests.rs (1)

57-57: LGTM!

The change correctly wraps SyncDagStore in an Arc for thread-safe reference counting, which aligns with the broader changes to enhance thread safety.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
sync/src/parallel/executor.rs (2)

17-17: Use Duration for time constants to improve readability

Consider defining MAX_TOTAL_WAITING_TIME using Duration to enhance readability and prevent unit confusion.

Apply this diff:

-use std::sync::Arc;
+use std::sync::Arc;
+use tokio::time::Duration;

 const MAX_TOTAL_WAITING_TIME: u64 = 3600000; // an hour
+const MAX_TOTAL_WAITING_TIME: Duration = Duration::from_secs(3600); // an hour

101-102: Use Duration for time variables and calculations

For consistency and clarity, consider using Duration for total_waiting_time and waiting_per_time. This approach improves readability and leverages Duration's built-in methods.

Apply this diff:

 let handle = tokio::spawn(async move {
     let mut chain = None;
     loop {
         match self.receiver.recv().await {
             Some(op_block) => {
                 let block = match op_block {
                     Some(block) => block,
                     None => {
                         info!("sync worker channel closed");
                         drop(self.sender);
                         return;
                     }
                 };
                 let header = block.header().clone();

                 info!(
                     "sync parallel worker {:p} received block: {:?}",
                     &self,
                     block.header().id()
                 );

-                let mut total_waiting_time: u64 = 0;
-                let waiting_per_time: u64 = 100;
+                let mut total_waiting_time = Duration::ZERO;
+                let waiting_per_time = Duration::from_millis(100);
                 loop {
                     match Self::waiting_for_parents(
                         &self.dag,
                         self.storage.clone(),
                         block.header().parents_hash(),
                     ) {
                         Ok(true) => break,
                         Ok(false) => {
                             if total_waiting_time >= MAX_TOTAL_WAITING_TIME {
                                 error!(
                                     "failed to check parents: {:?}, for reason: timeout",
                                     header
                                 );
                                 match self
                                     .sender
                                     .send(ExecuteState::Error(Box::new(header.clone())))
                                     .await
                                 {
                                     Ok(_) => (),
                                     Err(e) => {
                                         error!(
                                             "failed to send error state: {:?}, for reason: {:?}",
                                             header, e
                                         );
                                         return;
                                     }
                                 }
                                 return;
                             }
                             tokio::task::yield_now().await;
                             tokio::time::sleep(waiting_per_time).await;
-                            total_waiting_time =
-                                total_waiting_time.saturating_add(waiting_per_time);
+                            total_waiting_time = total_waiting_time.checked_add(waiting_per_time).unwrap_or(MAX_TOTAL_WAITING_TIME);
                         }
                         Err(e) => {
                             error!(
                                 "failed to check parents: {:?}, for reason: {:?}",
                                 header, e
                             );
                             match self
                                 .sender
                                 .send(ExecuteState::Error(Box::new(header.clone())))
                                 .await
                             {
                                 Ok(_) => (),
                                 Err(e) => {
                                     error!(
                                         "failed to send error state: {:?}, for reason: {:?}",
                                         header, e
                                     );
                                     return;
                                 }
                             }
                             return;
                         }
                     }
                 }

Also applies to: 133-138

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b734204 and cfaa041.

📒 Files selected for processing (1)
  • sync/src/parallel/executor.rs (3 hunks)
🔇 Additional comments (1)
sync/src/parallel/executor.rs (1)

111-131: Reconsider the timeout duration for waiting on parent blocks

The MAX_TOTAL_WAITING_TIME is set to one hour. Depending on the application's requirements and typical network conditions, this duration might be too long, potentially delaying error handling and resource cleanup.

Please confirm that a one-hour timeout is appropriate for your use case. If not, consider adjusting it to a shorter duration that aligns with expected parent block availability.

@jackzhhuang jackzhhuang merged commit 33523f0 into dag-master Dec 13, 2024
3 of 5 checks passed
@jackzhhuang jackzhhuang deleted the sync-store-fix branch December 13, 2024 08:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants