Skip to content

Commit ebbaa03

Browse files
authored
Min heap (#10)
* min heap * 更新 * 更新action * 更新代码 * 更新run函数 * fix * 完成最小堆
1 parent 0901cfc commit ebbaa03

File tree

12 files changed

+461
-4
lines changed

12 files changed

+461
-4
lines changed

.github/workflows/go.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ jobs:
1010
runs-on: ubuntu-latest
1111
strategy:
1212
matrix:
13-
go: [ '1.13', '1.14']
13+
go: ['1.19']
1414
name: Go ${{ matrix.go }} sample
1515

1616
steps:
1717

18-
- name: Set up Go 1.13
18+
- name: Set up Go 1.19
1919
uses: actions/setup-go@v1
2020
with:
2121
go-version: ${{ matrix.go }}

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ _long-time-test/long-time-test
1313

1414
# Output of the go coverage tool, specifically when used with LiteIDE
1515
*.out
16+
cover.cov
1617

1718
# Dependency directories (remove the comment below to include it)
1819
# vendor/

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ timer是高性能定时器库
66
## feature
77
* 支持一次性定时器
88
* 支持周期性定时器
9+
* 支持多种数据结构后端,最小堆,5级时间轮
910

1011
## 一次性定时器
1112
```go
@@ -73,6 +74,17 @@ func main() {
7374
tm.Run()
7475
}
7576
```
77+
## 选择不同的的数据结构
78+
```go
79+
import (
80+
"github.com/antlabs/timer"
81+
"log"
82+
)
83+
84+
func main() {
85+
tm := timer.NewTimer(timer.WithMinHeap())// 选择最小堆
86+
}
87+
```
7688
## benchmark
7789

7890
github.com/antlabs/timer 性能最高

go.mod

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
module github.com/antlabs/timer
22

3-
go 1.13
3+
go 1.19
44

55
require (
66
github.com/antlabs/stl v0.0.1
77
github.com/stretchr/testify v1.6.1
88
gopkg.in/go-playground/assert.v1 v1.2.1
99
)
10+
11+
require (
12+
github.com/davecgh/go-spew v1.1.0 // indirect
13+
github.com/pmezard/go-difflib v1.0.0 // indirect
14+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
15+
)

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
77
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
88
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
99
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
10+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
1011
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1112
gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM=
1213
gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=

min_heap.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package timer
2+
3+
import (
4+
"container/heap"
5+
"context"
6+
"math"
7+
"sync"
8+
"time"
9+
)
10+
11+
var _ Timer = (*minHeap)(nil)
12+
13+
type minHeap struct {
14+
mu sync.Mutex
15+
minHeaps
16+
chAdd chan struct{}
17+
ctx context.Context
18+
cancel context.CancelFunc
19+
wait sync.WaitGroup
20+
}
21+
22+
// 一次性定时器
23+
func (m *minHeap) AfterFunc(expire time.Duration, callback func()) TimeNoder {
24+
return m.addCallback(expire, callback, false)
25+
}
26+
27+
// 加任务
28+
func (m *minHeap) addCallback(expire time.Duration, callback func(), isSchedule bool) TimeNoder {
29+
m.mu.Lock()
30+
defer m.mu.Unlock()
31+
32+
node := minHeapNode{
33+
callback: callback,
34+
userExpire: expire,
35+
absExpire: time.Now().Add(expire),
36+
isSchedule: isSchedule,
37+
root: m,
38+
}
39+
40+
heap.Push(&m.minHeaps, node)
41+
select {
42+
case m.chAdd <- struct{}{}:
43+
default:
44+
}
45+
46+
return &node
47+
48+
}
49+
50+
func (m *minHeap) removeTimeNode(node *minHeapNode) {
51+
m.mu.Lock()
52+
if node.index < 0 || node.index > len(m.minHeaps) || len(m.minHeaps) == 0 {
53+
m.mu.Unlock()
54+
return
55+
}
56+
57+
heap.Remove(&m.minHeaps, node.index)
58+
m.mu.Unlock()
59+
}
60+
61+
// 周期性定时器
62+
func (m *minHeap) ScheduleFunc(expire time.Duration, callback func()) TimeNoder {
63+
return m.addCallback(expire, callback, true)
64+
}
65+
66+
// 运行
67+
// 为了避免空转cpu, 会等待一个chan, 只要AfterFunc或者ScheduleFunc被调用就会往这个chan里面写值
68+
func (m *minHeap) Run() {
69+
70+
tm := time.NewTimer(time.Hour)
71+
for {
72+
select {
73+
case <-tm.C:
74+
for {
75+
m.mu.Lock()
76+
now := time.Now()
77+
if m.minHeaps.Len() == 0 {
78+
m.mu.Unlock()
79+
goto next
80+
}
81+
82+
first := &m.minHeaps[0]
83+
84+
if now.After(first.absExpire) {
85+
callback := first.callback
86+
if first.isSchedule {
87+
first.absExpire = now.Add(first.userExpire)
88+
heap.Fix(&m.minHeaps, first.index)
89+
} else {
90+
m.minHeaps.Pop()
91+
}
92+
go callback()
93+
}
94+
95+
if m.minHeaps.Len() == 0 {
96+
m.mu.Unlock()
97+
goto next
98+
}
99+
first = &m.minHeaps[0]
100+
if time.Now().Before(first.absExpire) {
101+
to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire))))
102+
tm.Reset(to)
103+
m.mu.Unlock()
104+
goto next
105+
}
106+
m.mu.Unlock()
107+
}
108+
case <-m.chAdd:
109+
m.mu.Lock()
110+
// 极端情况,加完任务立即给删除了, 判断下当前堆中是否有元素
111+
if m.minHeaps.Len() > 0 {
112+
tm.Reset(time.Since(m.minHeaps[0].absExpire))
113+
}
114+
m.mu.Unlock()
115+
// 进入事件循环,如果为空就会从事件循环里面退出
116+
case <-m.ctx.Done():
117+
// 等待所有任务结束
118+
m.wait.Wait()
119+
return
120+
}
121+
next:
122+
}
123+
}
124+
125+
// 停止所有定时器
126+
func (m *minHeap) Stop() {
127+
m.cancel()
128+
}
129+
130+
func newMinHeap() (mh *minHeap) {
131+
mh = &minHeap{}
132+
heap.Init(&mh.minHeaps)
133+
mh.chAdd = make(chan struct{}, 1)
134+
mh.ctx, mh.cancel = context.WithCancel(context.TODO())
135+
return
136+
}

