Skip to content

Commit 309f2ec

Browse files
Move helper to util module. Test for streaming util.
1 parent 05ff9f3 commit 309f2ec

File tree

3 files changed

+147
-23
lines changed

3 files changed

+147
-23
lines changed

protocol/streaming/full_node_streaming_manager.go

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
1919
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
2020
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
21+
"github.com/dydxprotocol/v4-chain/protocol/streaming/util"
2122
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
2223
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
2324

@@ -191,29 +192,11 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
191192
return id
192193
}
193194

194-
// TODO best practice for ensuring all cases are handled
195-
// default error? default panic?
196-
func GetOffChannelUpdateV1SubaccountId(update ocutypes.OffChainUpdateV1) uint32 {
197-
var orderSubaccountIdNumber uint32
198-
switch ocu1 := update.UpdateMessage.(type) {
199-
case *ocutypes.OffChainUpdateV1_OrderPlace:
200-
orderSubaccountIdNumber = ocu1.OrderPlace.Order.OrderId.SubaccountId.Number
201-
case *ocutypes.OffChainUpdateV1_OrderRemove:
202-
orderSubaccountIdNumber = ocu1.OrderRemove.RemovedOrderId.SubaccountId.Number
203-
case *ocutypes.OffChainUpdateV1_OrderUpdate:
204-
orderSubaccountIdNumber = ocu1.OrderUpdate.OrderId.SubaccountId.Number
205-
case *ocutypes.OffChainUpdateV1_OrderReplace:
206-
orderSubaccountIdNumber = ocu1.OrderReplace.Order.OrderId.SubaccountId.Number
207-
208-
}
209-
return orderSubaccountIdNumber
210-
}
211-
212195
// Filter StreamUpdates for subaccountIdNumbers
213196
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
214197
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
215198
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
216-
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []clobtypes.StreamUpdate) {
199+
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []clobtypes.StreamUpdate, logger log.Logger) {
217200
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds))
218201
for i, subaccountId := range sub.subaccountIds {
219202
subaccountIdNumbers[i] = subaccountId.Number
@@ -227,9 +210,13 @@ func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []cl
227210
case *clobtypes.StreamUpdate_OrderbookUpdate:
228211
orderBookUpdates := []ocutypes.OffChainUpdateV1{}
229212
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates {
230-
orderBookUpdateSubaccountIdNumber := GetOffChannelUpdateV1SubaccountId(orderBookUpdate)
231-
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
232-
orderBookUpdates = append(orderBookUpdates, orderBookUpdate)
213+
orderBookUpdateSubaccountIdNumber, err := util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
214+
if err != nil {
215+
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
216+
orderBookUpdates = append(orderBookUpdates, orderBookUpdate)
217+
}
218+
} else {
219+
logger.Error(err.Error())
233220
}
234221
}
235222
// Drop the StreamUpdate_OrderbookUpdate if all updates inside were dropped
@@ -347,7 +334,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
347334
}
348335
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate)
349336
defer close(filteredUpdateChannel)
350-
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel)
337+
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
351338
}
352339

353340
// Use current goroutine to consistently poll subscription channel for updates

protocol/streaming/util/util.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,22 @@ func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes
2121
}
2222
return v1updates
2323
}
24+
25+
// TODO best practice for ensuring all cases are handled
26+
// default error? default panic?
27+
func GetOffChainUpdateV1SubaccountIdNumber(update ocutypes.OffChainUpdateV1) (uint32, error) {
28+
var orderSubaccountIdNumber uint32
29+
switch ocu1 := update.UpdateMessage.(type) {
30+
case *ocutypes.OffChainUpdateV1_OrderPlace:
31+
orderSubaccountIdNumber = ocu1.OrderPlace.Order.OrderId.SubaccountId.Number
32+
case *ocutypes.OffChainUpdateV1_OrderRemove:
33+
orderSubaccountIdNumber = ocu1.OrderRemove.RemovedOrderId.SubaccountId.Number
34+
case *ocutypes.OffChainUpdateV1_OrderUpdate:
35+
orderSubaccountIdNumber = ocu1.OrderUpdate.OrderId.SubaccountId.Number
36+
case *ocutypes.OffChainUpdateV1_OrderReplace:
37+
orderSubaccountIdNumber = ocu1.OrderReplace.Order.OrderId.SubaccountId.Number
38+
default:
39+
return 0, fmt.Errorf("UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}")
40+
}
41+
return orderSubaccountIdNumber, nil
42+
}

