Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a channel buffer to decouple abci and grpc streaming #1530

Merged
merged 13 commits into from
May 28, 2024

Conversation

jonfung-dydx
Copy link
Contributor

@jonfung-dydx jonfung-dydx commented May 16, 2024

Adds a buffered go channel between the abci execution code and the grpc stream emission in order to not block abci execution code on grpc stream emission

Summary by CodeRabbit

  • New Features

    • Added support for configuring GRPC streaming buffer size.
    • Introduced a new method to stop GRPC streaming manager.
    • Enhanced logging to include buffer size information when GRPC streaming is enabled.
  • Bug Fixes

    • Fixed an issue where the GRPC streaming manager was not properly stopped when a new instance was created.
  • Improvements

    • Improved orderbook update handling by grouping updates and enqueuing them efficiently.
    • Enhanced test coverage for new GRPC streaming buffer size configuration.
  • Metrics

    • Added new metrics for GRPC streaming buffer size and number of connections.

Copy link
Contributor

coderabbitai bot commented May 16, 2024

Walkthrough

The recent updates enhance the GRPC streaming capabilities by introducing a buffer for orderbook updates, ensuring smoother handling of updates. Key changes include adding a Stop method to the GrpcStreamingManager, logging buffer size, and adjusting the Flags struct to manage the GRPC streaming buffer size. These modifications improve the robustness and configurability of the GRPC streaming system.

Changes

File(s) Change Summary
protocol/app/app.go Added a conditional check and method call to stop the GrpcStreamingManager if it is not nil. Added a log message with buffer size information when GRPC streaming is enabled.
protocol/app/flags/flags.go Added GrpcStreamingBufferSize field to Flags struct, updated related constants and default values, and modified functions to handle this new field.
protocol/app/flags/flags_test.go Updated tests to include the new GrpcStreamingBufferSize flag and related assertions.
protocol/lib/metrics/metric_keys.go Added constants GrpcStreamingBufferSize and GrpcStreamingNumConnections.
protocol/streaming/grpc/noop_streaming_manager.go Modified SendOrderbookFillUpdates method by removing the ctx sdk.Context parameter. Added a new Stop method.
protocol/streaming/grpc/types/manager.go Added Stop() method to GrpcStreamingManager interface and removed ctx sdk.Context parameter from SendOrderbookFillUpdates method.
protocol/x/clob/keeper/keeper.go Removed ctx parameter from SendOrderbookFillUpdates function call in Keeper struct.
protocol/streaming/grpc/grpc_streaming_manager.go Added logger field, updateBuffer channel, bufferInternalResponse struct, sendUpdateResponse method, and Stop method. Modified NewGrpcStreamingManager method to initialize updateBuffer and start worker goroutine. Updated SendOrderbookFillUpdates and SendOrderbookUpdates methods to enqueue updates to updateBuffer.

Sequence Diagram(s) (Beta)

sequenceDiagram
    participant App as Application
    participant Flags as Flags
    participant GrpcMgr as GrpcStreamingManager
    participant Logger as Logger

    App->>Flags: Initialize with GrpcStreamingBufferSize
    Flags->>App: Provide GrpcStreamingBufferSize
    App->>GrpcMgr: NewGrpcStreamingManager(logger, bufferSize)
    GrpcMgr->>Logger: Log buffer size
    App->>GrpcMgr: Check if GrpcStreamingManager is not nil
    alt GrpcStreamingManager not nil
        App->>GrpcMgr: Stop()
    end
    GrpcMgr->>App: Return new instance
Loading

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to full the review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Out of diff range and nitpick comments (1)
protocol/app/app.go (1)

Line range hint 743-743: Insecure gRPC server initialization. Include credentials derived from an SSL certificate to create a secure gRPC connection.

