Skip to content

Commit 78979b7

Browse files
authored
Merge pull request #22 from antlabs/add-reset
Add reset
2 parents 8c8bf64 + 842da42 commit 78979b7

13 files changed

+396
-115
lines changed

min_heap.go

Lines changed: 111 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
1+
// Copyright 2020-2024 guonaihong, antlabs. All rights reserved.
2+
//
3+
// mit license
14
package timer
25

36
import (
47
"container/heap"
58
"context"
6-
"math"
79
"sync"
810
"sync/atomic"
911
"time"
1012
)
1113

1214
var _ Timer = (*minHeap)(nil)
1315

16+
var defaultTimeout = time.Hour
17+
1418
type minHeap struct {
1519
mu sync.Mutex
1620
minHeaps
1721
chAdd chan struct{}
1822
ctx context.Context
1923
cancel context.CancelFunc
2024
wait sync.WaitGroup
21-
runCount uint32 // 测试时使用
25+
tm *time.Timer
26+
runCount int32 // 单元测试时使用
2227
}
2328

2429
// 一次性定时器
@@ -44,9 +49,6 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS
4449
default:
4550
}
4651

47-
m.mu.Lock()
48-
defer m.mu.Unlock()
49-
5052
node := minHeapNode{
5153
callback: callback,
5254
userExpire: expire,
@@ -60,7 +62,11 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS
6062
node.absExpire = n.Next(time.Now())
6163
}
6264

65+
m.mu.Lock()
6366
heap.Push(&m.minHeaps, &node)
67+
m.wait.Add(1)
68+
m.mu.Unlock()
69+
6470
select {
6571
case m.chAdd <- struct{}{}:
6672
default:
@@ -71,71 +77,120 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS
7177

7278
func (m *minHeap) removeTimeNode(node *minHeapNode) {
7379
m.mu.Lock()
74-
if node.index < 0 || node.index > len(m.minHeaps) || len(m.minHeaps) == 0 {
80+
if node.index < 0 || node.index > int32(len(m.minHeaps)) || int32(len(m.minHeaps)) == 0 {
7581
m.mu.Unlock()
7682
return
7783
}
7884

79-
heap.Remove(&m.minHeaps, node.index)
85+
heap.Remove(&m.minHeaps, int(node.index))
86+
m.wait.Done()
87+
m.mu.Unlock()
88+
}
89+
90+
func (m *minHeap) resetTimeNode(node *minHeapNode, d time.Duration) {
91+
m.mu.Lock()
92+
node.userExpire = d
93+
node.absExpire = time.Now().Add(d)
94+
heap.Fix(&m.minHeaps, int(node.index))
95+
select {
96+
case m.chAdd <- struct{}{}:
97+
default:
98+
}
8099
m.mu.Unlock()
81100
}
82101

102+
func (m *minHeap) getNewSleepTime() time.Duration {
103+
if m.minHeaps.Len() == 0 {
104+
return time.Hour
105+
}
106+
107+
timeout := time.Since(m.minHeaps[0].absExpire)
108+
if timeout < 0 {
109+
timeout = 0
110+
}
111+
return timeout
112+
}
113+
114+
func (m *minHeap) process() {
115+
for {
116+
m.mu.Lock()
117+
now := time.Now()
118+
// 如果堆中没有元素,就等待
119+
// 这时候设置一个相对长的时间,避免空转cpu
120+
if m.minHeaps.Len() == 0 {
121+
m.tm.Reset(time.Hour)
122+
m.mu.Unlock()
123+
return
124+
}
125+
126+
for {
127+
// 取出最小堆的第一个元素
128+
first := m.minHeaps[0]
129+
130+
// 时间未到直接过滤掉
131+
// 只是跳过最近的循环
132+
if !now.After(first.absExpire) {
133+
break
134+
}
135+
136+
// 取出待执行的callback
137+
callback := first.callback
138+
// 如果是周期性任务
139+
if first.isSchedule {
140+
// 计算下次触发的绝对时间点
141+
first.absExpire = first.Next(now)
142+
// 修改下在堆中的位置
143+
heap.Fix(&m.minHeaps, int(first.index))
144+
} else {
145+
// 从堆中删除
146+
heap.Pop(&m.minHeaps)
147+
m.wait.Done()
148+
}
149+
150+
// 正在运行的任务数加1
151+
atomic.AddInt32(&m.runCount, 1)
152+
go func() {
153+
callback()
154+
// 对正在运行的任务数减1
155+
atomic.AddInt32(&m.runCount, -1)
156+
}()
157+
158+
// 如果堆中没有元素,就等待
159+
if m.minHeaps.Len() == 0 {
160+
m.tm.Reset(defaultTimeout)
161+
m.mu.Unlock()
162+
return
163+
}
164+
}
165+
166+
// 取出第一个元素
167+
first := m.minHeaps[0]
168+
// 如果第一个元素的时间还没到,就计算下次触发的时间
169+
if time.Now().Before(first.absExpire) {
170+
to := m.getNewSleepTime()
171+
m.tm.Reset(to)
172+
// fmt.Printf("### now=%v, to = %v, m.minHeaps[0].absExpire = %v\n", time.Now(), to, m.minHeaps[0].absExpire)
173+
m.mu.Unlock()
174+
return
175+
}
176+
m.mu.Unlock()
177+
}
178+
}
179+
83180
// 运行
84181
// 为了避免空转cpu, 会等待一个chan, 只要AfterFunc或者ScheduleFunc被调用就会往这个chan里面写值
85182
func (m *minHeap) Run() {
86-
timeout := time.Hour
87-
tm := time.NewTimer(timeout)
183+
m.tm = time.NewTimer(time.Hour)
184+
m.process()
88185
for {
89186
select {
90-
case <-tm.C:
91-
for {
92-
m.mu.Lock()
93-
now := time.Now()
94-
if m.minHeaps.Len() == 0 {
95-
tm.Reset(timeout)
96-
m.mu.Unlock()
97-
goto next
98-
}
99-
100-
for {
101-
first := m.minHeaps[0]
102-
103-
// 时间未到直接过滤掉
104-
if !now.After(first.absExpire) {
105-
break
106-
}
107-
108-
callback := first.callback
109-
if first.isSchedule {
110-
first.absExpire = first.Next(now)
111-
heap.Fix(&m.minHeaps, first.index)
112-
} else {
113-
heap.Pop(&m.minHeaps)
114-
}
115-
atomic.AddUint32(&m.runCount, 1)
116-
go callback()
117-
118-
if m.minHeaps.Len() == 0 {
119-
tm.Reset(timeout)
120-
m.mu.Unlock()
121-
goto next
122-
}
123-
}
124-
125-
first := m.minHeaps[0]
126-
if time.Now().Before(first.absExpire) {
127-
to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire))))
128-
tm.Reset(to)
129-
m.mu.Unlock()
130-
goto next
131-
}
132-
m.mu.Unlock()
133-
}
187+
case <-m.tm.C:
188+
m.process()
134189
case <-m.chAdd:
135190
m.mu.Lock()
136191
// 极端情况,加完任务立即给删除了, 判断下当前堆中是否有元素
137192
if m.minHeaps.Len() > 0 {
138-
tm.Reset(m.minHeaps[0].absExpire.Sub(time.Now()))
193+
m.tm.Reset(m.getNewSleepTime())
139194
}
140195
m.mu.Unlock()
141196
// 进入事件循环,如果为空就会从事件循环里面退出
@@ -144,7 +199,7 @@ func (m *minHeap) Run() {
144199
m.wait.Wait()
145200
return
146201
}
147-
next:
202+
148203
}
149204
}
150205

