-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdlock_by_zk.go
141 lines (124 loc) · 3.15 KB
/
dlock_by_zk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package dlock
import (
"strconv"
"strings"
"time"
"github.com/go-zookeeper/zk"
log "github.com/sirupsen/logrus"
)
// DlockByZookeeper 通过zookeeper实现的分布式锁服务
type DlockByZookeeper struct {
conn *zk.Conn
}
// NewDlockByZookeeper 获取DlockByZookeeper实例.
func NewDlockByZookeeper(conn *zk.Conn) *DlockByZookeeper {
return &DlockByZookeeper{
conn: conn,
}
}
// TryLock 尝试获取分布式锁, 超时后就放弃 (不可重入锁).
/*
==> acquire lock
n = create("/dlock/fast-lock/request-", "", ephemeral|sequence)
RETRY:
children = getChildren("/dlock/fast-lock", watch=False)
if n is lowest znode in children:
return
else:
exist("/dlock/fast-lock/request-" % (n - 1), watch=True)
watch_event:
goto RETRY
*/
func (dlz *DlockByZookeeper) TryLock(pid string, timeout int64 /* in secs */) (token string, acquired bool) {
path, err := zkSafeCreateWithDefaultDataFilled(dlz.conn, _DlockFastLockPathPrefix, zk.FlagEphemeral|zk.FlagSequence)
if err != nil {
log.WithField("pid", pid).WithError(err).Error("failed to acquire lock")
acquired = false
return
}
seq := dlz.getSequenceNum(path, _DlockFastLockPathPrefix)
ticker1 := time.NewTicker(time.Duration(timeout) * time.Second)
defer ticker1.Stop()
ticker2 := time.NewTicker(time.Duration(200) * time.Millisecond)
defer ticker2.Stop()
LOOP:
for {
select {
case <-ticker1.C:
{
log.WithField("pid", pid).Warn("timeout to acquire lock")
acquired = false
return
}
default:
{
TRY_AGAIN:
children, _, err := zkSafeGetChildren(dlz.conn, _DlockFastLockPath, false)
if err != nil {
log.WithField("pid", pid).WithError(err).Error("failed to acquire lock")
acquired = false
return
}
minSeq := seq
prevSeq := -1
prevSeqPath := ""
for _, child := range children {
_seq := dlz.getSequenceNum(child, _DlockFastLockPathShortestPrefix)
if _seq < minSeq {
minSeq = _seq
}
if _seq < seq && _seq > prevSeq {
prevSeq = _seq
prevSeqPath = child
}
}
if seq == minSeq {
break LOOP
}
_, _, watcher, err := dlz.conn.ExistsW(_DlockFastLockPath + "/" + prevSeqPath)
if err != nil {
log.WithField("pid", pid).WithError(err).Error("failed to acquire lock")
acquired = false
return
}
ticker2.Reset(time.Duration(200) * time.Millisecond)
for {
select {
case ev, ok := <-watcher:
{
if !ok {
acquired = false
return
}
if ev.Type == zk.EventNodeDeleted {
goto TRY_AGAIN
}
}
case <-ticker2.C:
{
goto TRY_AGAIN
}
}
}
}
}
}
token = path
acquired = true
return
}
// Unlock 释放分布式锁.
/*
==> release lock (voluntarily or session timeout)
delete("/dlock/fast-lock/request-" % n)
*/
func (dlz *DlockByZookeeper) Unlock(pid, token string) {
if err := zkSafeDelete(dlz.conn, token, -1); err != nil {
log.WithField("pid", pid).WithError(err).Error("failed to release lock")
}
}
func (dlz *DlockByZookeeper) getSequenceNum(path, prefix string) int {
numStr := strings.TrimPrefix(path, prefix)
num, _ := strconv.Atoi(numStr)
return num
}