protocol/streaming/util/util_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package util_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
11+
pv1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types"
12+
stypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types"
13+
"github.com/dydxprotocol/v4-chain/protocol/streaming/util"
14+
)
15+
16+
func _ToPtr[V any](v V) *V {
17+
return &v
18+
}
19+
20+
func TestGetOffChainUpdateV1SubaccountIdNumber(t *testing.T) {
21+
subaccountIdNumber := uint32(1337)
22+
orderId := pv1types.IndexerOrderId{
23+
SubaccountId: pv1types.IndexerSubaccountId{
24+
Owner: "foo",
25+
Number: uint32(subaccountIdNumber),
26+
},
27+
ClientId: 0,
28+
OrderFlags: 0,
29+
ClobPairId: 0,
30+
}
31+
order := pv1types.IndexerOrder{
32+
OrderId: orderId,
33+
Side: pv1types.IndexerOrder_SIDE_BUY,
34+
Quantums: uint64(10 ^ 6),
35+
Subticks: 1,
36+
GoodTilOneof: &pv1types.IndexerOrder_GoodTilBlock{
37+
GoodTilBlock: 10 ^ 9,
38+
},
39+
TimeInForce: 10 ^ 9,
40+
ReduceOnly: false,
41+
ClientMetadata: 0,
42+
ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED,
43+
ConditionalOrderTriggerSubticks: 0,
44+
}
45+
newOrder := order
46+
newOrder.Quantums += 10 ^ 6
47+
48+
orderPlaceTime := time.Now()
49+
fillQuantums := uint64(1988)
50+
51+
tests := map[string]struct {
52+
update ocutypes.OffChainUpdateV1
53+
id uint32
54+
err error
55+
}{
56+
"OrderPlace": {
57+
update: ocutypes.OffChainUpdateV1{
58+
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{
59+
OrderPlace: &ocutypes.OrderPlaceV1{
60+
Order: &order,
61+
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED,
62+
TimeStamp: _ToPtr(orderPlaceTime),
63+
},
64+
},
65+
},
66+
id: subaccountIdNumber,
67+
err: nil,
68+
},
69+
"OrderRemove": {
70+
update: ocutypes.OffChainUpdateV1{
71+
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderRemove{
72+
OrderRemove: &ocutypes.OrderRemoveV1{
73+
RemovedOrderId: &orderId,
74+
Reason: stypes.OrderRemovalReason_ORDER_REMOVAL_REASON_USER_CANCELED,
75+
RemovalStatus: ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED,
76+
TimeStamp: _ToPtr(orderPlaceTime.Add(1 * time.Second)),
77+
},
78+
},
79+
},
80+
id: subaccountIdNumber,
81+
err: nil,
82+
},
83+
"OrderUpdate": {
84+
update: ocutypes.OffChainUpdateV1{
85+
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{
86+
OrderUpdate: &ocutypes.OrderUpdateV1{
87+
OrderId: &orderId,
88+
TotalFilledQuantums: fillQuantums,
89+
},
90+
},
91+
},
92+
id: subaccountIdNumber,
93+
err: nil,
94+
},
95+
"OrderReplace": {
96+
update: ocutypes.OffChainUpdateV1{
97+
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderReplace{
98+
OrderReplace: &ocutypes.OrderReplaceV1{
99+
OldOrderId: &orderId,
100+
Order: &newOrder,
101+
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED,
102+
TimeStamp: _ToPtr(orderPlaceTime.Add(3 * time.Second)),
103+
},
104+
},
105+
},
106+
id: subaccountIdNumber,
107+
err: nil,
108+
},
109+
}
110+
for name, testCase := range tests {
111+
t.Run(name, func(t *testing.T) {
112+
id, err := util.GetOffChainUpdateV1SubaccountIdNumber(testCase.update)
113+
fmt.Println("expected", id)
114+
require.Equal(t, err, testCase.err)
115+
require.Equal(t, id, testCase.id)
116+
})
117+
}
118+
}

0 commit comments

Comments
 (0)