Skip to content

Commit 82370c6

Browse files
committed
sync client
1 parent 44b27dd commit 82370c6

File tree

7 files changed

+127
-1131
lines changed

7 files changed

+127
-1131
lines changed

protocol/app/app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,6 +1418,7 @@ func New(
14181418

14191419
if app.GrpcStreamingManager.Enabled() {
14201420
app.GrpcStreamingTestClient = streamingclient.NewGrpcClient(appFlags, app.Logger())
1421+
app.GrpcStreamingManager.SubscribeTestClient(app.GrpcStreamingTestClient)
14211422
}
14221423

14231424
// Report out app version and git commit. This will be run when validators restart.

protocol/streaming/grpc/client/client.go

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package client
22

33
import (
4-
"context"
54
"sync"
65

76
"cosmossdk.io/log"
87

98
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
10-
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
119
v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types"
1210
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
1311
)
@@ -38,44 +36,44 @@ func NewGrpcClient(appflags appflags.Flags, logger log.Logger) *GrpcClient {
3836
}
3937

4038
// Subscribe to grpc orderbook updates.
41-
go func() {
42-
grpcClient := daemontypes.GrpcClientImpl{}
43-
44-
// Make a connection to the Cosmos gRPC query services.
45-
queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress)
46-
if err != nil {
47-
logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err)
48-
return
49-
}
50-
defer func() {
51-
if err := grpcClient.CloseConnection(queryConn); err != nil {
52-
logger.Error("Failed to close gRPC connection", "error", err)
53-
}
54-
}()
55-
56-
clobQueryClient := clobtypes.NewQueryClient(queryConn)
57-
updateClient, err := clobQueryClient.StreamOrderbookUpdates(
58-
context.Background(),
59-
&clobtypes.StreamOrderbookUpdatesRequest{
60-
ClobPairId: []uint32{0, 1},
61-
},
62-
)
63-
if err != nil {
64-
logger.Error("Failed to stream orderbook updates", "error", err)
65-
return
66-
}
67-
68-
for {
69-
update, err := updateClient.Recv()
70-
if err != nil {
71-
logger.Error("Failed to receive orderbook update", "error", err)
72-
return
73-
}
74-
75-
logger.Info("Received orderbook update", "update", update)
76-
client.Update(update)
77-
}
78-
}()
39+
// go func() {
40+
// grpcClient := daemontypes.GrpcClientImpl{}
41+
42+
// // Make a connection to the Cosmos gRPC query services.
43+
// queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress)
44+
// if err != nil {
45+
// logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err)
46+
// return
47+
// }
48+
// defer func() {
49+
// if err := grpcClient.CloseConnection(queryConn); err != nil {
50+
// logger.Error("Failed to close gRPC connection", "error", err)
51+
// }
52+
// }()
53+
54+
// clobQueryClient := clobtypes.NewQueryClient(queryConn)
55+
// updateClient, err := clobQueryClient.StreamOrderbookUpdates(
56+
// context.Background(),
57+
// &clobtypes.StreamOrderbookUpdatesRequest{
58+
// ClobPairId: []uint32{0, 1},
59+
// },
60+
// )
61+
// if err != nil {
62+
// logger.Error("Failed to stream orderbook updates", "error", err)
63+
// return
64+
// }
65+
66+
// for {
67+
// update, err := updateClient.Recv()
68+
// if err != nil {
69+
// logger.Error("Failed to receive orderbook update", "error", err)
70+
// return
71+
// }
72+
73+
// logger.Info("Received orderbook update", "update", update)
74+
// client.Update(update)
75+
// }
76+
// }()
7977
return client
8078
}
8179

protocol/streaming/grpc/grpc_streaming_manager.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/cosmos/gogoproto/proto"
88
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
99
"github.com/dydxprotocol/v4-chain/protocol/lib"
10+
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client"
1011
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
1112
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
1213
)
@@ -32,6 +33,9 @@ type OrderbookSubscription struct {
3233

3334
// Stream
3435
srv clobtypes.Query_StreamOrderbookUpdatesServer
36+
37+
// Testing
38+
client *client.GrpcClient
3539
}
3640

3741
func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
@@ -72,6 +76,19 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
7276
return nil
7377
}
7478

79+
func (sm *GrpcStreamingManagerImpl) SubscribeTestClient(client *client.GrpcClient) {
80+
subscription := &OrderbookSubscription{
81+
clobPairIds: []uint32{0, 1},
82+
client: client,
83+
}
84+
85+
sm.Lock()
86+
defer sm.Unlock()
87+
88+
sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription
89+
sm.nextSubscriptionId++
90+
}
91+
7592
// SendOrderbookUpdates groups updates by their clob pair ids and
7693
// sends messages to the subscribers.
7794
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
@@ -105,7 +122,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
105122

106123
// Send updates to subscribers.
107124
idsToRemove := make([]uint32, 0)
108-
for id, subscription := range sm.orderbookSubscriptions {
125+
for _, subscription := range sm.orderbookSubscriptions {
109126
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
110127
for _, clobPairId := range subscription.clobPairIds {
111128
if updates, ok := v1updates[clobPairId]; ok {
@@ -114,16 +131,14 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
114131
}
115132

116133
if len(updatesToSend) > 0 {
117-
if err := subscription.srv.Send(
134+
subscription.client.Update(
118135
&clobtypes.StreamOrderbookUpdatesResponse{
119136
Updates: updatesToSend,
120137
Snapshot: snapshot,
121138
BlockHeight: blockHeight,
122139
ExecMode: uint32(execMode),
123140
},
124-
); err != nil {
125-
idsToRemove = append(idsToRemove, id)
126-
}
141+
)
127142
}
128143
}
129144

protocol/streaming/grpc/noop_streaming_manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package grpc
22

33
import (
44
sdk "github.com/cosmos/cosmos-sdk/types"
5+
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client"
56
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
67
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
78
)
@@ -27,6 +28,9 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
2728
return clobtypes.ErrGrpcStreamingManagerNotEnabled
2829
}
2930

31+
func (sm *NoopGrpcStreamingManager) SubscribeTestClient(client *client.GrpcClient) {
32+
}
33+
3034
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
3135
updates *clobtypes.OffchainUpdates,
3236
snapshot bool,

protocol/streaming/grpc/types/manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package types
22

33
import (
44
sdk "github.com/cosmos/cosmos-sdk/types"
5+
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client"
56
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
67
)
78

@@ -15,6 +16,9 @@ type GrpcStreamingManager interface {
1516
) (
1617
err error,
1718
)
19+
SubscribeTestClient(
20+
client *client.GrpcClient,
21+
)
1822
GetUninitializedClobPairIds() []uint32
1923
SendOrderbookUpdates(
2024
offchainUpdates *clobtypes.OffchainUpdates,

0 commit comments

Comments
 (0)