Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit f6a631e

Browse files
committed
stop arbiter if all meters are stopped
1 parent cac0b30 commit f6a631e

File tree

2 files changed

+33
-16
lines changed

2 files changed

+33
-16
lines changed

meter.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,7 @@ func NewMeter() Meter {
3838
return NilMeter{}
3939
}
4040
m := newStandardMeter()
41-
arbiter.Lock()
42-
defer arbiter.Unlock()
43-
arbiter.meters[m] = struct{}{}
44-
if !arbiter.started {
45-
arbiter.started = true
46-
go arbiter.tick()
47-
}
41+
m.startArbiter()
4842
return m
4943
}
5044

@@ -145,9 +139,7 @@ func newStandardMeter() *StandardMeter {
145139
// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
146140
func (m *StandardMeter) Stop() {
147141
if atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
148-
arbiter.Lock()
149-
delete(arbiter.meters, m)
150-
arbiter.Unlock()
142+
m.stopArbiter()
151143
}
152144
}
153145

@@ -156,7 +148,7 @@ func (m *StandardMeter) Count() int64 {
156148
return atomic.LoadInt64(&m.snapshot.count)
157149
}
158150

159-
// Mark records the occurance of n events.
151+
// Mark records the occurrance of n events.
160152
func (m *StandardMeter) Mark(n int64) {
161153
if atomic.LoadUint32(&m.stopped) == 1 {
162154
return
@@ -221,23 +213,50 @@ func (m *StandardMeter) tick() {
221213
m.updateSnapshot()
222214
}
223215

216+
func (m *StandardMeter) startArbiter() {
217+
arbiter.Lock()
218+
defer arbiter.Unlock()
219+
arbiter.meters[m] = struct{}{}
220+
if !arbiter.started {
221+
arbiter.started = true
222+
go arbiter.tick()
223+
}
224+
}
225+
226+
func (m *StandardMeter) stopArbiter() {
227+
arbiter.Lock()
228+
defer arbiter.Unlock()
229+
delete(arbiter.meters, m)
230+
if len(arbiter.meters) == 0 && arbiter.started {
231+
arbiter.cancel <- struct{}{}
232+
arbiter.started = false
233+
}
234+
}
235+
224236
// meterArbiter ticks meters every 5s from a single goroutine.
225237
// meters are references in a set for future stopping.
226238
type meterArbiter struct {
227239
sync.RWMutex
228240
started bool
229241
meters map[*StandardMeter]struct{}
230-
ticker *time.Ticker
242+
cancel chan struct{}
231243
}
232244

233-
var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
245+
var arbiter = meterArbiter{
246+
meters: make(map[*StandardMeter]struct{}),
247+
cancel: make(chan struct{}),
248+
}
234249

235250
// Ticks meters on the scheduled interval
236251
func (ma *meterArbiter) tick() {
252+
ticker := time.NewTicker(5 * time.Second)
253+
defer ticker.Stop()
237254
for {
238255
select {
239-
case <-ma.ticker.C:
256+
case <-ticker.C:
240257
ma.tickMeters()
258+
case <-ma.cancel:
259+
return
241260
}
242261
}
243262
}

meter_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func BenchmarkMeterParallel(b *testing.B) {
2929
func TestMeterConcurrency(t *testing.T) {
3030
rand.Seed(time.Now().Unix())
3131
ma := meterArbiter{
32-
ticker: time.NewTicker(time.Millisecond),
3332
meters: make(map[*StandardMeter]struct{}),
3433
}
3534
m := newStandardMeter()
@@ -62,7 +61,6 @@ func TestGetOrRegisterMeter(t *testing.T) {
6261

6362
func TestMeterDecay(t *testing.T) {
6463
ma := meterArbiter{
65-
ticker: time.NewTicker(time.Millisecond),
6664
meters: make(map[*StandardMeter]struct{}),
6765
}
6866
m := newStandardMeter()

0 commit comments

Comments
 (0)