Skip to content

Commit 958234d

Browse files
committed
修改可能的数据竞争的代码,使用version管理node的迁移
如果node的version和表头的version不一样,就使用惰性删除
1 parent 7f7f5ed commit 958234d

File tree

4 files changed

+82
-29
lines changed

4 files changed

+82
-29
lines changed

time_wheel.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ func newTimeWheel() *timeWheel {
5959
func (t *timeWheel) init() {
6060

6161
for i := 0; i < nearSize; i++ {
62-
t.t1[i] = newTimeHead()
62+
t.t1[i] = newTimeHead(1, uint64(i))
6363

6464
}
6565

6666
for i := 0; i < 4; i++ {
6767
for j := 0; j < levelSize; j++ {
68-
t.t2Tot5[i][j] = newTimeHead()
68+
t.t2Tot5[i][j] = newTimeHead(uint64(i+2), uint64(j))
6969
}
7070
}
7171

@@ -90,9 +90,11 @@ func (t *timeWheel) add(node *timeNode, jiffies uint64) *timeNode {
9090
expire := node.expire
9191
idx := expire - jiffies
9292

93+
level, index := uint64(1), uint64(0)
94+
9395
if idx < nearSize {
9496

95-
index := uint64(expire) & nearMask
97+
index = uint64(expire) & nearMask
9698
head = t.t1[index]
9799

98100
} else {
@@ -106,8 +108,9 @@ func (t *timeWheel) add(node *timeNode, jiffies uint64) *timeNode {
106108
}
107109

108110
if uint64(idx) < levelMax(i+1) {
109-
index := int64(expire) >> (nearShift + i*levelShift) & levelMask
111+
index = uint64(expire >> (nearShift + i*levelShift) & levelMask)
110112
head = t.t2Tot5[i][index]
113+
level = uint64(i) + 2
111114
break
112115
}
113116
}
@@ -118,7 +121,7 @@ func (t *timeWheel) add(node *timeNode, jiffies uint64) *timeNode {
118121
panic("not found head")
119122
}
120123

121-
head.lockPushBack(node)
124+
head.lockPushBack(node, level, index)
122125

123126
return node
124127
}
@@ -164,7 +167,7 @@ func (t *timeWheel) Stop() {
164167
// 移动链表
165168
func (t *timeWheel) cascade(levelIndex int, index int) {
166169

167-
tmp := newTimeHead()
170+
tmp := newTimeHead(0, 0)
168171

169172
l := t.t2Tot5[levelIndex][index]
170173
l.Lock()
@@ -175,6 +178,8 @@ func (t *timeWheel) cascade(levelIndex int, index int) {
175178

176179
l.ReplaceInit(&tmp.Head)
177180

181+
// 每次链表的元素被移动走,都修改version
182+
atomic.AddUint64(&l.version, 1)
178183
l.Unlock()
179184

180185
offset := unsafe.Offsetof(tmp.Head)
@@ -218,10 +223,10 @@ func (t *timeWheel) moveAndExec() {
218223
return
219224
}
220225

221-
head := newTimeHead()
226+
head := newTimeHead(0, 0)
222227
t1 := t.t1[index]
223228
t1.ReplaceInit(&head.Head)
224-
229+
atomic.AddUint64(&t1.version, 1)
225230
t.t1[index].Unlock()
226231

227232
// 执行
@@ -231,6 +236,11 @@ func (t *timeWheel) moveAndExec() {
231236
head.ForEachSafe(func(pos *list.Head) {
232237
val := (*timeNode)(pos.Entry(offset))
233238
head.Del(pos)
239+
240+
if atomic.LoadUint32(&val.stop) == haveStop {
241+
return
242+
}
243+
234244
go val.callback()
235245

236246
if val.isSchedule {

time_wheel_node.go

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,64 +4,86 @@ import (
44
"sync"
55
"sync/atomic"
66
"time"
7+
"unsafe"
78

89
"github.com/antlabs/stl/list"
910
)
1011

1112
const (
1213
haveStop = uint32(1)
13-
//stopGrab = 1 << (iota + 1)
14-
//pushGrab
1514
)
1615

1716
// 先使用sync.Mutex实现功能
1817
// 后面使用cas优化
1918
type Time struct {
2019
timeNode
2120
sync.Mutex
21+
22+
// |---16bit---|---16bit---|------32bit-----|
23+
// |---level---|---index---|-------seq------|
24+
// level 在near盘子里就是1, 在T2ToTt[0]盘子里就是2起步
25+
// index 就是各自盘子的索引值
26+
// seq 自增id
27+
version uint64
2228
}
2329

24-
func newTimeHead() *Time {
30+
func newTimeHead(level uint64, index uint64) *Time {
2531
head := &Time{}
32+
head.version = genVersionHeight(level, index)
2633
head.Init()
2734
return head
2835
}
2936

30-
func (t *Time) lockPushBack(node *timeNode) {
37+
func genVersionHeight(level uint64, index uint64) uint64 {
38+
return level<<(32+16) | index<<32
39+
}
40+
41+
func (t *Time) lockPushBack(node *timeNode, level uint64, index uint64) {
3142
t.Lock()
3243
defer t.Unlock()
33-
if atomic.LoadUint32(&node.lock) == haveStop {
44+
if atomic.LoadUint32(&node.stop) == haveStop {
3445
return
3546
}
3647

3748
t.AddTail(&node.Head)
38-
node.list = t
49+
atomic.StorePointer(&node.list, unsafe.Pointer(t))
50+
//更新节点的version信息
51+
atomic.StoreUint64(&node.version, atomic.LoadUint64(&t.version))
3952
}
4053

4154
type timeNode struct {
4255
expire uint64
4356
userExpire time.Duration
4457
callback func()
58+
stop uint32
59+
list unsafe.Pointer //存放表头信息
60+
version uint64 //保存节点版本信息
4561
isSchedule bool
46-
close uint32
47-
lock uint32
48-
49-
list *Time
5062

5163
list.Head
5264
}
5365

66+
// 一个timeNode节点有4个状态
67+
// 1.存在于初始化链表中
68+
// 2.被移动到tmp链表
69+
// 3.1 和 3.2是if else的状态
70+
// 3.1被移动到new链表
71+
// 3.2直接执行
72+
// 1和3.1状态是没有问题的
73+
// 2和3.2状态会是没有锁保护下的操作,会有数据竞争
5474
func (t *timeNode) Stop() {
55-
//这里为什么修改成cpyList := t.list
56-
//如果直接使用t.list.Lock()和t.list.Unlock()就会和lockPushBack函数里的node.list = t 是竞争关系
57-
//lockPushBack拿到锁,修改国t.list的值。这时候Stop函数里面的t.list.Lock()持有旧链表里的锁。t.list.unlock新链表里的锁,发触发unlock unlock情况。
58-
59-
//TODO:思考有没有新的竞争关系。。。
60-
cpyList := t.list
75+
76+
atomic.StoreUint32(&t.stop, haveStop)
77+
78+
// 使用版本号算法让timeNode知道自己是否被移动了
79+
// timeNode的version和表头的version一样表示没有被移动可以直接删除
80+
// 如果不一样,可能在第2或者3.2状态,使用惰性删除
81+
cpyList := (*Time)(atomic.LoadPointer(&t.list))
6182
cpyList.Lock()
6283
defer cpyList.Unlock()
84+
if atomic.LoadUint64(&t.version) != atomic.LoadUint64(&cpyList.version) {
85+
return
86+
}
6387

64-
atomic.StoreUint32(&t.close, haveStop)
65-
66-
t.list.Del(&t.Head)
88+
cpyList.Del(&t.Head)
6789
}

time_wheel_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ func Test_LevelMax(t *testing.T) {
1919
assert.Equal(t, levelMax(4), uint64(1<<(nearShift+4*levelShift)))
2020
}
2121

22+
func Test_GenVersion(t *testing.T) {
23+
assert.Equal(t, genVersionHeight(1, 0xf), uint64(0x0001000f00000000))
24+
assert.Equal(t, genVersionHeight(1, 64), uint64(0x0001004000000000))
25+
}
26+
2227
func Test_hour(t *testing.T) {
2328
tw := newTimeWheel()
2429

timer_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func Test_ScheduleFunc(t *testing.T) {
2828
}()
2929

3030
tm.Run()
31-
assert.Equal(t, count, uint32(5))
31+
assert.Equal(t, atomic.LoadUint32(&count), uint32(5))
3232
}
3333

3434
func Test_AfterFunc(t *testing.T) {
@@ -64,7 +64,23 @@ func Test_AfterFunc(t *testing.T) {
6464
tm.Stop()
6565
}()
6666
tm.Run()
67-
assert.Equal(t, count, uint32(2))
67+
assert.Equal(t, atomic.LoadUint32(&count), uint32(2))
68+
}
69+
70+
func Test_Node_Stop_1(t *testing.T) {
71+
tm := NewTimer()
72+
count := uint32(0)
73+
node := tm.AfterFunc(time.Millisecond*10, func() {
74+
atomic.AddUint32(&count, 1)
75+
})
76+
go func() {
77+
time.Sleep(time.Millisecond * 30)
78+
node.Stop()
79+
tm.Stop()
80+
}()
81+
82+
tm.Run()
83+
assert.NotEqual(t, count, 1)
6884
}
6985

7086
func Test_Node_Stop(t *testing.T) {

0 commit comments

Comments
 (0)