-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdelay_set.go
122 lines (106 loc) · 2.42 KB
/
delay_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package utp_go
import (
"sync"
"time"
)
// expireFunc is the callback signature the time wheel invokes for an entry
// whose slot has come due. It receives the key the entry was stored under and
// the value that was stored with it.
type expireFunc[P any] func(key any, value P)
// timeWheelItem is a single entry held in a time wheel slot: the key it was
// registered under together with its payload of type P.
type timeWheelItem[P any] struct {
	// key is the identifier the entry was stored under.
	key any
	// value is the payload handed to the expiry callback.
	value P
}
type timeWheel[P any] struct {
stopped chan struct{}
interval time.Duration
slots []map[any]*timeWheelItem[P]
ticker *time.Ticker
current int
slotNum int
maxDelay time.Duration
handleExpireFunc expireFunc[P]
mu sync.RWMutex
}
// newTimeWheel builds a wheel with slotNum empty slots that advances one slot
// every interval, and starts the goroutine that drives it.
//
// handleExpireFunc is called by that goroutine for every entry whose slot
// comes due. The returned wheel keeps ticking until stop is called.
func newTimeWheel[P any](interval time.Duration, slotNum int, handleExpireFunc expireFunc[P]) *timeWheel[P] {
	// Every slot needs its own map up front so put can write without a nil check.
	buckets := make([]map[any]*timeWheelItem[P], slotNum)
	for idx := range buckets {
		buckets[idx] = map[any]*timeWheelItem[P]{}
	}

	wheel := &timeWheel[P]{
		stopped:          make(chan struct{}),
		interval:         interval,
		slots:            buckets,
		slotNum:          slotNum,
		maxDelay:         interval * time.Duration(slotNum),
		handleExpireFunc: handleExpireFunc,
		ticker:           time.NewTicker(interval),
	}

	go wheel.run()
	return wheel
}
// put stores value under key so that it expires after roughly delay.
//
// The delay is converted to a whole number of ticks (delay / interval,
// truncated) and the entry is parked that many slots ahead of the slot that
// will be drained on the next tick. The offset is clamped to the range
// [0, slotNum-1]:
//
//   - a negative delay is treated as zero, so the entry goes into the slot
//     drained on the next tick (an unclamped negative offset could yield a
//     negative index and panic);
//   - a delay of slotNum*interval or more is parked in the slot furthest from
//     the current one. Without the clamp an offset of exactly slotNum wraps
//     back onto the current slot and the entry would fire on the very next
//     tick instead of after the longest supported delay.
//
// If key is already present in the target slot its entry is overwritten. An
// entry for the same key sitting in a different slot is not touched.
func (tw *timeWheel[P]) put(key any, value P, delay time.Duration) {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	// Clamp in int64 space before narrowing to int so a very large delay
	// cannot overflow the conversion.
	ticks := int64(delay / tw.interval)
	furthest := int64(tw.slotNum - 1)
	switch {
	case ticks < 0:
		ticks = 0
	case ticks > furthest:
		ticks = furthest
	}

	index := (tw.current + int(ticks)) % tw.slotNum
	tw.slots[index][key] = &timeWheelItem[P]{
		key:   key,
		value: value,
	}
}
// retain walks every slot and drops each entry whose key makes shouldRemove
// return true; entries for which it returns false are kept. Dropped entries
// are deleted directly and are not passed to the expiry callback.
//
// shouldRemove is called while the wheel's write lock is held.
func (tw *timeWheel[P]) retain(shouldRemove func(key any) bool) {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	for _, slot := range tw.slots[:tw.slotNum] {
		for k := range slot {
			if !shouldRemove(k) {
				continue
			}
			// Deleting the key being visited is safe during a map range.
			delete(slot, k)
		}
	}
}
// remove deletes the entry stored under key without invoking the expiry
// callback. Slots are searched in index order and the search stops at the
// first slot that contains key, so at most one entry is removed per call.
// It is a no-op when key is not present.
func (tw *timeWheel[P]) remove(key any) {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	for _, slot := range tw.slots[:tw.slotNum] {
		if _, found := slot[key]; !found {
			continue
		}
		delete(slot, key)
		return
	}
}
// run is the wheel's background loop. On every ticker tick it drains the
// current slot, advances current to the next slot (wrapping at slotNum), and
// passes each drained entry to handleExpireFunc. It returns once the stopped
// channel is closed.
//
// The expiry callbacks are invoked after the wheel's lock has been released.
// sync.RWMutex is not reentrant, so calling them under the lock would
// deadlock any callback that calls back into the wheel (put, remove, retain,
// Len), and a slow callback would stall every other user of the wheel.
func (tw *timeWheel[P]) run() {
	for {
		select {
		case <-tw.ticker.C:
			tw.mu.Lock()
			// Detach the due slot instead of iterating it in place: the
			// detached map is owned by this goroutine alone, so it can be
			// walked without holding the lock. An empty slot is left as is
			// (expired stays nil) so no map is allocated on idle ticks and
			// the live map is never read outside the lock.
			var expired map[any]*timeWheelItem[P]
			if slot := tw.slots[tw.current]; len(slot) > 0 {
				expired = slot
				tw.slots[tw.current] = make(map[any]*timeWheelItem[P])
			}
			tw.current = (tw.current + 1) % tw.slotNum
			tw.mu.Unlock()

			for key, item := range expired {
				tw.handleExpireFunc(key, item.value)
			}
		case <-tw.stopped:
			return
		}
	}
}
// Len reports how many entries are currently parked in the wheel, summed over
// all slots. It takes the read lock for the duration of the count.
func (tw *timeWheel[P]) Len() int {
	tw.mu.RLock()
	defer tw.mu.RUnlock()

	total := 0
	for _, slot := range tw.slots[:tw.slotNum] {
		total += len(slot)
	}
	return total
}
// stop halts the ticker and closes the stopped channel, which makes the run
// goroutine return. Entries still parked in the wheel are left in place and
// are not passed to the expiry callback.
//
// Calling stop again after it has returned is a no-op; without the check the
// second close would panic with "close of closed channel". The check is not
// synchronized, so stop must not be called from several goroutines at once.
func (tw *timeWheel[P]) stop() {
	select {
	case <-tw.stopped:
		// Nothing ever sends on stopped, so a successful receive means an
		// earlier call already closed it.
		return
	default:
	}

	tw.ticker.Stop()
	close(tw.stopped)
}