Skip to content

Commit

Permalink
Merge branch 'dev' into version-bump-v1.10.5
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Jul 18, 2023
2 parents 775de27 + febd3b5 commit c87beca
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 20 deletions.
2 changes: 1 addition & 1 deletion tests/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (te *TestEnvironment) StartCluster() error {
switch te.clusterType {
case StandAlone:
tests.Outf("{{magenta}}starting network-runner with %q{{/}}\n", te.avalancheGoExecPath)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
resp, err := te.GetRunnerClient().Start(ctx, te.avalancheGoExecPath,
runner_sdk.WithNumNodes(5),
runner_sdk.WithGlobalNodeConfig(fmt.Sprintf(`{"log-level":"%s"}`, te.avalancheGoLogLevel)),
Expand Down
10 changes: 3 additions & 7 deletions x/sync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,9 @@ func getAndParse[T any](
// It's safe to call this method multiple times concurrently.
func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, error) {
var (
response []byte
nodeID ids.NodeID
err error
startTime = time.Now()
response []byte
nodeID ids.NodeID
err error
)

c.metrics.RequestMade()
Expand All @@ -261,12 +260,9 @@ func (c *client) get(ctx context.Context, request []byte) (ids.NodeID, []byte, e
}
if err != nil {
c.metrics.RequestFailed()
c.networkClient.TrackBandwidth(nodeID, 0)
return nodeID, response, err
}

bandwidth := float64(len(response)) / (time.Since(startTime).Seconds() + epsilon)
c.networkClient.TrackBandwidth(nodeID, bandwidth)
c.metrics.RequestSucceeded()
return nodeID, response, nil
}
22 changes: 10 additions & 12 deletions x/sync/network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ type NetworkClient interface {
// Returns response bytes, and ErrRequestFailed if the request should be retried.
Request(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error)

// TrackBandwidth should be called for each valid response with the bandwidth
// (length of response divided by request time), and with 0 if the response is invalid.
TrackBandwidth(nodeID ids.NodeID, bandwidth float64)

// The following declarations allow this interface to be embedded in the VM
// to handle incoming responses from peers.
AppResponse(context.Context, ids.NodeID, uint32, []byte) error
Expand Down Expand Up @@ -250,15 +246,24 @@ func (c *networkClient) request(
handler := newResponseHandler()
c.outstandingRequestHandlers[requestID] = handler

var (
response []byte
startTime = time.Now()
)

c.lock.Unlock() // unlock so response can be received

var response []byte
select {
case <-ctx.Done():
c.peers.TrackBandwidth(nodeID, 0)
return nil, ctx.Err()
case response = <-handler.responseChan:
elapsedSeconds := time.Since(startTime).Seconds()
bandwidth := float64(len(response))/elapsedSeconds + epsilon
c.peers.TrackBandwidth(nodeID, bandwidth)
}
if handler.failed {
c.peers.TrackBandwidth(nodeID, 0)
return nil, ErrRequestFailed
}

Expand Down Expand Up @@ -305,10 +310,3 @@ func (c *networkClient) Disconnected(_ context.Context, nodeID ids.NodeID) error
c.peers.Disconnected(nodeID)
return nil
}

func (c *networkClient) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
c.lock.Lock()
defer c.lock.Unlock()

c.peers.TrackBandwidth(nodeID, bandwidth)
}

0 comments on commit c87beca

Please sign in to comment.