Skip to content

Commit 39213e7

Browse files
Full Node Streaming Order Filter utils and tests
1 parent 309f2ec commit 39213e7

File tree

3 files changed

+356
-39
lines changed

3 files changed

+356
-39
lines changed

protocol/streaming/full_node_streaming_manager.go

Lines changed: 58 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,20 @@ import (
77
"sync/atomic"
88
"time"
99

10-
"github.com/dydxprotocol/v4-chain/protocol/lib"
11-
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
12-
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
13-
1410
"cosmossdk.io/log"
1511
storetypes "cosmossdk.io/store/types"
1612
"github.com/cosmos/cosmos-sdk/codec"
1713
sdk "github.com/cosmos/cosmos-sdk/types"
1814
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
15+
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
16+
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
17+
"github.com/dydxprotocol/v4-chain/protocol/lib"
1918
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
2019
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
21-
"github.com/dydxprotocol/v4-chain/protocol/streaming/util"
2220
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
2321
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
24-
25-
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
26-
27-
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
22+
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
23+
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
2824
)
2925

3026
var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
@@ -96,9 +92,41 @@ type OrderbookSubscription struct {
9692
// If interval snapshots are turned on, the next block height at which
9793
// a snapshot should be sent out.
9894
nextSnapshotBlock uint32
95+
}
96+
97+
func NewOrderbookSubscription(
98+
subscriptionId uint32,
99+
clobPairIds []uint32,
100+
subaccountIds []satypes.SubaccountId,
101+
marketIds []uint32,
102+
messageSender types.OutgoingMessageSender,
103+
updatesChannel chan []clobtypes.StreamUpdate,
104+
) *OrderbookSubscription {
105+
return &OrderbookSubscription{
106+
subscriptionId: subscriptionId,
107+
initialized: &atomic.Bool{}, // False by default.
108+
clobPairIds: clobPairIds,
109+
subaccountIds: subaccountIds,
110+
marketIds: marketIds,
111+
messageSender: messageSender,
112+
updatesChannel: updatesChannel,
113+
}
114+
}
99115

100-
// Filter orders for subaccountIds
101-
filterOrders bool
116+
func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription(
117+
clobPairIds []uint32,
118+
subaccountIds []satypes.SubaccountId,
119+
marketIds []uint32,
120+
messageSender types.OutgoingMessageSender,
121+
) *OrderbookSubscription {
122+
return NewOrderbookSubscription(
123+
sm.getNextAvailableSubscriptionId(),
124+
clobPairIds,
125+
subaccountIds,
126+
marketIds,
127+
messageSender,
128+
make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
129+
)
102130
}
103131

104132
func (sub *OrderbookSubscription) IsInitialized() bool {
@@ -196,7 +224,10 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
196224
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
197225
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
198226
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
199-
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []clobtypes.StreamUpdate, logger log.Logger) {
227+
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(
228+
output chan []clobtypes.StreamUpdate,
229+
logger log.Logger,
230+
) {
200231
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds))
201232
for i, subaccountId := range sub.subaccountIds {
202233
subaccountIdNumbers[i] = subaccountId.Number
@@ -210,8 +241,8 @@ func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []cl
210241
case *clobtypes.StreamUpdate_OrderbookUpdate:
211242
orderBookUpdates := []ocutypes.OffChainUpdateV1{}
212243
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates {
213-
orderBookUpdateSubaccountIdNumber, err := util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
214-
if err != nil {
244+
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
245+
if err == nil {
215246
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
216247
orderBookUpdates = append(orderBookUpdates, orderBookUpdate)
217248
}
@@ -266,17 +297,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
266297
sIds[i] = *subaccountId
267298
}
268299

269-
subscriptionId := sm.getNextAvailableSubscriptionId()
300+
subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender)
270301

271-
subscription := &OrderbookSubscription{
272-
subscriptionId: subscriptionId,
273-
initialized: &atomic.Bool{}, // False by default.
274-
clobPairIds: clobPairIds,
275-
subaccountIds: sIds,
276-
marketIds: marketIds,
277-
messageSender: messageSender,
278-
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
279-
}
280302
for _, clobPairId := range clobPairIds {
281303
// if clobPairId exists in the map, append the subscription id to the slice
282304
// otherwise, create a new slice with the subscription id
@@ -325,16 +347,21 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
325347
sm.Unlock()
326348

327349
// If filterOrders, listen to filtered channel and start filter goroutine
328-
// Error if fitlerOrders but no subaccounts are subscribed
350+
// Error if filterOrders but no subaccounts are subscribed
329351
filteredUpdateChannel := subscription.updatesChannel
330-
if subscription.filterOrders {
352+
if filterOrders {
331353
if len(subaccountIds) == 0 {
332-
// TODO panic?
333-
// log error
354+
sm.logger.Error(
355+
fmt.Sprintf(
356+
"filterOrders requires subaccountIds for subscription id: %+v",
357+
subscription.subscriptionId,
358+
),
359+
)
360+
} else {
361+
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate)
362+
defer close(filteredUpdateChannel)
363+
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
334364
}
335-
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate)
336-
defer close(filteredUpdateChannel)
337-
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
338365
}
339366

340367
// Use current goroutine to consistently poll subscription channel for updates

0 commit comments

Comments
 (0)