@@ -2,27 +2,25 @@ package streaming
2
2
3
3
import (
4
4
"fmt"
5
+ "slices"
5
6
"sync"
6
7
"sync/atomic"
7
8
"time"
8
9
9
- "github.com/dydxprotocol/v4-chain/protocol/lib"
10
- pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
11
- satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
12
-
13
10
"cosmossdk.io/log"
14
11
storetypes "cosmossdk.io/store/types"
15
12
"github.com/cosmos/cosmos-sdk/codec"
16
13
sdk "github.com/cosmos/cosmos-sdk/types"
17
14
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
15
+ "github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
16
+ ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
17
+ "github.com/dydxprotocol/v4-chain/protocol/lib"
18
18
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
19
19
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
20
20
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
21
21
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
22
-
23
- ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
24
-
25
- "github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
22
+ pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
23
+ satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
26
24
)
27
25
28
26
var _ types.FullNodeStreamingManager = (* FullNodeStreamingManagerImpl )(nil )
@@ -96,6 +94,41 @@ type OrderbookSubscription struct {
96
94
nextSnapshotBlock uint32
97
95
}
98
96
97
+ func NewOrderbookSubscription (
98
+ subscriptionId uint32 ,
99
+ clobPairIds []uint32 ,
100
+ subaccountIds []satypes.SubaccountId ,
101
+ marketIds []uint32 ,
102
+ messageSender types.OutgoingMessageSender ,
103
+ updatesChannel chan []clobtypes.StreamUpdate ,
104
+ ) * OrderbookSubscription {
105
+ return & OrderbookSubscription {
106
+ subscriptionId : subscriptionId ,
107
+ initialized : & atomic.Bool {}, // False by default.
108
+ clobPairIds : clobPairIds ,
109
+ subaccountIds : subaccountIds ,
110
+ marketIds : marketIds ,
111
+ messageSender : messageSender ,
112
+ updatesChannel : updatesChannel ,
113
+ }
114
+ }
115
+
116
+ func (sm * FullNodeStreamingManagerImpl ) NewOrderbookSubscription (
117
+ clobPairIds []uint32 ,
118
+ subaccountIds []satypes.SubaccountId ,
119
+ marketIds []uint32 ,
120
+ messageSender types.OutgoingMessageSender ,
121
+ ) * OrderbookSubscription {
122
+ return NewOrderbookSubscription (
123
+ sm .getNextAvailableSubscriptionId (),
124
+ clobPairIds ,
125
+ subaccountIds ,
126
+ marketIds ,
127
+ messageSender ,
128
+ make (chan []clobtypes.StreamUpdate , sm .maxSubscriptionChannelSize ),
129
+ )
130
+ }
131
+
99
132
func (sub * OrderbookSubscription ) IsInitialized () bool {
100
133
return sub .initialized .Load ()
101
134
}
@@ -187,11 +220,58 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
187
220
return id
188
221
}
189
222
223
+ func doFilterStreamUpdateBySubaccount (
224
+ orderBookUpdate * clobtypes.StreamUpdate_OrderbookUpdate ,
225
+ subaccountIdNumbers []uint32 ,
226
+ logger log.Logger ,
227
+ ) bool {
228
+ for _ , orderBookUpdate := range orderBookUpdate .OrderbookUpdate .Updates {
229
+ orderBookUpdateSubaccountIdNumber , err := streaming_util .GetOffChainUpdateV1SubaccountIdNumber (orderBookUpdate )
230
+ if err == nil {
231
+ if slices .Contains (subaccountIdNumbers , orderBookUpdateSubaccountIdNumber ) {
232
+ return true
233
+ }
234
+ } else {
235
+ logger .Error (err .Error ())
236
+ }
237
+ }
238
+ return false
239
+ }
240
+
241
+ // Filter StreamUpdates for subaccountIdNumbers
242
+ // If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
243
+ // If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
244
+ // StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
245
+ func FilterStreamUpdateBySubaccount (
246
+ updates []clobtypes.StreamUpdate ,
247
+ subaccountIdNumbers []uint32 ,
248
+ logger log.Logger ,
249
+ ) * []clobtypes.StreamUpdate {
250
+ // If reflection becomes too expensive, split updatesChannel by message type
251
+ filteredUpdates := []clobtypes.StreamUpdate {}
252
+ for _ , update := range updates {
253
+ switch updateMessage := update .UpdateMessage .(type ) {
254
+ case * clobtypes.StreamUpdate_OrderbookUpdate :
255
+ if doFilterStreamUpdateBySubaccount (updateMessage , subaccountIdNumbers , logger ) {
256
+ filteredUpdates = append (filteredUpdates , update )
257
+ }
258
+ default :
259
+ filteredUpdates = append (filteredUpdates , update )
260
+ }
261
+ }
262
+
263
+ if len (filteredUpdates ) > 0 {
264
+ return & filteredUpdates
265
+ }
266
+ return nil
267
+ }
268
+
190
269
// Subscribe subscribes to the orderbook updates stream.
191
270
func (sm * FullNodeStreamingManagerImpl ) Subscribe (
192
271
clobPairIds []uint32 ,
193
272
subaccountIds []* satypes.SubaccountId ,
194
273
marketIds []uint32 ,
274
+ filterOrdersBySubAccountId bool ,
195
275
messageSender types.OutgoingMessageSender ,
196
276
) (
197
277
err error ,
@@ -200,24 +280,21 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
200
280
if len (clobPairIds ) == 0 && len (subaccountIds ) == 0 && len (marketIds ) == 0 {
201
281
return types .ErrInvalidStreamingRequest
202
282
}
283
+ if filterOrdersBySubAccountId && (len (subaccountIds ) == 0 ) {
284
+ sm .logger .Error ("filterOrdersBySubaccountId with no subaccountIds" )
285
+ return types .ErrInvalidStreamingRequest
286
+ }
203
287
204
288
sm .Lock ()
205
289
sIds := make ([]satypes.SubaccountId , len (subaccountIds ))
290
+ subaccountIdNumbers := make ([]uint32 , len (subaccountIds ))
206
291
for i , subaccountId := range subaccountIds {
207
292
sIds [i ] = * subaccountId
293
+ subaccountIdNumbers [i ] = subaccountId .Number
208
294
}
209
295
210
- subscriptionId := sm .getNextAvailableSubscriptionId ( )
296
+ subscription := sm .NewOrderbookSubscription ( clobPairIds , sIds , marketIds , messageSender )
211
297
212
- subscription := & OrderbookSubscription {
213
- subscriptionId : subscriptionId ,
214
- initialized : & atomic.Bool {}, // False by default.
215
- clobPairIds : clobPairIds ,
216
- subaccountIds : sIds ,
217
- marketIds : marketIds ,
218
- messageSender : messageSender ,
219
- updatesChannel : make (chan []clobtypes.StreamUpdate , sm .maxSubscriptionChannelSize ),
220
- }
221
298
for _ , clobPairId := range clobPairIds {
222
299
// if clobPairId exists in the map, append the subscription id to the slice
223
300
// otherwise, create a new slice with the subscription id
@@ -268,6 +345,12 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
268
345
// Use current goroutine to consistently poll subscription channel for updates
269
346
// to send through stream.
270
347
for updates := range subscription .updatesChannel {
348
+ if filterOrdersBySubAccountId {
349
+ filteredUpdates := FilterStreamUpdateBySubaccount (updates , subaccountIdNumbers , sm .logger )
350
+ if filteredUpdates != nil {
351
+ updates = * filteredUpdates
352
+ }
353
+ }
271
354
metrics .IncrCounterWithLabels (
272
355
metrics .GrpcSendResponseToSubscriberCount ,
273
356
1 ,
@@ -1080,12 +1163,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
1080
1163
sm .FlushStreamUpdatesWithLock ()
1081
1164
1082
1165
// Cache updates to sync local ops queue
1083
- sycnLocalUpdates , syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates (
1166
+ syncLocalUpdates , syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates (
1084
1167
streaming_util .GetOffchainUpdatesV1 (orderBookUpdatesToSyncLocalOpsQueue ),
1085
1168
lib .MustConvertIntegerToUint32 (ctx .BlockHeight ()),
1086
1169
ctx .ExecMode (),
1087
1170
)
1088
- sm .cacheStreamUpdatesByClobPairWithLock (sycnLocalUpdates , syncLocalClobPairIds )
1171
+ sm .cacheStreamUpdatesByClobPairWithLock (syncLocalUpdates , syncLocalClobPairIds )
1089
1172
1090
1173
// Cache updates for finalized fills.
1091
1174
fillStreamUpdates , fillClobPairIds := sm .getStreamUpdatesForOrderbookFills (
0 commit comments