min_heap_node.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package timer
2+
3+
import (
4+
"time"
5+
)
6+
7+
type minHeapNode struct {
8+
callback func() //用户的callback
9+
absExpire time.Time //绝对时间
10+
userExpire time.Duration //过期时间段
11+
isSchedule bool //是否是周期性任务
12+
index int //在min heap中的索引,方便删除或者重新推入堆中
13+
root *minHeap
14+
}
15+
16+
func (m *minHeapNode) Stop() {
17+
m.root.removeTimeNode(m)
18+
}
19+
20+
type minHeaps []minHeapNode
21+
22+
func (m minHeaps) Len() int { return len(m) }
23+
func (m minHeaps) Less(i, j int) bool { return m[i].absExpire.Before(m[j].absExpire) }
24+
func (m minHeaps) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
25+
26+
func (m *minHeaps) Push(x any) {
27+
// Push and Pop use pointer receivers because they modify the slice's length,
28+
// not just its contents.
29+
*m = append(*m, x.(minHeapNode))
30+
lastIndex := len(*m) - 1
31+
(*m)[lastIndex].index = lastIndex
32+
}
33+
34+
func (m *minHeaps) Pop() any {
35+
old := *m
36+
n := len(old)
37+
x := old[n-1]
38+
*m = old[0 : n-1]
39+
return x
40+
}

0 commit comments

Comments
 (0)