- grpc.NewServer(),
+ grpc.NewServer(grpc.Creds(credentials.NewServerTLSFromFile("cert.pem", "cert.key"))),
Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 430c208 and 68742ad.
Files selected for processing (2)
  • protocol/app/app.go (1 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (5 hunks)
Files not reviewed due to errors (1)
  • protocol/streaming/grpc/grpc_streaming_manager.go (no review received)
Additional Context Used
Path-based Instructions (2)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/app/app.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (1)
protocol/app/app.go (1)

461-463: Properly stopping the existing GrpcStreamingManager before instantiating a new one is a good practice to avoid resource leaks.

@jonfung-dydx jonfung-dydx force-pushed the jonfung/goroutineGRPC branch from 68742ad to a1372ae Compare May 16, 2024 18:38
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Out of diff range and nitpick comments (1)
protocol/app/app.go (1)

Line range hint 743-743: Insecure gRPC server initialization. Include credentials derived from an SSL certificate to create a secure gRPC connection.

- grpc.NewServer(),
+ grpc.NewServer(grpc.Creds(credentials.NewServerTLSFromFile("cert.pem", "cert.key"))),
Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 68742ad and a1372ae.
Files selected for processing (4)
  • protocol/app/app.go (2 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
  • protocol/streaming/grpc/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/grpc/types/manager.go (1 hunks)
Additional Context Used
Path-based Instructions (4)
protocol/streaming/grpc/types/manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/streaming/grpc/noop_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/app/app.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (10)
protocol/streaming/grpc/types/manager.go (1)

25-25: LGTM! The Stop method is correctly added to the GrpcStreamingManager interface.

protocol/streaming/grpc/noop_streaming_manager.go (1)

49-50: LGTM! The Stop method is correctly added to the NoopGrpcStreamingManager struct.

protocol/streaming/grpc/grpc_streaming_manager.go (7)

78-80: LGTM! The Stop method is correctly implemented to close the updateBuffer channel.


82-104: LGTM! The sendUpdateResponse method is correctly implemented to send updates to subscribers and remove failed subscriptions.


Line range hint 135-180: LGTM! The SendOrderbookFillUpdates method is correctly implemented to group fills by their clob pair IDs and enqueue messages to be sent to subscribers.


182-231: LGTM! The SendOrderbookUpdates method is correctly implemented to group updates by their clob pair IDs and enqueue messages to be sent to subscribers.


233-243: LGTM! The mustEnqueueOrderbookUpdate method is correctly implemented to try to enqueue an orderbook update to the buffer via non-blocking send and clear all subscriptions if the buffer is full.


Line range hint 245-259: LGTM! The GetUninitializedClobPairIds method is correctly implemented to return the clob pair IDs that have not been initialized.


Line range hint 261-278: LGTM! The GetOffchainUpdatesV1 method is correctly implemented to unmarshal messages in offchain updates to OffchainUpdateV1.

protocol/app/app.go (1)

461-463: Properly stopping the existing GrpcStreamingManager before instantiating a new one is a good practice to avoid resource leaks.

Comment on lines 54 to 79
func NewGrpcStreamingManager(logger log.Logger) *GrpcStreamingManagerImpl {
grpcStreamingManager := &GrpcStreamingManagerImpl{
logger: logger.With("module", "grpc-streaming"),
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

// TODO prime the buffer size
updateBuffer: make(chan bufferInternalResponse, 1000),
}

// Worker goroutine to consistently read from channel and send out updates
go func() {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.sendUpdateResponse(internalResponse)
}
}()

return grpcStreamingManager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the buffer size configurable.

The buffer size is hardcoded to 1000. Consider making it configurable to allow flexibility based on different use cases.

- updateBuffer: make(chan bufferInternalResponse, 1000),
+ updateBuffer: make(chan bufferInternalResponse, bufferSize),

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func NewGrpcStreamingManager(logger log.Logger) *GrpcStreamingManagerImpl {
grpcStreamingManager := &GrpcStreamingManagerImpl{
logger: logger.With("module", "grpc-streaming"),
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,
// TODO prime the buffer size
updateBuffer: make(chan bufferInternalResponse, 1000),
}
// Worker goroutine to consistently read from channel and send out updates
go func() {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.sendUpdateResponse(internalResponse)
}
}()
return grpcStreamingManager
func NewGrpcStreamingManager(logger log.Logger, bufferSize int) *GrpcStreamingManagerImpl {
grpcStreamingManager := &GrpcStreamingManagerImpl{
logger: logger.With("module", "grpc-streaming"),
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,
// TODO prime the buffer size
updateBuffer: make(chan bufferInternalResponse, bufferSize),
}
// Worker goroutine to consistently read from channel and send out updates
go func() {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.sendUpdateResponse(internalResponse)
}
}()
return grpcStreamingManager

@jonfung-dydx jonfung-dydx requested a review from jayy04 May 16, 2024 19:22
@jonfung-dydx jonfung-dydx force-pushed the jonfung/goroutineGRPC branch from a1372ae to 93ba3a3 Compare May 16, 2024 21:28
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between a1372ae and 93ba3a3.
Files selected for processing (7)
  • protocol/app/app.go (3 hunks)
  • protocol/app/flags/flags.go (6 hunks)
  • protocol/app/flags/flags_test.go (6 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
  • protocol/streaming/grpc/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/grpc/types/manager.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (1 hunks)
Files skipped from review due to trivial changes (1)
  • protocol/x/clob/keeper/keeper.go
Files skipped from review as they are similar to previous changes (4)
  • protocol/app/app.go
  • protocol/streaming/grpc/grpc_streaming_manager.go
  • protocol/streaming/grpc/noop_streaming_manager.go
  • protocol/streaming/grpc/types/manager.go
Additional Context Used
Path-based Instructions (2)
protocol/app/flags/flags_test.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/app/flags/flags.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (9)
protocol/app/flags/flags_test.go (3)

35-37: LGTM! The new flags for gRPC streaming are correctly added and tested.


69-72: LGTM! The new test cases for gRPC streaming validation are correctly added and cover both success and failure scenarios.


Line range hint 114-123: LGTM! The new flags for gRPC streaming are correctly retrieved from options and tested.

Also applies to: 133-141

protocol/app/flags/flags.go (6)

24-26: LGTM! The new fields for gRPC streaming are correctly added to the Flags struct.


41-42: LGTM! The new constants for gRPC streaming flags are correctly added.


56-58: LGTM! The new default values for gRPC streaming flags are correctly added.


92-96: LGTM! The new flags for gRPC streaming are correctly added to the AddFlagsToCmd function.


136-138: LGTM! The new flags for gRPC streaming are correctly added to the GetFlagValuesFromOptions function.


184-188: LGTM! The new flags for gRPC streaming are correctly added to the GetFlagValuesFromOptions function.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 93ba3a3 and 8d67fbf.
Files selected for processing (2)
  • .github/workflows/protocol-build-and-push-snapshot.yml (1 hunks)
  • .github/workflows/protocol-build-and-push.yml (1 hunks)
Files skipped from review due to trivial changes (2)
  • .github/workflows/protocol-build-and-push-snapshot.yml
  • .github/workflows/protocol-build-and-push.yml

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 8d67fbf and 3813218.
Files selected for processing (3)
  • protocol/app/flags/flags.go (6 hunks)
  • protocol/lib/metrics/metric_keys.go (1 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
Files skipped from review as they are similar to previous changes (1)
  • protocol/app/flags/flags.go
Additional Context Used
Path-based Instructions (2)
protocol/lib/metrics/metric_keys.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (8)
protocol/lib/metrics/metric_keys.go (1)

71-71: The addition of GrpcStreamingBufferSize metric key aligns well with the new functionality introduced in the PR.

protocol/streaming/grpc/grpc_streaming_manager.go (7)

63-63: The buffer size is now configurable, which is a flexible improvement. Ensure that bufferWindow is correctly set and passed from the configuration.


66-71: The implementation of the worker goroutine for processing updates from the buffer is correctly done and aligns with asynchronous processing requirements.


80-82: The implementation of the Stop method, which closes the updateBuffer channel, is correct and ensures a clean shutdown process.


84-105: The error handling in the sendUpdateResponse method, which removes failing subscriptions, is robust and ensures that only active and functioning subscriptions remain.


Line range hint 137-182: The changes in the SendOrderbookFillUpdates method, which groups fills by clob pair ids and enqueues them for streaming, align well with the intended enhancements to GRPC streaming functionality.


184-233: The changes in the SendOrderbookUpdates method, which groups updates by clob pair ids and enqueues them for streaming, align well with the intended enhancements to GRPC streaming functionality.


235-246: The implementation of the mustEnqueueOrderbookUpdate method, which handles full buffer scenarios by clearing all subscriptions, is crucial for maintaining system stability under high load conditions.

@jonfung-dydx jonfung-dydx force-pushed the jonfung/goroutineGRPC branch from 3813218 to 0ada61e Compare May 17, 2024 21:28
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 3813218 and 0ada61e.
Files selected for processing (6)
  • protocol/app/app.go (5 hunks)
  • protocol/app/flags/flags.go (6 hunks)
  • protocol/lib/metrics/metric_keys.go (1 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
  • protocol/streaming/grpc/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/grpc/types/manager.go (1 hunks)
Files skipped from review as they are similar to previous changes (3)
  • protocol/app/flags/flags.go
  • protocol/streaming/grpc/noop_streaming_manager.go
  • protocol/streaming/grpc/types/manager.go
Additional Context Used
Path-based Instructions (3)
protocol/lib/metrics/metric_keys.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/app/app.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (3)
protocol/lib/metrics/metric_keys.go (1)

71-72: The new metric keys for GRPC streaming buffer size and number of connections are correctly named and follow the established guidelines.

protocol/streaming/grpc/grpc_streaming_manager.go (1)

64-64: The buffer size is now configurable, addressing previous concerns about flexibility. Good improvement!

protocol/app/app.go (1)

462-464: Ensure graceful shutdown of GrpcStreamingManager.

This change ensures that the GrpcStreamingManager is stopped during the application's shutdown process, which is crucial for releasing resources and preventing potential memory leaks or other issues.

Comment on lines 242 to 269
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
return
default:
sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
for k := range sm.orderbookSubscriptions {
delete(sm.orderbookSubscriptions, k)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider implementing a more graceful handling mechanism when the GRPC streaming buffer is full, rather than shutting down all subscriptions. This could potentially disrupt all clients unexpectedly.

- sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
- for k := range sm.orderbookSubscriptions {
-     delete(sm.orderbookSubscriptions, k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
return
default:
sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
for k := range sm.orderbookSubscriptions {
delete(sm.orderbookSubscriptions, k)
}
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
return
default:
sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Out of diff range and nitpick comments (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Line range hint 167-212: Consider optimizing the grouping of fills by clobPairId to reduce complexity and improve performance.

- for _, orderbookFill := range orderbookFills {
-   clobPairId := orderbookFill.Orders[0].OrderId.ClobPairId
-   if _, ok := updatesByClobPairId[clobPairId]; !ok {
-     updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
-   }
-   streamUpdate := clobtypes.StreamUpdate{
-     UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
-       OrderFill: &orderbookFill,
-     },
-   }
-   updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
- }
+ updatesByClobPairId := groupFillsByClobPairId(orderbookFills)
Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 0ada61e and 4319852.
Files selected for processing (1)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
Additional Context Used
Path-based Instructions (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (4)
protocol/streaming/grpc/grpc_streaming_manager.go (4)

56-75: The implementation of NewGrpcStreamingManager correctly initializes the updateBuffer and starts a worker goroutine.


82-84: Properly implemented Stop method to close the updateBuffer.


86-89: Metrics emission for buffer size and number of connections is a good practice for monitoring.


162-162: Logging the establishment of a new GRPC stream connection is useful for debugging and monitoring.

Comment on lines 101 to 131
func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
sample_rate := 0.01
log := false
if rand.Float64() < sample_rate {
log = true
}

// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)

for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if clobPairId == internalResponse.clobPairId {
if log {
sm.logger.Info("sending out update")
}
if err := subscription.srv.Send(
&internalResponse.response,
); err != nil {
sm.logger.Error("Error sending out update", "err", err)
subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
}
if log {
sm.logger.Info("finished sending out update")
}
}
}
}
// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range subscriptionIdsToRemove {
sm.removeSubscription(id)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review the error handling in sendUpdateResponse. Consider adding more robust error recovery or retry mechanisms.

- subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
+ if retrySuccessful {
+   sm.logger.Info("Retry successful")
+ } else {
+   subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
+ }

Comment on lines 265 to 269
// mustEnqueueOrderbookUpdate tries to enqueue an orderbook update to the buffer via non-blocking send.
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
return
default:
sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
for k := range sm.orderbookSubscriptions {
sm.removeSubscription(k)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handling of a full buffer by shutting down all subscriptions might be too drastic. Consider a more graceful degradation strategy.

- sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
- for k := range sm.orderbookSubscriptions {
-     delete(sm.orderbookSubscriptions, k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Comment on lines 23 to 33
logger log.Logger
sync.Mutex

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32

// Readonly buffer to enqueue orderbook updates before pushing them through grpc streams.
// Decouples the execution of abci logic with full node streaming.
updateBuffer chan bufferInternalResponse
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Initialization of updateBuffer is present, but cleanup is missing.

  • Initialization found at protocol/streaming/grpc/grpc_streaming_manager.go:65.
  • No cleanup (i.e., close(updateBuffer)) found in the codebase.

Please review the cleanup process for updateBuffer to ensure proper resource management.

Analysis chain

Ensure proper initialization and cleanup of updateBuffer.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that `updateBuffer` is properly initialized and cleaned up across all relevant methods.

# Test: Search for initialization and cleanup of `updateBuffer`.
ast-grep --lang go --pattern $'func $_($_) {
  $$$
  make(chan bufferInternalResponse, $_)
  $$$
  close($_)
  $$$
}'

Length of output: 118



Script:

#!/bin/bash
# Description: Verify that `updateBuffer` is properly initialized.

# Test: Search for initialization of `updateBuffer`.
ast-grep --lang go --pattern $'make(chan bufferInternalResponse, $_)'

# Description: Verify that `updateBuffer` is properly cleaned up.

# Test: Search for cleanup of `updateBuffer`.
ast-grep --lang go --pattern $'close(updateBuffer)'

Length of output: 243

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 4319852 and cca844b.
Files selected for processing (1)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
Additional Context Used
Path-based Instructions (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (5)
protocol/streaming/grpc/grpc_streaming_manager.go (5)

65-73: Initialization of updateBuffer with a worker goroutine is correctly implemented.


82-84: Properly closing the updateBuffer channel in the Stop method.


163-163: Logging the establishment of a new GRPC stream connection is a good practice for monitoring.


101-137: Refactor sendUpdateResponse to handle errors more gracefully.

- panic(err)
+ sm.logger.Error("Failed to unmarshal offchain updates", "error", err)
+ return

23-33: Ensure proper cleanup of updateBuffer to avoid resource leaks.

Comment on lines +215 to +258
// SendOrderbookUpdates groups updates by their clob pair ids and
// enqueues messages to be sent to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)
sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}
// Group updates by clob pair id.
updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = clobtypes.NewOffchainUpdates()
}
updatesByClobPairId[clobPairId].Messages = append(updatesByClobPairId[clobPairId].Messages, message)
}

if len(streamUpdatesForSubscription) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
// Unmarshal messages to v1 updates and enqueue in buffer to be sent.
for clobPairId, update := range updatesByClobPairId {
v1updates, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
},
}
sm.mustEnqueueOrderbookUpdate(bufferInternalResponse{
response: clobtypes.StreamOrderbookUpdatesResponse{
Updates: []clobtypes.StreamUpdate{streamUpdate},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
clobPairId: clobPairId,
})
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling of a full buffer by shutting down all subscriptions might be too drastic. Consider a more graceful degradation strategy.

- sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
- for k := range sm.orderbookSubscriptions {
-     delete(sm.orderbookSubscriptions, k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Comment on lines 266 to 273
// mustEnqueueOrderbookUpdate tries to enqueue an orderbook update to the buffer via non-blocking send.
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
sm.logger.Info("successfully enqueue buffer")
default:
sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
for k := range sm.orderbookSubscriptions {
sm.removeSubscription(k)
}
}
sm.EmitMetrics()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method mustEnqueueOrderbookUpdate should handle full buffer situations more gracefully.

- sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
- for k := range sm.orderbookSubscriptions {
-     delete(sm.orderbookSubscriptions, k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

@jonfung-dydx jonfung-dydx force-pushed the jonfung/goroutineGRPC branch from cca844b to 1066d2b Compare May 24, 2024 20:22
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between cca844b and 1066d2b.
Files selected for processing (10)
  • .github/workflows/protocol-build-and-push-snapshot.yml (1 hunks)
  • .github/workflows/protocol-build-and-push.yml (1 hunks)
  • protocol/app/app.go (5 hunks)
  • protocol/app/flags/flags.go (6 hunks)
  • protocol/app/flags/flags_test.go (6 hunks)
  • protocol/lib/metrics/metric_keys.go (1 hunks)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
  • protocol/streaming/grpc/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/grpc/types/manager.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (1 hunks)
Files skipped from review as they are similar to previous changes (8)
  • .github/workflows/protocol-build-and-push-snapshot.yml
  • .github/workflows/protocol-build-and-push.yml
  • protocol/app/flags/flags.go
  • protocol/app/flags/flags_test.go
  • protocol/lib/metrics/metric_keys.go
  • protocol/streaming/grpc/noop_streaming_manager.go
  • protocol/streaming/grpc/types/manager.go
  • protocol/x/clob/keeper/keeper.go
Additional Context Used
Path-based Instructions (2)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

protocol/app/app.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (6)
protocol/streaming/grpc/grpc_streaming_manager.go (5)

22-33: The addition of updateBuffer and updateBufferWindowSize fields to manage the buffer size dynamically is a good approach to handle streaming data efficiently.


56-79: The constructor NewGrpcStreamingManager now initializes the updateBuffer with a size specified by bufferWindow. This change aligns with the PR's objective to decouple ABCI execution from gRPC streaming.


86-88: The implementation of the Stop method to close the updateBuffer channel is crucial for proper resource management and to prevent goroutine leaks.


209-258: The SendOrderbookUpdates method has been updated to handle offchain updates more efficiently by grouping them before sending. This should help in managing large volumes of data more effectively.


Line range hint 162-207: The method SendOrderbookFillUpdates now groups fills by their clob pair ids and enqueues them for streaming. This is a significant change that should improve the efficiency of data handling. Ensure that the locking mechanism is correctly implemented to avoid data races.

Verification successful

The method SendOrderbookFillUpdates in the GrpcStreamingManagerImpl class correctly implements the locking mechanism by using sm.Lock() and sm.Unlock() to ensure thread safety. This mechanism is crucial to avoid data races when accessing shared resources.

  • The method definition starts with acquiring the lock using sm.Lock().
  • The lock is released at the end of the method using defer sm.Unlock().

This ensures that the critical section of the code is protected, and no data races will occur.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify proper locking mechanism in `SendOrderbookFillUpdates`.

# Test: Search for locking and unlocking patterns in the method.
ast-grep --lang go --pattern $'func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates($_, $_, $_) {
  $$$
  sm.Lock()
  $$$
  sm.Unlock()
  $$$
}'

Length of output: 153



Script:

#!/bin/bash
# Description: Verify proper locking mechanism in `SendOrderbookFillUpdates`.

# Test: Search for locking and unlocking patterns in the method.
ast-grep --lang go --pattern $'func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates($_, $_, $_, $_) {
  $$$
  sm.Lock()
  $$$
  sm.Unlock()
  $$$
}'

Length of output: 157



Script:

#!/bin/bash
# Description: Verify proper locking mechanism in `SendOrderbookFillUpdates`.

# Step 1: Search for the method definition.
rg 'func (sm \*GrpcStreamingManagerImpl) SendOrderbookFillUpdates' -A 20 protocol/streaming/grpc/grpc_streaming_manager.go

# Step 2: Inspect the locking mechanism within the method.

Length of output: 124



Script:

#!/bin/bash
# Description: Verify proper locking mechanism in `SendOrderbookFillUpdates`.

# Step 1: Search for the method definition across the entire repository.
rg 'func (sm \*GrpcStreamingManagerImpl) SendOrderbookFillUpdates' -A 50

# Step 2: Inspect the locking mechanism within the method.

Length of output: 74



Script:

#!/bin/bash
# Description: Verify proper locking mechanism in `SendOrderbookFillUpdates`.

# Step 1: Locate the file containing the method definition.
fd 'grpc_streaming_manager.go'

# Step 2: Search for the method definition within the located file.
rg 'func (sm \*GrpcStreamingManagerImpl) SendOrderbookFillUpdates' -A 50 $(fd 'grpc_streaming_manager.go')

Length of output: 237



Script:

#!/bin/bash
# Description: Verify proper locking mechanism in `SendOrderbookFillUpdates`.

# Step 1: Search for all method definitions within the file.
ast-grep --lang go --pattern $'func $_($_, $_, $_, $_) {
  $$$
}' protocol/streaming/grpc/grpc_streaming_manager.go

Length of output: 116



Script:

#!/bin/bash
# Description: Verify proper locking mechanism in `SendOrderbookFillUpdates`.

# Step 1: Search for all method definitions within the file.
rg 'func ' protocol/streaming/grpc/grpc_streaming_manager.go -A 50

Length of output: 8090

protocol/app/app.go (1)

462-464: Ensure graceful shutdown of GrpcStreamingManager.

This change correctly checks if GrpcStreamingManager is not nil before calling its Stop method, which is a safe way to handle potential nil pointers.

Comment on lines +1939 to +1941
grpcStreamingBufferSize := uint32(appFlags.GrpcStreamingBufferSize)
logger.Info(fmt.Sprintf("GRPC streaming is enabled with buffer size %d", grpcStreamingBufferSize))
return streaming.NewGrpcStreamingManager(logger, grpcStreamingBufferSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Secure the gRPC server to prevent unauthorized access.

The gRPC server is initialized without any credentials, which could expose it to unauthorized access. Consider using SSL/TLS to secure the connection.

- return streaming.NewGrpcStreamingManager(logger, grpcStreamingBufferSize)
+ creds, err := credentials.NewServerTLSFromFile("cert.pem", "cert.key")
+ if err != nil {
+     panic(fmt.Sprintf("Failed to generate gRPC credentials: %v", err))
+ }
+ return streaming.NewGrpcStreamingManager(logger, grpcStreamingBufferSize, grpc.Creds(creds))

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
grpcStreamingBufferSize := uint32(appFlags.GrpcStreamingBufferSize)
logger.Info(fmt.Sprintf("GRPC streaming is enabled with buffer size %d", grpcStreamingBufferSize))
return streaming.NewGrpcStreamingManager(logger, grpcStreamingBufferSize)
grpcStreamingBufferSize := uint32(appFlags.GrpcStreamingBufferSize)
logger.Info(fmt.Sprintf("GRPC streaming is enabled with buffer size %d", grpcStreamingBufferSize))
creds, err := credentials.NewServerTLSFromFile("cert.pem", "cert.key")
if err != nil {
panic(fmt.Sprintf("Failed to generate gRPC credentials: %v", err))
}
return streaming.NewGrpcStreamingManager(logger, grpcStreamingBufferSize, grpc.Creds(creds))

Comment on lines 260 to 273
// mustEnqueueOrderbookUpdate tries to enqueue an orderbook update to the buffer via non-blocking send.
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
default:
sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
for k := range sm.orderbookSubscriptions {
sm.removeSubscription(k)
}
// Clear out the buffer
sm.updateBuffer = make(chan bufferInternalResponse, sm.updateBufferWindowSize)
}
sm.EmitMetrics()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mustEnqueueOrderbookUpdate method now handles the situation where the buffer is full by clearing all subscriptions. This approach might be too drastic and could lead to data loss or client disruptions. Consider implementing a more graceful degradation strategy.

- sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
- for k := range sm.orderbookSubscriptions {
-     delete(sm.orderbookSubscriptions, k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Committable suggestion was skipped due low confidence.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 1066d2b and d69c744.
Files selected for processing (1)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
Additional Context Used
Path-based Instructions (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (2)
protocol/streaming/grpc/grpc_streaming_manager.go (2)

56-81: Initialization of GrpcStreamingManagerImpl looks good. The use of a goroutine to handle streaming updates is appropriate.


Line range hint 164-209: The methods for sending orderbook updates are well-implemented. Ensure that the handling of full buffer situations is aligned with the suggested changes in mustEnqueueOrderbookUpdate.

Also applies to: 211-260

Comment on lines 262 to 276
// mustEnqueueOrderbookUpdate tries to enqueue an orderbook update to the buffer via non-blocking send.
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
default:
sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
for k := range sm.orderbookSubscriptions {
sm.removeSubscription(k)
}
// Clear out the buffer
close(sm.updateBuffer)
sm.updateBuffer = make(chan bufferInternalResponse, sm.updateBufferWindowSize)
}
sm.EmitMetrics()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling of a full buffer by shutting down all subscriptions might be too drastic. Consider a more graceful degradation strategy.

- sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
- for k := range sm.orderbookSubscriptions {
-     sm.removeSubscription(k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Committable suggestion was skipped due low confidence.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Outside diff range and nitpick comments (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

22-33: Ensure proper documentation for all fields in GrpcStreamingManagerImpl.

Consider adding more detailed comments for each field in the GrpcStreamingManagerImpl struct, especially for updateBuffer and updateBufferWindowSize, to explain their roles and how they interact with other components.

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between d69c744 and 040ad24.
Files selected for processing (1)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
Additional Context Used
Path-based Instructions (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (2)
protocol/streaming/grpc/grpc_streaming_manager.go (2)

65-66: Initialization of updateBuffer now uses a configurable size.

This change aligns with the previous suggestion to make the buffer size configurable, enhancing flexibility based on different use cases.


85-87: Add a safety check before closing updateBuffer.

+ if sm.updateBuffer != nil {
+     close(sm.updateBuffer)
+     sm.updateBuffer = nil
+ }

This change prevents potential runtime panics from closing an already closed channel, ensuring more robust error handling.

Likely invalid or redundant comment.

Comment on lines 260 to 272
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
default:
sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
for k := range sm.orderbookSubscriptions {
sm.removeSubscription(k)
}
// Clear out the buffer
for len(sm.updateBuffer) > 0 {
<-sm.updateBuffer
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling of a full buffer by shutting down all subscriptions might be too drastic.

- sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
- for k := range sm.orderbookSubscriptions {
-     sm.removeSubscription(k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Consider implementing a more graceful degradation strategy when the GRPC streaming buffer is full, rather than shutting down all subscriptions. This could potentially disrupt all clients unexpectedly.

Committable suggestion was skipped due low confidence.

Comment on lines 104 to 130
func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)

for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if clobPairId == internalResponse.clobPairId {
sm.logger.Info("sending out update")
if err := subscription.srv.Send(
&internalResponse.response,
); err != nil {
sm.logger.Error("Error sending out update", "err", err)
subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
}
sm.logger.Info("finished sending out update")
}
}
}
// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range subscriptionIdsToRemove {
sm.removeSubscription(id)
}
sm.EmitMetrics()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance error handling in sendUpdateResponse.

+ if err := subscription.srv.Send(&internalResponse.response); err != nil {
+     sm.logger.Error("Error sending out update, attempting retry", "err", err)
+     // Implement retry logic here
+     if retrySuccessful {
+         sm.logger.Info("Retry successful")
+     } else {
+         subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
+     }
+ } else {
+     sm.logger.Info("Update sent successfully")
+ }

Adding retry logic or more detailed error recovery mechanisms can help manage transient errors more effectively, improving the reliability of the streaming service.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if clobPairId == internalResponse.clobPairId {
sm.logger.Info("sending out update")
if err := subscription.srv.Send(
&internalResponse.response,
); err != nil {
sm.logger.Error("Error sending out update", "err", err)
subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
}
sm.logger.Info("finished sending out update")
}
}
}
// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range subscriptionIdsToRemove {
sm.removeSubscription(id)
}
sm.EmitMetrics()
}
func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if clobPairId == internalResponse.clobPairId {
sm.logger.Info("sending out update")
if err := subscription.srv.Send(
&internalResponse.response,
); err != nil {
sm.logger.Error("Error sending out update, attempting retry", "err", err)
// Implement retry logic here
if retrySuccessful {
sm.logger.Info("Retry successful")
} else {
subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
}
} else {
sm.logger.Info("Update sent successfully")
}
sm.logger.Info("finished sending out update")
}
}
}
// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range subscriptionIdsToRemove {
sm.removeSubscription(id)
}
sm.EmitMetrics()
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 040ad24 and d55c4ae.
Files selected for processing (1)
  • protocol/streaming/grpc/grpc_streaming_manager.go (6 hunks)
Additional Context Used
Path-based Instructions (1)
protocol/streaming/grpc/grpc_streaming_manager.go (1)

Pattern **/**: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.

Additional comments not posted (3)
protocol/streaming/grpc/grpc_streaming_manager.go (3)

22-33: The struct GrpcStreamingManagerImpl has been updated to include a buffered channel updateBuffer and a window size updateBufferWindowSize. This is a good use of Go's channel capabilities to manage asynchronous data flow.


56-76: The constructor NewGrpcStreamingManager properly initializes the updateBuffer with the size provided by bufferWindow. This dynamic allocation based on input allows for flexible buffer sizing, which is a good practice.


204-253: The methods SendOrderbookFillUpdates and SendOrderbookUpdates effectively group updates by clob pair IDs and enqueue them for streaming. This structured approach helps in managing updates efficiently.
[APROVED]

Comment on lines +83 to +85
func (sm *GrpcStreamingManagerImpl) Stop() {
close(sm.updateBuffer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that updateBuffer is not closed multiple times to avoid runtime panics. Consider adding a check to ensure it's only closed if it's not already closed.

+ if sm.updateBuffer != nil {
+     close(sm.updateBuffer)
+     sm.updateBuffer = nil
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func (sm *GrpcStreamingManagerImpl) Stop() {
close(sm.updateBuffer)
}
func (sm *GrpcStreamingManagerImpl) Stop() {
if sm.updateBuffer != nil {
close(sm.updateBuffer)
sm.updateBuffer = nil
}
}

Comment on lines +256 to +271
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
default:
sm.logger.Error("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. " +
"Disconnect all clients and increase buffer size via the `grpc-streaming-buffer-size flag.")
for k := range sm.orderbookSubscriptions {
sm.removeSubscription(k)
}
// Clear out the buffer
for len(sm.updateBuffer) > 0 {
<-sm.updateBuffer
}
}
sm.EmitMetrics()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method mustEnqueueOrderbookUpdate handles the situation where the buffer is full by shutting down all subscriptions. This approach might be too drastic and could lead to data loss or client disruptions. Consider implementing a more graceful degradation strategy.

- sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
- for k := range sm.orderbookSubscriptions {
-     sm.removeSubscription(k)
- }
+ sm.logger.Warn("GRPC Streaming buffer approaching full capacity. Consider increasing buffer size or reducing incoming update rate.")

Committable suggestion was skipped due low confidence.

Comment on lines +102 to +126
func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)

for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if clobPairId == internalResponse.clobPairId {
if err := subscription.srv.Send(
&internalResponse.response,
); err != nil {
sm.logger.Error("Error sending out update", "err", err)
subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
}
}
}
}
// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range subscriptionIdsToRemove {
sm.removeSubscription(id)
}
sm.EmitMetrics()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method sendUpdateResponse handles sending updates to subscribers and removes subscriptions on errors. Consider enhancing error handling by implementing retry mechanisms or more detailed error recovery strategies to improve the robustness of the streaming service.

+ if err := subscription.srv.Send(&internalResponse.response); err != nil {
+     sm.logger.Error("Error sending out update, attempting retry", "err", err)
+     // Implement retry logic here
+     if retrySuccessful {
+         sm.logger.Info("Retry successful")
+     } else {
+         subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
+     }
+ } else {
+     sm.logger.Info("Update sent successfully")
+ }

Committable suggestion was skipped due low confidence.

// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)

for id, subscription := range sm.orderbookSubscriptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need locking here? or sync.Map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonfung-dydx jonfung-dydx merged commit bd91a73 into main May 28, 2024
18 checks passed
@jonfung-dydx jonfung-dydx deleted the jonfung/goroutineGRPC branch May 28, 2024 18:52
@jonfung-dydx
Copy link
Contributor Author

@Mergifyio backport v4.1.x

Copy link
Contributor

mergify bot commented May 28, 2024

backport v4.1.x

❌ No backport have been created

  • Backport to branch v4.1.x failed

GitHub error: Branch not found

@jonfung-dydx
Copy link
Contributor Author

https://github.com/Mergifyio backport release/protocol/v4.1.x

Copy link
Contributor

mergify bot commented May 28, 2024

backport release/protocol/v4.1.x

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request May 28, 2024
(cherry picked from commit bd91a73)

# Conflicts:
#	protocol/app/app.go
#	protocol/app/flags/flags.go
jonfung-dydx added a commit that referenced this pull request May 28, 2024
) (#1595)

Co-authored-by: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com>
Co-authored-by: Jonathan Fung <jonathan@dydx.exchange>
jonfung-dydx added a commit that referenced this pull request May 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

2 participants