@@ -156,7 +211,7 @@ func (m *minHeap) Stop() {
156211
func newMinHeap() (mh *minHeap) {
157212
mh = &minHeap{}
158213
heap.Init(&mh.minHeaps)
159-
mh.chAdd = make(chan struct{}, 1)
214+
mh.chAdd = make(chan struct{}, 1024)
160215
mh.ctx, mh.cancel = context.WithCancel(context.TODO())
161216
return
162217
}

min_heap_node.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// Copyright 2020-2024 guonaihong, antlabs. All rights reserved.
2+
//
3+
// mit license
14
package timer
25

36
import (
@@ -8,19 +11,22 @@ type minHeapNode struct {
811
callback func() // 用户的callback
912
absExpire time.Time // 绝对时间
1013
userExpire time.Duration // 过期时间段
11-
next Next // 自定义下个触发的时间点
14+
root *minHeap // 指向最小堆
15+
next Next // 自定义下个触发的时间点, cronex项目用到了
16+
index int32 // 在min heap中的索引,方便删除或者重新推入堆中
1217
isSchedule bool // 是否是周期性任务
13-
index int // 在min heap中的索引,方便删除或者重新推入堆中
14-
root *minHeap
1518
}
1619

1720
func (m *minHeapNode) Stop() {
1821
m.root.removeTimeNode(m)
1922
}
23+
func (m *minHeapNode) Reset(d time.Duration) {
24+
m.root.resetTimeNode(m, d)
25+
}
2026

2127
func (m *minHeapNode) Next(now time.Time) time.Time {
2228
if m.next != nil {
23-
return m.next.Next(now)
29+
return (m.next).Next(now)
2430
}
2531
return now.Add(m.userExpire)
2632
}
@@ -33,15 +39,15 @@ func (m minHeaps) Less(i, j int) bool { return m[i].absExpire.Before(m[j].absExp
3339

3440
func (m minHeaps) Swap(i, j int) {
3541
m[i], m[j] = m[j], m[i]
36-
m[i].index = i
37-
m[j].index = j
42+
m[i].index = int32(i)
43+
m[j].index = int32(j)
3844
}
3945

4046
func (m *minHeaps) Push(x any) {
4147
// Push and Pop use pointer receivers because they modify the slice's length,
4248
// not just its contents.
4349
*m = append(*m, x.(*minHeapNode))
44-
lastIndex := len(*m) - 1
50+
lastIndex := int32(len(*m) - 1)
4551
(*m)[lastIndex].index = lastIndex
4652
}
4753

min_heap_node_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// Copyright 2020-2024 guonaihong, antlabs. All rights reserved.
2+
//
3+
// mit license
4+
15
package timer
26

37
import (
@@ -6,6 +10,11 @@ import (
610
"time"
711
)
812

13+
func Test_NodeSizeof(t *testing.T) {
14+
t.Run("输出最小堆node的sizeof", func(t *testing.T) {
15+
// t.Logf("minHeapNode size: %d, %d\n", unsafe.Sizeof(minHeapNode{}), unsafe.Sizeof(time.Timer{}))
16+
})
17+
}
918
func Test_MinHeap(t *testing.T) {
1019
t.Run("", func(t *testing.T) {
1120
var mh minHeaps

0 commit comments

Comments
 (0)