From aaf17304f018e0ac55bd8305cd0004f6d5ea9ca4 Mon Sep 17 00:00:00 2001 From: Kaloyan Tanev <24719519+KaloyanTanev@users.noreply.github.com> Date: Tue, 30 Apr 2024 12:55:01 +0200 Subject: [PATCH] cmd: move ctx check before writing to channel (#3054) Fix writing to closed channel error. category: bug ticket: #3053 --- cmd/testpeers.go | 48 ++++++++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/cmd/testpeers.go b/cmd/testpeers.go index 18106f6cd..6ec0167d7 100644 --- a/cmd/testpeers.go +++ b/cmd/testpeers.go @@ -12,6 +12,7 @@ import ( "math/rand" "slices" "strings" + "sync" "time" k1 "github.com/decred/dcrd/dcrec/secp256k1/v4" @@ -202,23 +203,34 @@ func pingPeerOnce(ctx context.Context, tcpNode host.Host, peer p2p.Peer) (ping.R return result, nil } -func pingPeerContinuously(ctx context.Context, tcpNode host.Host, peer p2p.Peer, resCh chan ping.Result) { +func pingPeerContinuously(ctx context.Context, tcpNode host.Host, peer p2p.Peer, resCh chan<- ping.Result) { for { + r, err := pingPeerOnce(ctx, tcpNode, peer) + if err != nil { + return + } + select { case <-ctx.Done(): return - default: - r, err := pingPeerOnce(ctx, tcpNode, peer) - if err != nil { - return - } - resCh <- r + case resCh <- r: awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here - time.Sleep(time.Duration(awaitTime) * time.Millisecond) + sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond) } } } +func sleepWithContext(ctx context.Context, d time.Duration) { + timer := time.NewTimer(d) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + } +} + func runTestPeers(ctx context.Context, w io.Writer, conf testPeersConfig) error { err := log.InitLogger(conf.Log) if err != nil { @@ -530,25 +542,25 @@ func peerPingLoadTest(ctx context.Context, conf *testPeersConfig, tcpNode host.H ) testRes := testResult{Name: "PingLoad"} - deadlineC := time.After(conf.LoadTestDuration) testResCh := make(chan ping.Result, math.MaxInt16) - pingCtx, cancel := context.WithCancel(ctx) + pingCtx, cancel := context.WithTimeout(ctx, conf.LoadTestDuration) defer cancel() ticker := time.NewTicker(time.Second) defer ticker.Stop() - finished := false - for !finished { + var wg sync.WaitGroup + for pingCtx.Err() == nil { select { case <-ticker.C: - go pingPeerContinuously(pingCtx, tcpNode, peer, testResCh) - case <-ctx.Done(): - finished = true - case <-deadlineC: - finished = true + wg.Add(1) + go func() { + pingPeerContinuously(pingCtx, tcpNode, peer, testResCh) + wg.Done() + }() + case <-pingCtx.Done(): } } - cancel() + wg.Wait() close(testResCh) highestRTT := time.Duration(0)