Skip to content

Commit

Permalink
add a channel buffer to decouple abci and streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed May 16, 2024
1 parent 430c208 commit 87fa3b4
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 100 deletions.
3 changes: 3 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ func New(
if app.SlinkyClient != nil {
app.SlinkyClient.Stop()
}
if app.GrpcStreamingManager != nil {
app.GrpcStreamingManager.Stop()
}
return nil
},
)
Expand Down
213 changes: 113 additions & 100 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ type GrpcStreamingManagerImpl struct {
// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32

// Readonly buffer to enqueue orderbook updates before pushing them through grpc streams.
// Decouples the execution of abci logic with full node streaming.
updateBuffer chan bufferInternalResponse
}

// bufferInternalResponse is enqueued into the readonly buffer.
// It contains an update respnose and the clob pair id to send this information to.
type bufferInternalResponse struct {
response clobtypes.StreamOrderbookUpdatesResponse

// Information relevant to which Orderbook Subscription to send out to
clobPairId uint32
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand All @@ -37,15 +50,56 @@ type OrderbookSubscription struct {
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
return &GrpcStreamingManagerImpl{
grpcStreamingManager := &GrpcStreamingManagerImpl{
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

// TODO prime the buffer size
updateBuffer: make(chan bufferInternalResponse, 100),
}

// Worker goroutine to consistently read from channel and send out updates
go func() {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.sendUpdateResponse(internalResponse)
}
}()

return grpcStreamingManager
}

func (sm *GrpcStreamingManagerImpl) Enabled() bool {
return true
}

func (sm *GrpcStreamingManagerImpl) Stop() {
close(sm.updateBuffer)
}

func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)

for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if clobPairId == internalResponse.clobPairId {
if err := subscription.srv.Send(
&internalResponse.response,
); err != nil {
subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
}
}
}
}
// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range subscriptionIdsToRemove {
delete(sm.orderbookSubscriptions, id)
}
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
Expand Down Expand Up @@ -74,83 +128,8 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
return nil
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
}

// Unmarshal messages to v1 updates.
v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
for clobPairId, update := range updates {
v1update, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
v1updates[clobPairId] = v1update
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
updatesToSend = append(updatesToSend, updates...)
}
}

if len(updatesToSend) > 0 {
streamUpdates := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: updatesToSend,
Snapshot: snapshot,
},
},
}
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: []clobtypes.StreamUpdate{streamUpdates},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
// enqueues messages to be sent to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
Expand All @@ -162,6 +141,8 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)
sm.Lock()
defer sm.Unlock()

// Group fills by clob pair id.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
Expand All @@ -181,36 +162,68 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

// Send response updates into the stream buffer
for clobPairId, streamUpdates := range updatesByClobPairId {
streamResponse := clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdates,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
sm.updateBuffer <- bufferInternalResponse{
response: streamResponse,
clobPairId: clobPairId,
}
}
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// enqueues messages to be sent to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)
sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}
// Group updates by clob pair id.
updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = clobtypes.NewOffchainUpdates()
}
updatesByClobPairId[clobPairId].Messages = append(updatesByClobPairId[clobPairId].Messages, message)
}

if len(streamUpdatesForSubscription) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
// Unmarshal messages to v1 updates and enqueue in buffer to be sent.
for clobPairId, update := range updatesByClobPairId {
v1updates, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
},
}
sm.updateBuffer <- bufferInternalResponse{
response: clobtypes.StreamOrderbookUpdatesResponse{
Updates: []clobtypes.StreamUpdate{streamUpdate},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
clobPairId: clobPairId,
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
}

Expand Down

0 comments on commit 87fa3b4

Please sign in to comment.