-
Notifications
You must be signed in to change notification settings - Fork 0
/
counter.go
104 lines (91 loc) · 2.1 KB
/
counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package timebased
import (
"sort"
"sync"
"time"
)
type counter map[string]uint64
func NewCounterWithStore(interval time.Duration, store Store) *TimedCounter {
c := newTimedCounter(interval)
c.store = store
c.start()
store.start()
return c
}
// TimedCounter every interval passes, try to store and reset the counter.
type TimedCounter struct {
store Store
keyChannel chan string
stopChannel chan interface{}
current counter
interval time.Duration
mutex *sync.Mutex
}
func newTimedCounter(interval time.Duration) *TimedCounter {
return &TimedCounter{
keyChannel: make(chan string),
stopChannel: make(chan interface{}),
interval: interval,
current: counter{},
mutex: &sync.Mutex{},
}
}
func (tc *TimedCounter) Inc(key string) {
tc.keyChannel <- key
}
// increase the key's count stored in tc.current
func (tc *TimedCounter) increase(key string) {
tc.mutex.Lock()
defer tc.mutex.Unlock()
if val, ok := tc.current[key]; ok {
tc.current[key] = val + 1
} else {
tc.current[key] = 1
}
}
// set tc.current to a new counter, start next cycle of counting
func (tc *TimedCounter) nextCycle() {
tc.mutex.Lock()
defer tc.mutex.Unlock()
if tc.store != nil {
tc.store.store(tc.current)
}
tc.current = counter{}
}
func (tc *TimedCounter) start() {
go func() {
ticker := AlignToInterval(tc.interval)
for {
select {
case <-ticker:
tc.nextCycle()
ticker = AlignToInterval(tc.interval)
case key := <-tc.keyChannel:
tc.increase(key)
case <-tc.stopChannel:
return
}
}
}()
}
func (tc *TimedCounter) Stop() {
close(tc.stopChannel)
}
func (tc *TimedCounter) Report() (result []Element, err error) {
if tc.store == nil {
return
}
result, err = tc.store.collect()
if err != nil {
return
}
if !tc.store.sorted() {
sort.SliceStable(result, func(i, j int) bool { return result[i].Count >= result[j].Count })
}
return
}
func AlignToInterval(interval time.Duration) <-chan time.Time {
from := time.Now().Local().UnixNano()
unit := interval.Nanoseconds()
return time.After(time.Duration((from/unit+1)*unit-from) + time.Microsecond*10)
}