Skip to content

Commit b3b0ce2

Browse files
mergify[bot]jayy04
andauthored
paginate liquidation daemon response (backport #2118) (#2127)
Co-authored-by: jayy04 <103467857+jayy04@users.noreply.github.com>
1 parent af25ad8 commit b3b0ce2

File tree

13 files changed

+389
-107
lines changed

13 files changed

+389
-107
lines changed

protocol/daemons/flags/flags.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package flags
22

33
import (
4+
"time"
5+
46
servertypes "github.com/cosmos/cosmos-sdk/server/types"
57
oracleconfig "github.com/skip-mev/slinky/oracle/config"
68
"github.com/spf13/cast"
79
"github.com/spf13/cobra"
8-
"time"
910
)
1011

1112
// List of CLI flags for Server and Client.
@@ -22,9 +23,10 @@ const (
2223
FlagBridgeDaemonLoopDelayMs = "bridge-daemon-loop-delay-ms"
2324
FlagBridgeDaemonEthRpcEndpoint = "bridge-daemon-eth-rpc-endpoint"
2425

25-
FlagLiquidationDaemonEnabled = "liquidation-daemon-enabled"
26-
FlagLiquidationDaemonLoopDelayMs = "liquidation-daemon-loop-delay-ms"
27-
FlagLiquidationDaemonQueryPageLimit = "liquidation-daemon-query-page-limit"
26+
FlagLiquidationDaemonEnabled = "liquidation-daemon-enabled"
27+
FlagLiquidationDaemonLoopDelayMs = "liquidation-daemon-loop-delay-ms"
28+
FlagLiquidationDaemonQueryPageLimit = "liquidation-daemon-query-page-limit"
29+
FlagLiquidationDaemonResponsePageLimit = "liquidation-daemon-response-page-limit"
2830

2931
// Oracle flags
3032
FlagOracleEnabled = "oracle.enabled"
@@ -62,6 +64,8 @@ type LiquidationFlags struct {
6264
LoopDelayMs uint32
6365
// QueryPageLimit configures the pagination limit for fetching subaccounts.
6466
QueryPageLimit uint64
67+
// ResponsePageLimit configures the pagination limit for the response to application.
68+
ResponsePageLimit uint64
6569
}
6670

6771
// PriceFlags contains configuration flags for the Price Daemon.
@@ -102,9 +106,10 @@ func GetDefaultDaemonFlags() DaemonFlags {
102106
EthRpcEndpoint: "",
103107
},
104108
Liquidation: LiquidationFlags{
105-
Enabled: true,
106-
LoopDelayMs: 1_600,
107-
QueryPageLimit: 1_000,
109+
Enabled: true,
110+
LoopDelayMs: 1_600,
111+
QueryPageLimit: 1_000,
112+
ResponsePageLimit: 2_000,
108113
},
109114
Price: PriceFlags{
110115
Enabled: false,
@@ -183,6 +188,11 @@ func AddDaemonFlagsToCmd(
183188
df.Liquidation.QueryPageLimit,
184189
"Limit on the number of items to fetch per query in the Liquidation Daemon task loop.",
185190
)
191+
cmd.Flags().Uint64(
192+
FlagLiquidationDaemonResponsePageLimit,
193+
df.Liquidation.ResponsePageLimit,
194+
"Limit on the number of items to send to the main application in the Liquidation Daemon task loop.",
195+
)
186196

187197
// Price Daemon.
188198
cmd.Flags().Bool(
@@ -276,6 +286,11 @@ func GetDaemonFlagValuesFromOptions(
276286
result.Liquidation.QueryPageLimit = v
277287
}
278288
}
289+
if option := appOpts.Get(FlagLiquidationDaemonResponsePageLimit); option != nil {
290+
if v, err := cast.ToUint64E(option); err == nil {
291+
result.Liquidation.ResponsePageLimit = v
292+
}
293+
}
279294

280295
// Price Daemon.
281296
if option := appOpts.Get(FlagPriceDaemonEnabled); option != nil {

protocol/daemons/flags/flags_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestAddDaemonFlagsToCmd(t *testing.T) {
2626
flags.FlagLiquidationDaemonEnabled,
2727
flags.FlagLiquidationDaemonLoopDelayMs,
2828
flags.FlagLiquidationDaemonQueryPageLimit,
29+
flags.FlagLiquidationDaemonResponsePageLimit,
2930

3031
flags.FlagPriceDaemonEnabled,
3132
flags.FlagPriceDaemonLoopDelayMs,
@@ -53,6 +54,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
5354
optsMap[flags.FlagLiquidationDaemonEnabled] = true
5455
optsMap[flags.FlagLiquidationDaemonLoopDelayMs] = uint32(2222)
5556
optsMap[flags.FlagLiquidationDaemonQueryPageLimit] = uint64(3333)
57+
optsMap[flags.FlagLiquidationDaemonResponsePageLimit] = uint64(4444)
5658

5759
optsMap[flags.FlagPriceDaemonEnabled] = true
5860
optsMap[flags.FlagPriceDaemonLoopDelayMs] = uint32(4444)
@@ -83,6 +85,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
8385
require.Equal(t, optsMap[flags.FlagLiquidationDaemonEnabled], r.Liquidation.Enabled)
8486
require.Equal(t, optsMap[flags.FlagLiquidationDaemonLoopDelayMs], r.Liquidation.LoopDelayMs)
8587
require.Equal(t, optsMap[flags.FlagLiquidationDaemonQueryPageLimit], r.Liquidation.QueryPageLimit)
88+
require.Equal(t, optsMap[flags.FlagLiquidationDaemonResponsePageLimit], r.Liquidation.ResponsePageLimit)
8689

8790
// Price Daemon.
8891
require.Equal(t, optsMap[flags.FlagPriceDaemonEnabled], r.Price.Enabled)

protocol/daemons/liquidation/client/grpc_helper.go

Lines changed: 135 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ func (c *Client) SendLiquidatableSubaccountIds(
211211
liquidatableSubaccountIds []satypes.SubaccountId,
212212
negativeTncSubaccountIds []satypes.SubaccountId,
213213
openPositionInfoMap map[uint32]*clobtypes.SubaccountOpenPositionInfo,
214+
pageLimit uint64,
214215
) error {
215216
defer telemetry.ModuleMeasureSince(
216217
metrics.LiquidationDaemon,
@@ -241,19 +242,145 @@ func (c *Client) SendLiquidatableSubaccountIds(
241242
subaccountOpenPositionInfo = append(subaccountOpenPositionInfo, *openPositionInfoMap[perpetualId])
242243
}
243244

244-
request := &api.LiquidateSubaccountsRequest{
245-
BlockHeight: blockHeight,
246-
LiquidatableSubaccountIds: liquidatableSubaccountIds,
247-
NegativeTncSubaccountIds: negativeTncSubaccountIds,
248-
SubaccountOpenPositionInfo: subaccountOpenPositionInfo,
249-
}
245+
// Break this down to multiple requests if the number of subaccounts is too large.
246+
247+
// Liquidatable subaccount ids.
248+
requests := GenerateLiquidateSubaccountsPaginatedRequests(
249+
liquidatableSubaccountIds,
250+
blockHeight,
251+
pageLimit,
252+
)
250253

251-
if _, err := c.LiquidationServiceClient.LiquidateSubaccounts(ctx, request); err != nil {
252-
return err
254+
// Negative TNC subaccount ids.
255+
requests = append(
256+
requests,
257+
GenerateNegativeTNCSubaccountsPaginatedRequests(
258+
negativeTncSubaccountIds,
259+
blockHeight,
260+
pageLimit,
261+
)...,
262+
)
263+
264+
// Subaccount open position info.
265+
requests = append(
266+
requests,
267+
GenerateSubaccountOpenPositionPaginatedRequests(
268+
subaccountOpenPositionInfo,
269+
blockHeight,
270+
pageLimit,
271+
)...,
272+
)
273+
274+
for _, req := range requests {
275+
if _, err := c.LiquidationServiceClient.LiquidateSubaccounts(ctx, req); err != nil {
276+
return err
277+
}
253278
}
279+
254280
return nil
255281
}
256282

283+
func GenerateLiquidateSubaccountsPaginatedRequests(
284+
ids []satypes.SubaccountId,
285+
blockHeight uint32,
286+
pageLimit uint64,
287+
) []*api.LiquidateSubaccountsRequest {
288+
if len(ids) == 0 {
289+
return []*api.LiquidateSubaccountsRequest{
290+
{
291+
BlockHeight: blockHeight,
292+
LiquidatableSubaccountIds: []satypes.SubaccountId{},
293+
},
294+
}
295+
}
296+
297+
requests := make([]*api.LiquidateSubaccountsRequest, 0)
298+
for start := 0; start < len(ids); start += int(pageLimit) {
299+
end := lib.Min(start+int(pageLimit), len(ids))
300+
request := &api.LiquidateSubaccountsRequest{
301+
BlockHeight: blockHeight,
302+
LiquidatableSubaccountIds: ids[start:end],
303+
}
304+
requests = append(requests, request)
305+
}
306+
return requests
307+
}
308+
309+
func GenerateNegativeTNCSubaccountsPaginatedRequests(
310+
ids []satypes.SubaccountId,
311+
blockHeight uint32,
312+
pageLimit uint64,
313+
) []*api.LiquidateSubaccountsRequest {
314+
if len(ids) == 0 {
315+
return []*api.LiquidateSubaccountsRequest{
316+
{
317+
BlockHeight: blockHeight,
318+
NegativeTncSubaccountIds: []satypes.SubaccountId{},
319+
},
320+
}
321+
}
322+
323+
requests := make([]*api.LiquidateSubaccountsRequest, 0)
324+
for start := 0; start < len(ids); start += int(pageLimit) {
325+
end := lib.Min(start+int(pageLimit), len(ids))
326+
request := &api.LiquidateSubaccountsRequest{
327+
BlockHeight: blockHeight,
328+
NegativeTncSubaccountIds: ids[start:end],
329+
}
330+
requests = append(requests, request)
331+
}
332+
return requests
333+
}
334+
335+
func GenerateSubaccountOpenPositionPaginatedRequests(
336+
subaccountOpenPositionInfo []clobtypes.SubaccountOpenPositionInfo,
337+
blockHeight uint32,
338+
pageLimit uint64,
339+
) []*api.LiquidateSubaccountsRequest {
340+
if len(subaccountOpenPositionInfo) == 0 {
341+
return []*api.LiquidateSubaccountsRequest{
342+
{
343+
BlockHeight: blockHeight,
344+
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
345+
},
346+
}
347+
}
348+
349+
requests := make([]*api.LiquidateSubaccountsRequest, 0)
350+
for _, info := range subaccountOpenPositionInfo {
351+
// Long positions.
352+
for start := 0; start < len(info.SubaccountsWithLongPosition); start += int(pageLimit) {
353+
end := lib.Min(start+int(pageLimit), len(info.SubaccountsWithLongPosition))
354+
request := &api.LiquidateSubaccountsRequest{
355+
BlockHeight: blockHeight,
356+
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
357+
{
358+
PerpetualId: info.PerpetualId,
359+
SubaccountsWithLongPosition: info.SubaccountsWithLongPosition[start:end],
360+
},
361+
},
362+
}
363+
requests = append(requests, request)
364+
}
365+
366+
// Short positions.
367+
for start := 0; start < len(info.SubaccountsWithShortPosition); start += int(pageLimit) {
368+
end := lib.Min(start+int(pageLimit), len(info.SubaccountsWithShortPosition))
369+
request := &api.LiquidateSubaccountsRequest{
370+
BlockHeight: blockHeight,
371+
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
372+
{
373+
PerpetualId: info.PerpetualId,
374+
SubaccountsWithShortPosition: info.SubaccountsWithShortPosition[start:end],
375+
},
376+
},
377+
}
378+
requests = append(requests, request)
379+
}
380+
}
381+
return requests
382+
}
383+
257384
func newContextWithQueryBlockHeight(
258385
ctx context.Context,
259386
blockHeight uint32,

protocol/daemons/liquidation/client/grpc_helper_test.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -469,22 +469,45 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
469469
req := &api.LiquidateSubaccountsRequest{
470470
BlockHeight: uint32(50),
471471
LiquidatableSubaccountIds: []satypes.SubaccountId{constants.Alice_Num0, constants.Bob_Num0},
472-
NegativeTncSubaccountIds: []satypes.SubaccountId{constants.Carl_Num0, constants.Dave_Num0},
472+
}
473+
response := &api.LiquidateSubaccountsResponse{}
474+
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
475+
476+
req = &api.LiquidateSubaccountsRequest{
477+
BlockHeight: uint32(50),
478+
NegativeTncSubaccountIds: []satypes.SubaccountId{constants.Carl_Num0, constants.Dave_Num0},
479+
}
480+
response = &api.LiquidateSubaccountsResponse{}
481+
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
482+
483+
req = &api.LiquidateSubaccountsRequest{
484+
BlockHeight: uint32(50),
473485
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
474486
{
475487
PerpetualId: 0,
476488
SubaccountsWithLongPosition: []satypes.SubaccountId{
477489
constants.Alice_Num0,
478490
constants.Carl_Num0,
479491
},
492+
},
493+
},
494+
}
495+
response = &api.LiquidateSubaccountsResponse{}
496+
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
497+
498+
req = &api.LiquidateSubaccountsRequest{
499+
BlockHeight: uint32(50),
500+
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
501+
{
502+
PerpetualId: 0,
480503
SubaccountsWithShortPosition: []satypes.SubaccountId{
481504
constants.Bob_Num0,
482505
constants.Dave_Num0,
483506
},
484507
},
485508
},
486509
}
487-
response := &api.LiquidateSubaccountsResponse{}
510+
response = &api.LiquidateSubaccountsResponse{}
488511
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
489512
},
490513
liquidatableSubaccountIds: []satypes.SubaccountId{
@@ -512,12 +535,24 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
512535
"Success Empty": {
513536
setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
514537
req := &api.LiquidateSubaccountsRequest{
538+
BlockHeight: uint32(50),
539+
LiquidatableSubaccountIds: []satypes.SubaccountId{},
540+
}
541+
response := &api.LiquidateSubaccountsResponse{}
542+
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
543+
544+
req = &api.LiquidateSubaccountsRequest{
545+
BlockHeight: uint32(50),
546+
NegativeTncSubaccountIds: []satypes.SubaccountId{},
547+
}
548+
response = &api.LiquidateSubaccountsResponse{}
549+
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
550+
551+
req = &api.LiquidateSubaccountsRequest{
515552
BlockHeight: uint32(50),
516-
LiquidatableSubaccountIds: []satypes.SubaccountId{},
517-
NegativeTncSubaccountIds: []satypes.SubaccountId{},
518553
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
519554
}
520-
response := &api.LiquidateSubaccountsResponse{}
555+
response = &api.LiquidateSubaccountsResponse{}
521556
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
522557
},
523558
liquidatableSubaccountIds: []satypes.SubaccountId{},
@@ -527,10 +562,8 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
527562
"Errors are propagated": {
528563
setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
529564
req := &api.LiquidateSubaccountsRequest{
530-
BlockHeight: uint32(50),
531-
LiquidatableSubaccountIds: []satypes.SubaccountId{},
532-
NegativeTncSubaccountIds: []satypes.SubaccountId{},
533-
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
565+
BlockHeight: uint32(50),
566+
LiquidatableSubaccountIds: []satypes.SubaccountId{},
534567
}
535568
mck.On("LiquidateSubaccounts", ctx, req).Return(nil, errors.New("test error"))
536569
},
@@ -555,6 +588,7 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
555588
tc.liquidatableSubaccountIds,
556589
tc.negativeTncSubaccountIds,
557590
tc.subaccountOpenPositionInfo,
591+
1000,
558592
)
559593
require.Equal(t, tc.expectedError, err)
560594
})

protocol/daemons/liquidation/client/sub_task_runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func (s *SubTaskRunnerImpl) RunLiquidationDaemonTaskLoop(
8383
liquidatableSubaccountIds,
8484
negativeTncSubaccountIds,
8585
subaccountOpenPositionInfo,
86+
liqFlags.ResponsePageLimit,
8687
)
8788
if err != nil {
8889
return err

0 commit comments

Comments
 (0)