diff --git a/protocol/app/app.go b/protocol/app/app.go index 6784357901a..07eebcd98ca 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1933,7 +1933,7 @@ func getGrpcStreamingManagerFromOptions( ) (manager streamingtypes.GrpcStreamingManager) { if appFlags.GrpcStreamingEnabled { logger.Info("GRPC streaming is enabled") - return streaming.NewGrpcStreamingManager() + return streaming.NewGrpcStreamingManager(logger) } return streaming.NewNoopGrpcStreamingManager() } diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index db9d3c15363..0a8efca5680 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" @@ -17,6 +18,7 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) // GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions. type GrpcStreamingManagerImpl struct { + logger log.Logger sync.Mutex // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. @@ -49,13 +51,14 @@ type OrderbookSubscription struct { srv clobtypes.Query_StreamOrderbookUpdatesServer } -func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { +func NewGrpcStreamingManager(logger log.Logger) *GrpcStreamingManagerImpl { grpcStreamingManager := &GrpcStreamingManagerImpl{ + logger: logger.With("module", "grpc-streaming"), orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), nextSubscriptionId: 0, // TODO prime the buffer size - updateBuffer: make(chan bufferInternalResponse, 100), + updateBuffer: make(chan bufferInternalResponse, 1000), } // Worker goroutine to consistently read from channel and send out updates @@ -131,7 +134,6 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( // SendOrderbookFillUpdates groups fills by their clob pair ids and // enqueues messages to be sent to the subscribers. func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( - ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32, execMode sdk.ExecMode, @@ -169,10 +171,11 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( BlockHeight: blockHeight, ExecMode: uint32(execMode), } - sm.updateBuffer <- bufferInternalResponse{ + + sm.mustEnqueueOrderbookUpdate(bufferInternalResponse{ response: streamResponse, clobPairId: clobPairId, - } + }) } } @@ -216,13 +219,27 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( }, }, } - sm.updateBuffer <- bufferInternalResponse{ + sm.mustEnqueueOrderbookUpdate(bufferInternalResponse{ response: clobtypes.StreamOrderbookUpdatesResponse{ Updates: []clobtypes.StreamUpdate{streamUpdate}, BlockHeight: blockHeight, ExecMode: uint32(execMode), }, clobPairId: clobPairId, + }) + } +} + +// mustEnqueueOrderbookUpdate tries to enqueue an orderbook update to the buffer via non-blocking send. +// If the buffer is full, *all* streaming subscriptions will be shut down. +func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) { + select { + case sm.updateBuffer <- internalResponse: + return + default: + sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions") + for k := range sm.orderbookSubscriptions { + delete(sm.orderbookSubscriptions, k) } } } diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 424871b4c37..fdce4bfb522 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -36,7 +36,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( } func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( - ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32, execMode sdk.ExecMode, @@ -46,3 +45,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 { return []uint32{} } + +func (sm *NoopGrpcStreamingManager) Stop() { +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 9b5af0c0932..198493f5079 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -22,8 +22,8 @@ type GrpcStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + Stop() SendOrderbookFillUpdates( - ctx sdk.Context, orderbookFills []clobtypes.StreamOrderbookFill, blockHeight uint32, execMode sdk.ExecMode,