Commit 0bbd88b

all: use timer instead of time.After in loops, to avoid memleaks (ethereum#29241)
time.After is equivalent to NewTimer(d).C, but provides no way to call Stop if the timer is no longer needed, which can cause memory leaks. This change converts many such occurrences to use NewTimer instead, calling Stop once the timer is no longer needed.
1 parent 1126c6d commit 0bbd88b
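
To illustrate the pattern this commit applies, here is a minimal, self-contained sketch (the work channel and the printed messages are made up for the example and do not come from the diff): the commented-out form allocates a fresh timer through time.After on every loop iteration, which the runtime cannot reclaim until the full duration elapses, while the fixed form creates one timer before the loop, stops it via defer, and re-arms it with Reset at the top of each iteration.

package main

import (
	"fmt"
	"time"
)

func main() {
	work := make(chan int) // stand-in for the real channels used in the diff

	// Leaky form avoided by this commit: every iteration allocates a new timer
	// via time.After, and it is only freed once the full second has elapsed.
	//
	//	for {
	//		select {
	//		case v := <-work:
	//			fmt.Println("got", v)
	//		case <-time.After(time.Second):
	//			fmt.Println("timed out")
	//		}
	//	}

	// Pattern used in this commit: a single timer, created before the loop,
	// stopped once it is no longer needed, and re-armed each iteration.
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	for i := 0; i < 3; i++ {
		timer.Reset(time.Second)
		select {
		case v := <-work:
			fmt.Println("got", v)
		case <-timer.C:
			fmt.Println("timed out")
		}
	}
}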

7 files changed, +40 -9 lines changed

core/bloombits/matcher.go

Lines changed: 5 additions & 1 deletion
@@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets []
 // of the session, any request in-flight need to be responded to! Empty responses
 // are fine though in that case.
 func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
+	waitTimer := time.NewTimer(wait)
+	defer waitTimer.Stop()
+
 	for {
 		// Allocate a new bloom bit index to retrieve data for, stopping when done
 		bit, ok := s.allocateRetrieval()
@@ -604,14 +607,15 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
 		}
 		// Bit allocated, throttle a bit if we're below our batch limit
 		if s.pendingSections(bit) < batch {
+			waitTimer.Reset(wait)
 			select {
 			case <-s.quit:
 				// Session terminating, we can't meaningfully service, abort
 				s.allocateSections(bit, 0)
 				s.deliverSections(bit, []uint64{}, [][]byte{})
 				return

-			case <-time.After(wait):
+			case <-waitTimer.C:
 				// Throttling up, fetch whatever is available
 			}
 		}

eth/downloader/beaconsync.go

Lines changed: 5 additions & 1 deletion
@@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
 		localHeaders = d.readHeaderRange(tail, int(count))
 		log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
 	}
+	fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
+	defer fsHeaderContCheckTimer.Stop()
+
 	for {
 		// Some beacon headers might have appeared since the last cycle, make
 		// sure we're always syncing to all available ones
@@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
 		}
 		// State sync still going, wait a bit for new headers and retry
 		log.Trace("Pivot not yet committed, waiting...")
+		fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
 		select {
-		case <-time.After(fsHeaderContCheck):
+		case <-fsHeaderContCheckTimer.C:
 		case <-d.cancelCh:
 			return errCanceled
 		}

eth/downloader/downloader.go

Lines changed: 10 additions & 2 deletions
@@ -1276,7 +1276,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
 	var (
 		mode       = d.getMode()
 		gotHeaders = false // Wait for batches of headers to process
+		timer      = time.NewTimer(time.Second)
 	)
+	defer timer.Stop()
+
 	for {
 		select {
 		case <-d.cancelCh:
@@ -1397,10 +1400,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
 			if mode == FullSync || mode == SnapSync {
 				// If we've reached the allowed number of pending headers, stall a bit
 				for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+					timer.Reset(time.Second)
 					select {
 					case <-d.cancelCh:
 						return errCanceled
-					case <-time.After(time.Second):
+					case <-timer.C:
 					}
 				}
 				// Otherwise insert the headers for content retrieval
@@ -1567,7 +1571,10 @@ func (d *Downloader) processSnapSyncContent() error {
 	var (
 		oldPivot *fetchResult   // Locked in pivot block, might change eventually
 		oldTail  []*fetchResult // Downloaded content after the pivot
+		timer    = time.NewTimer(time.Second)
 	)
+	defer timer.Stop()
+
 	for {
 		// Wait for the next batch of downloaded data to be available. If we have
 		// not yet reached the pivot point, wait blockingly as there's no need to
@@ -1650,6 +1657,7 @@
 			oldPivot = P
 		}
 		// Wait for completion, occasionally checking for pivot staleness
+		timer.Reset(time.Second)
 		select {
 		case <-sync.done:
 			if sync.err != nil {
@@ -1660,7 +1668,7 @@
 			}
 			oldPivot = nil

-		case <-time.After(time.Second):
+		case <-timer.C:
 			oldTail = afterP
 			continue
 		}

ethstats/ethstats.go

Lines changed: 4 additions & 1 deletion
@@ -544,10 +544,13 @@ func (s *Service) reportLatency(conn *connWrapper) error {
 		return err
 	}
 	// Wait for the pong request to arrive back
+	timer := time.NewTimer(5 * time.Second)
+	defer timer.Stop()
+
 	select {
 	case <-s.pongCh:
 		// Pong delivered, report the latency
-	case <-time.After(5 * time.Second):
+	case <-timer.C:
 		// Ping timeout, abort
 		return errors.New("ping timed out")
 	}

p2p/simulations/adapters/exec.go

Lines changed: 4 additions & 1 deletion
@@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error {
 	go func() {
 		waitErr <- n.Cmd.Wait()
 	}()
+	timer := time.NewTimer(5 * time.Second)
+	defer timer.Stop()
+
 	select {
 	case err := <-waitErr:
 		return err
-	case <-time.After(5 * time.Second):
+	case <-timer.C:
 		return n.Cmd.Process.Kill()
 	}
 }

p2p/simulations/mocker.go

Lines changed: 8 additions & 2 deletions
@@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
 	if err != nil {
 		panic("Could not startup node network for mocker")
 	}
-	tick := time.NewTicker(10 * time.Second)
+	var (
+		tick  = time.NewTicker(10 * time.Second)
+		timer = time.NewTimer(3 * time.Second)
+	)
 	defer tick.Stop()
+	defer timer.Stop()
+
 	for {
 		select {
 		case <-quit:
@@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
 			return
 		}

+		timer.Reset(3 * time.Second)
 		select {
 		case <-quit:
 			log.Info("Terminating simulation loop")
 			return
-		case <-time.After(3 * time.Second):
+		case <-timer.C:
 		}

 		log.Debug("starting node", "id", id)

p2p/simulations/network.go

Lines changed: 4 additions & 1 deletion
@@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error {
 		}
 	}

+	timeout := time.NewTimer(snapshotLoadTimeout)
+	defer timeout.Stop()
+
 	select {
 	// Wait until all connections from the snapshot are established.
 	case <-allConnected:
 	// Make sure that we do not wait forever.
-	case <-time.After(snapshotLoadTimeout):
+	case <-timeout.C:
 		return errors.New("snapshot connections not established")
 	}
 	return nil
