Skip to content

Commit 1b37a17

Browse files
mateeullahmalikMatee Ullah
andauthored
[PSL-1202] limit max number of calls to cnode (#885)
* [PSL-1202] limit max number of calls to cnode * fix --------- Co-authored-by: Matee Ullah <mateeullah@Matees-MacBook-Pro.local>
1 parent 11ad8c2 commit 1b37a17

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

pastel/jsonrpc/jsonrpc.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ import (
1515
"time"
1616

1717
"encoding/json"
18+
"golang.org/x/sync/semaphore"
1819
)
1920

2021
const (
2122
jsonrpcVersion = "2.0"
2223
timeout = 70 * time.Second
2324
httpTimeout = 60 * time.Second
25+
maxConcurrentRequests = int64(350)
2426
)
2527

2628
// RPCClient sends JSON-RPC requests over HTTP to the provided JSON-RPC backend.
@@ -246,6 +248,7 @@ type rpcClient struct {
246248
endpoint string
247249
httpClient *http.Client
248250
customHeaders map[string]string
251+
sem *semaphore.Weighted
249252
}
250253

251254
// RPCClientOpts can be provided to NewClientWithOpts() to change configuration of RPCClient.
@@ -320,6 +323,7 @@ func NewClientWithOpts(endpoint string, opts *RPCClientOpts) RPCClient {
320323
IdleConnTimeout: 60 * time.Second,
321324
},
322325
},
326+
sem: semaphore.NewWeighted(maxConcurrentRequests),
323327
customHeaders: make(map[string]string),
324328
}
325329

@@ -449,6 +453,11 @@ func (client *rpcClient) newRequest(ctx context.Context, req interface{}) (*http
449453
}
450454

451455
func (client *rpcClient) doCall(cctx context.Context, RPCRequest *RPCRequest) (*RPCResponse, error) {
456+
if err := client.sem.Acquire(cctx, 1); err != nil {
457+
return nil, fmt.Errorf("waiting for semaphore on rpc call on %v", err.Error())
458+
}
459+
defer client.sem.Release(1)
460+
452461
ctx, cancel := context.WithTimeout(cctx, timeout)
453462
defer cancel()
454463

@@ -497,11 +506,16 @@ func (client *rpcClient) doCall(cctx context.Context, RPCRequest *RPCRequest) (*
497506
}
498507

499508
func (client *rpcClient) doBatchCall(rpcRequest []*RPCRequest) ([]*RPCResponse, error) {
509+
if err := client.sem.Acquire(context.Background(), 1); err != nil {
510+
return nil, fmt.Errorf("waiting for semaphore on rpc batch call on %v", err.Error())
511+
}
512+
defer client.sem.Release(1)
513+
500514
httpRequest, err := client.newRequest(context.Background(), rpcRequest)
501515
if err != nil {
502516
return nil, fmt.Errorf("rpc batch call on %v: %v", client.endpoint, err.Error())
503517
}
504-
518+
505519
httpResponse, err := client.httpClient.Do(httpRequest)
506520
if err != nil {
507521
return nil, fmt.Errorf("rpc batch call on %v: %v", httpRequest.URL.String(), err.Error())

0 commit comments

Comments
 (0)