Skip to content

Commit

Permalink
cmd: move ctx check before writing to channel (#3054)
Browse files Browse the repository at this point in the history
Fix writing to closed channel error.

category: bug 
ticket: #3053
  • Loading branch information
KaloyanTanev authored Apr 30, 2024
1 parent 71fbeb2 commit aaf1730
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions cmd/testpeers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math/rand"
"slices"
"strings"
"sync"
"time"

k1 "github.com/decred/dcrd/dcrec/secp256k1/v4"
Expand Down Expand Up @@ -202,23 +203,34 @@ func pingPeerOnce(ctx context.Context, tcpNode host.Host, peer p2p.Peer) (ping.R
return result, nil
}

func pingPeerContinuously(ctx context.Context, tcpNode host.Host, peer p2p.Peer, resCh chan ping.Result) {
func pingPeerContinuously(ctx context.Context, tcpNode host.Host, peer p2p.Peer, resCh chan<- ping.Result) {
for {
r, err := pingPeerOnce(ctx, tcpNode, peer)
if err != nil {
return
}

select {
case <-ctx.Done():
return
default:
r, err := pingPeerOnce(ctx, tcpNode, peer)
if err != nil {
return
}
resCh <- r
case resCh <- r:
awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here
time.Sleep(time.Duration(awaitTime) * time.Millisecond)
sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond)
}
}
}

func sleepWithContext(ctx context.Context, d time.Duration) {
timer := time.NewTimer(d)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
}
}

func runTestPeers(ctx context.Context, w io.Writer, conf testPeersConfig) error {
err := log.InitLogger(conf.Log)
if err != nil {
Expand Down Expand Up @@ -530,25 +542,25 @@ func peerPingLoadTest(ctx context.Context, conf *testPeersConfig, tcpNode host.H
)
testRes := testResult{Name: "PingLoad"}

deadlineC := time.After(conf.LoadTestDuration)
testResCh := make(chan ping.Result, math.MaxInt16)
pingCtx, cancel := context.WithCancel(ctx)
pingCtx, cancel := context.WithTimeout(ctx, conf.LoadTestDuration)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

finished := false
for !finished {
var wg sync.WaitGroup
for pingCtx.Err() == nil {
select {
case <-ticker.C:
go pingPeerContinuously(pingCtx, tcpNode, peer, testResCh)
case <-ctx.Done():
finished = true
case <-deadlineC:
finished = true
wg.Add(1)
go func() {
pingPeerContinuously(pingCtx, tcpNode, peer, testResCh)
wg.Done()
}()
case <-pingCtx.Done():
}
}
cancel()
wg.Wait()
close(testResCh)

highestRTT := time.Duration(0)
Expand Down

0 comments on commit aaf1730

Please sign in to comment.