Skip to content

Commit 67625bf

Browse files
mergify[bot]jayy04
andauthored
Increase FNS default numbers and close connections properly with reasons (backport #2634) (#2639)
Co-authored-by: jayy04 <103467857+jayy04@users.noreply.github.com>
1 parent c0a8826 commit 67625bf

File tree

3 files changed

+34
-19
lines changed

3 files changed

+34
-19
lines changed

protocol/app/flags/flags.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ const (
7272

7373
DefaultGrpcStreamingEnabled = false
7474
DefaultGrpcStreamingFlushIntervalMs = 50
75-
DefaultGrpcStreamingMaxBatchSize = 10000
76-
DefaultGrpcStreamingMaxChannelBufferSize = 10000
75+
DefaultGrpcStreamingMaxBatchSize = 100_000
76+
DefaultGrpcStreamingMaxChannelBufferSize = 100_000
7777
DefaultWebsocketStreamingEnabled = false
7878
DefaultWebsocketStreamingPort = 9092
7979
DefaultFullNodeStreamingSnapshotInterval = 0

protocol/app/flags/flags_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
257257
expectedGrpcEnable: true,
258258
expectedGrpcStreamingEnable: false,
259259
expectedGrpcStreamingFlushMs: 50,
260-
expectedGrpcStreamingBatchSize: 10000,
261-
expectedGrpcStreamingMaxChannelBufferSize: 10000,
260+
expectedGrpcStreamingBatchSize: 100_000,
261+
expectedGrpcStreamingMaxChannelBufferSize: 100_000,
262262
expectedWebsocketEnabled: false,
263263
expectedWebsocketPort: 9092,
264264
expectedFullNodeStreamingSnapshotInterval: 0,

protocol/streaming/ws/websocket_server.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
const (
2020
CLOB_PAIR_IDS_QUERY_PARAM = "clobPairIds"
2121
MARKET_IDS_QUERY_PARAM = "marketIds"
22+
23+
CLOSE_DEADLINE = 5 * time.Second
2224
)
2325

2426
var upgrader = websocket.Upgrader{
@@ -68,33 +70,30 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
6870
// Parse clobPairIds from query parameters
6971
clobPairIds, err := parseUint32(r, CLOB_PAIR_IDS_QUERY_PARAM)
7072
if err != nil {
71-
ws.logger.Error(
72-
"Error parsing clobPairIds",
73-
"err", err,
74-
)
75-
http.Error(w, err.Error(), http.StatusBadRequest)
73+
ws.logger.Error("Error parsing clobPairIds", "err", err)
74+
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
75+
ws.logger.Error("Error sending close message", "err", err)
76+
}
7677
return
7778
}
7879

7980
// Parse marketIds from query parameters
8081
marketIds, err := parseUint32(r, MARKET_IDS_QUERY_PARAM)
8182
if err != nil {
82-
ws.logger.Error(
83-
"Error parsing marketIds",
84-
"err", err,
85-
)
86-
http.Error(w, err.Error(), http.StatusBadRequest)
83+
ws.logger.Error("Error parsing marketIds", "err", err)
84+
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
85+
ws.logger.Error("Error sending close message", "err", err)
86+
}
8787
return
8888
}
8989

9090
// Parse subaccountIds from query parameters
9191
subaccountIds, err := parseSubaccountIds(r)
9292
if err != nil {
93-
ws.logger.Error(
94-
"Error parsing subaccountIds",
95-
"err", err,
96-
)
97-
http.Error(w, err.Error(), http.StatusBadRequest)
93+
ws.logger.Error("Error parsing subaccountIds", "err", err)
94+
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
95+
ws.logger.Error("Error sending close message", "err", err)
96+
}
9897
return
9998
}
10099

@@ -118,10 +117,26 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
118117
"Ending handler for websocket connection",
119118
"err", err,
120119
)
120+
if err := sendCloseWithReason(conn, websocket.CloseInternalServerErr, err.Error()); err != nil {
121+
ws.logger.Error("Error sending close message", "err", err)
122+
}
121123
return
122124
}
123125
}
124126

127+
func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
128+
closeMessage := websocket.FormatCloseMessage(closeCode, reason)
129+
// Set a write deadline to avoid blocking indefinitely
130+
if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil {
131+
return err
132+
}
133+
return conn.WriteControl(
134+
websocket.CloseMessage,
135+
closeMessage,
136+
time.Now().Add(CLOSE_DEADLINE),
137+
)
138+
}
139+
125140
// parseSubaccountIds is a helper function to parse the subaccountIds from the query parameters.
126141
func parseSubaccountIds(r *http.Request) ([]*satypes.SubaccountId, error) {
127142
subaccountIdsParam := r.URL.Query().Get("subaccountIds")

0 commit comments

Comments
 (0)