Skip to content

Commit

Permalink
non blocking sends
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed May 16, 2024
1 parent 87fa3b4 commit a1372ae
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
2 changes: 1 addition & 1 deletion protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,7 @@ func getGrpcStreamingManagerFromOptions(
) (manager streamingtypes.GrpcStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager()
return streaming.NewGrpcStreamingManager(logger)
}
return streaming.NewNoopGrpcStreamingManager()
}
29 changes: 23 additions & 6 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"time"

"cosmossdk.io/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
Expand All @@ -17,6 +18,7 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)

// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions.
type GrpcStreamingManagerImpl struct {
logger log.Logger
sync.Mutex

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
Expand Down Expand Up @@ -49,13 +51,14 @@ type OrderbookSubscription struct {
srv clobtypes.Query_StreamOrderbookUpdatesServer
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
func NewGrpcStreamingManager(logger log.Logger) *GrpcStreamingManagerImpl {
grpcStreamingManager := &GrpcStreamingManagerImpl{
logger: logger.With("module", "grpc-streaming"),
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

// TODO prime the buffer size
updateBuffer: make(chan bufferInternalResponse, 100),
updateBuffer: make(chan bufferInternalResponse, 1000),
}

// Worker goroutine to consistently read from channel and send out updates
Expand Down Expand Up @@ -131,7 +134,6 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
// SendOrderbookFillUpdates groups fills by their clob pair ids and
// enqueues messages to be sent to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
Expand Down Expand Up @@ -169,10 +171,11 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
sm.updateBuffer <- bufferInternalResponse{

sm.mustEnqueueOrderbookUpdate(bufferInternalResponse{
response: streamResponse,
clobPairId: clobPairId,
}
})
}
}

Expand Down Expand Up @@ -216,13 +219,27 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
},
},
}
sm.updateBuffer <- bufferInternalResponse{
sm.mustEnqueueOrderbookUpdate(bufferInternalResponse{
response: clobtypes.StreamOrderbookUpdatesResponse{
Updates: []clobtypes.StreamUpdate{streamUpdate},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
clobPairId: clobPairId,
})
}
}

// mustEnqueueOrderbookUpdate tries to enqueue an orderbook update to the buffer via non-blocking send.
// If the buffer is full, *all* streaming subscriptions will be shut down.
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
return
default:
sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
for k := range sm.orderbookSubscriptions {
delete(sm.orderbookSubscriptions, k)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
Expand All @@ -46,3 +45,6 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
}

func (sm *NoopGrpcStreamingManager) Stop() {
}
2 changes: 1 addition & 1 deletion protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type GrpcStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
Stop()
SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
Expand Down

0 comments on commit a1372ae

Please sign in to comment.