Skip to content

Commit c5dda04

Browse files
Full Node Streaming Order Filtering by Subaccount impl and tests
1 parent 56cc48e commit c5dda04

File tree

11 files changed

+932
-131
lines changed

11 files changed

+932
-131
lines changed

indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,13 @@ export interface StreamOrderbookUpdatesRequest {
278278
/** Market ids for price updates. */
279279

280280
marketIds: number[];
281+
/**
282+
* Filter order updates by subaccount IDs.
283+
* If true, the orderbook updates only include orders from provided subaccount
284+
* IDs.
285+
*/
286+
287+
filterOrdersBySubaccountId: boolean;
281288
}
282289
/**
283290
* StreamOrderbookUpdatesRequest is a request message for the
@@ -293,6 +300,13 @@ export interface StreamOrderbookUpdatesRequestSDKType {
293300
/** Market ids for price updates. */
294301

295302
market_ids: number[];
303+
/**
304+
* Filter order updates by subaccount IDs.
305+
* If true, the orderbook updates only include orders from provided subaccount
306+
* IDs.
307+
*/
308+
309+
filter_orders_by_subaccount_id: boolean;
296310
}
297311
/**
298312
* StreamOrderbookUpdatesResponse is a response message for the
@@ -1298,7 +1312,8 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques
12981312
return {
12991313
clobPairId: [],
13001314
subaccountIds: [],
1301-
marketIds: []
1315+
marketIds: [],
1316+
filterOrdersBySubaccountId: false
13021317
};
13031318
}
13041319

@@ -1323,6 +1338,11 @@ export const StreamOrderbookUpdatesRequest = {
13231338
}
13241339

13251340
writer.ldelim();
1341+
1342+
if (message.filterOrdersBySubaccountId === true) {
1343+
writer.uint32(32).bool(message.filterOrdersBySubaccountId);
1344+
}
1345+
13261346
return writer;
13271347
},
13281348

@@ -1365,6 +1385,10 @@ export const StreamOrderbookUpdatesRequest = {
13651385

13661386
break;
13671387

1388+
case 4:
1389+
message.filterOrdersBySubaccountId = reader.bool();
1390+
break;
1391+
13681392
default:
13691393
reader.skipType(tag & 7);
13701394
break;
@@ -1379,6 +1403,7 @@ export const StreamOrderbookUpdatesRequest = {
13791403
message.clobPairId = object.clobPairId?.map(e => e) || [];
13801404
message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || [];
13811405
message.marketIds = object.marketIds?.map(e => e) || [];
1406+
message.filterOrdersBySubaccountId = object.filterOrdersBySubaccountId ?? false;
13821407
return message;
13831408
}
13841409

proto/dydxprotocol/clob/query.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ message StreamOrderbookUpdatesRequest {
186186

187187
// Market ids for price updates.
188188
repeated uint32 market_ids = 3;
189+
190+
// Filter order updates by subaccount IDs.
191+
// If true, the orderbook updates only include orders from provided subaccount
192+
// IDs.
193+
bool filter_orders_by_subaccount_id = 4;
189194
}
190195

191196
// StreamOrderbookUpdatesResponse is a response message for the

protocol/streaming/full_node_streaming_manager.go

Lines changed: 103 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,25 @@ package streaming
22

33
import (
44
"fmt"
5+
"slices"
56
"sync"
67
"sync/atomic"
78
"time"
89

9-
"github.com/dydxprotocol/v4-chain/protocol/lib"
10-
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
11-
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
12-
1310
"cosmossdk.io/log"
1411
storetypes "cosmossdk.io/store/types"
1512
"github.com/cosmos/cosmos-sdk/codec"
1613
sdk "github.com/cosmos/cosmos-sdk/types"
1714
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
15+
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
16+
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
17+
"github.com/dydxprotocol/v4-chain/protocol/lib"
1818
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
1919
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
2020
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
2121
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
22-
23-
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
24-
25-
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
22+
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
23+
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
2624
)
2725

2826
var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
@@ -96,6 +94,41 @@ type OrderbookSubscription struct {
9694
nextSnapshotBlock uint32
9795
}
9896

97+
func NewOrderbookSubscription(
98+
subscriptionId uint32,
99+
clobPairIds []uint32,
100+
subaccountIds []satypes.SubaccountId,
101+
marketIds []uint32,
102+
messageSender types.OutgoingMessageSender,
103+
updatesChannel chan []clobtypes.StreamUpdate,
104+
) *OrderbookSubscription {
105+
return &OrderbookSubscription{
106+
subscriptionId: subscriptionId,
107+
initialized: &atomic.Bool{}, // False by default.
108+
clobPairIds: clobPairIds,
109+
subaccountIds: subaccountIds,
110+
marketIds: marketIds,
111+
messageSender: messageSender,
112+
updatesChannel: updatesChannel,
113+
}
114+
}
115+
116+
func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription(
117+
clobPairIds []uint32,
118+
subaccountIds []satypes.SubaccountId,
119+
marketIds []uint32,
120+
messageSender types.OutgoingMessageSender,
121+
) *OrderbookSubscription {
122+
return NewOrderbookSubscription(
123+
sm.getNextAvailableSubscriptionId(),
124+
clobPairIds,
125+
subaccountIds,
126+
marketIds,
127+
messageSender,
128+
make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
129+
)
130+
}
131+
99132
func (sub *OrderbookSubscription) IsInitialized() bool {
100133
return sub.initialized.Load()
101134
}
@@ -187,11 +220,58 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
187220
return id
188221
}
189222

223+
func doFilterStreamUpdateBySubaccount(
224+
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
225+
subaccountIdNumbers []uint32,
226+
logger log.Logger,
227+
) bool {
228+
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
229+
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
230+
if err == nil {
231+
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
232+
return true
233+
}
234+
} else {
235+
logger.Error(err.Error())
236+
}
237+
}
238+
return false
239+
}
240+
241+
// Filter StreamUpdates for subaccountIdNumbers
242+
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
243+
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
244+
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
245+
func FilterStreamUpdateBySubaccount(
246+
updates []clobtypes.StreamUpdate,
247+
subaccountIdNumbers []uint32,
248+
logger log.Logger,
249+
) *[]clobtypes.StreamUpdate {
250+
// If reflection becomes too expensive, split updatesChannel by message type
251+
filteredUpdates := []clobtypes.StreamUpdate{}
252+
for _, update := range updates {
253+
switch updateMessage := update.UpdateMessage.(type) {
254+
case *clobtypes.StreamUpdate_OrderbookUpdate:
255+
if doFilterStreamUpdateBySubaccount(updateMessage, subaccountIdNumbers, logger) {
256+
filteredUpdates = append(filteredUpdates, update)
257+
}
258+
default:
259+
filteredUpdates = append(filteredUpdates, update)
260+
}
261+
}
262+
263+
if len(filteredUpdates) > 0 {
264+
return &filteredUpdates
265+
}
266+
return nil
267+
}
268+
190269
// Subscribe subscribes to the orderbook updates stream.
191270
func (sm *FullNodeStreamingManagerImpl) Subscribe(
192271
clobPairIds []uint32,
193272
subaccountIds []*satypes.SubaccountId,
194273
marketIds []uint32,
274+
filterOrdersBySubAccountId bool,
195275
messageSender types.OutgoingMessageSender,
196276
) (
197277
err error,
@@ -200,24 +280,21 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
200280
if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
201281
return types.ErrInvalidStreamingRequest
202282
}
283+
if filterOrdersBySubAccountId && (len(subaccountIds) == 0) {
284+
sm.logger.Error("filterOrdersBySubaccountId with no subaccountIds")
285+
return types.ErrInvalidStreamingRequest
286+
}
203287

204288
sm.Lock()
205289
sIds := make([]satypes.SubaccountId, len(subaccountIds))
290+
subaccountIdNumbers := make([]uint32, len(subaccountIds))
206291
for i, subaccountId := range subaccountIds {
207292
sIds[i] = *subaccountId
293+
subaccountIdNumbers[i] = subaccountId.Number
208294
}
209295

210-
subscriptionId := sm.getNextAvailableSubscriptionId()
296+
subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender)
211297

212-
subscription := &OrderbookSubscription{
213-
subscriptionId: subscriptionId,
214-
initialized: &atomic.Bool{}, // False by default.
215-
clobPairIds: clobPairIds,
216-
subaccountIds: sIds,
217-
marketIds: marketIds,
218-
messageSender: messageSender,
219-
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
220-
}
221298
for _, clobPairId := range clobPairIds {
222299
// if clobPairId exists in the map, append the subscription id to the slice
223300
// otherwise, create a new slice with the subscription id
@@ -268,6 +345,12 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
268345
// Use current goroutine to consistently poll subscription channel for updates
269346
// to send through stream.
270347
for updates := range subscription.updatesChannel {
348+
if filterOrdersBySubAccountId {
349+
filteredUpdates := FilterStreamUpdateBySubaccount(updates, subaccountIdNumbers, sm.logger)
350+
if filteredUpdates != nil {
351+
updates = *filteredUpdates
352+
}
353+
}
271354
metrics.IncrCounterWithLabels(
272355
metrics.GrpcSendResponseToSubscriberCount,
273356
1,
@@ -1080,12 +1163,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
10801163
sm.FlushStreamUpdatesWithLock()
10811164

10821165
// Cache updates to sync local ops queue
1083-
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
1166+
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
10841167
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
10851168
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
10861169
ctx.ExecMode(),
10871170
)
1088-
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
1171+
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)
10891172

10901173
// Cache updates for finalized fills.
10911174
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(

0 commit comments

Comments
 (0)