-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnode_manager.go
118 lines (99 loc) · 2.32 KB
/
node_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package easycall
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/starjiang/elog"
)
// ZK_NOT_EXSIT_NODE_CACHE_TIME is the length of the window during which
// getNodes keeps failing fast after a node lookup left the node list unset.
// It is added to NodeManager.exsit and compared against GetTimeNow(), so it is
// expressed in whatever unit GetTimeNow returns (presumably milliseconds --
// TODO confirm against GetTimeNow).
const ZK_NOT_EXSIT_NODE_CACHE_TIME = 5000
// NodeManager keeps the list of nodes registered in etcd for one service and
// hands that list out through getNodes.
type NodeManager struct {
	// exsit is the GetTimeNow() value recorded when the last load attempt left
	// nodeList unset; getNodes uses it to skip further lookups for
	// ZK_NOT_EXSIT_NODE_CACHE_TIME afterwards.
	exsit int64
	// mutex is held by getNodes for the whole lookup.
	mutex *sync.Mutex
	// cli is the etcd client used for Get and Watch requests.
	cli *clientv3.Client
	// serviceName is the path segment used to build the service's etcd prefix.
	serviceName string
	// nodeList is nil until a load succeeds; after that it holds the decoded
	// nodes and may be empty.
	nodeList []*Node
	// timeout is the etcd dial timeout and also bounds each Get request.
	timeout time.Duration
	// watched is set to true once the background watch goroutine has been
	// started, so that it is started only once.
	watched bool
}
// Node is one entry of a service's node list, decoded from the JSON value
// stored under the service's etcd prefix.
type Node struct {
	// Ip is decoded from the "ip" JSON key (presumably the node's address --
	// TODO confirm with the code that registers nodes).
	Ip string `json:"ip"`
	// Port is decoded from the "port" JSON key.
	Port int `json:"port"`
	// Weight is decoded from the "weight" JSON key.
	Weight int `json:"weight"`
	// Active carries no JSON tag, so encoding/json maps it to the key "Active".
	// Nothing in this file reads or writes it.
	Active int32
}
// NewNodeManager creates an etcd client for endpoints and returns a
// NodeManager bound to serviceName.
//
// timeout is used as the client's dial timeout and is stored on the manager,
// where it also bounds each Get request. No nodes are loaded here; that
// happens on the first getNodes call. An error from clientv3.New is returned
// unchanged together with a nil manager.
func NewNodeManager(endpoints []string, serviceName string, timeout time.Duration) (*NodeManager, error) {
	cfg := clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: timeout,
	}

	client, err := clientv3.New(cfg)
	if err != nil {
		return nil, err
	}

	return &NodeManager{
		mutex:       &sync.Mutex{},
		cli:         client,
		serviceName: serviceName,
		timeout:     timeout,
	}, nil
}
// getNodes returns the current node list of the service, loading it from etcd
// on first use.
//
// The whole call runs under nm.mutex. While the list has never been loaded
// (nm.nodeList == nil) a load is attempted, unless a previous attempt failed
// less than ZK_NOT_EXSIT_NODE_CACHE_TIME ago, in which case the call fails
// fast without contacting etcd.
//
// It returns an error when the load fails, when the fail-fast window is still
// open, or when the service has no nodes. A nil error always comes with a
// non-empty slice. The returned slice is the manager's own slice and must be
// treated as read-only.
func (nm *NodeManager) getNodes() ([]*Node, error) {
	nm.mutex.Lock()
	defer nm.mutex.Unlock()

	if nm.nodeList == nil {
		if nm.exsit+ZK_NOT_EXSIT_NODE_CACHE_TIME > GetTimeNow() {
			return nil, errors.New("zk node not exsit")
		}

		path := EASYCALL_ETCD_SERVICE_PATH + "/" + nm.serviceName + "/nodes"
		err := nm.loadServiceNode(path)
		if nm.nodeList == nil {
			// Remember when the lookup failed so that callers do not hit etcd
			// again until the fail-fast window has passed.
			nm.exsit = GetTimeNow()
			if err == nil {
				// Never hand back (nil, nil): callers rely on err to detect
				// that no nodes are available.
				err = errors.New("service " + nm.serviceName + " not found")
			}
			return nil, err
		}
	}

	// A freshly loaded list goes through the same emptiness check as a cached
	// one, so the first call and later calls agree when the prefix has no keys.
	if len(nm.nodeList) == 0 {
		return nil, errors.New("service " + nm.serviceName + " not found")
	}
	return nm.nodeList, nil
}
// loadServiceNode reads every key under path from etcd, decodes the values
// into Nodes and stores the result in nm.nodeList. After the first successful
// read it also starts, once, a background goroutine that reloads the list
// whenever something under path changes.
//
// The caller must hold nm.mutex, because nm.watched and nm.nodeList are
// accessed without further locking here. On error nm.nodeList is left
// untouched and the etcd error is returned.
func (nm *NodeManager) loadServiceNode(path string) error {
	nodeList, err := nm.fetchNodeList(path)
	if err != nil {
		return err
	}

	if !nm.watched {
		nm.watched = true
		go nm.watchServiceNode(path)
	}

	nm.nodeList = nodeList
	return nil
}

// fetchNodeList performs the etcd Get for path, bounded by nm.timeout, and
// decodes each value as a JSON Node. Values that fail to decode are logged and
// skipped. On success the returned slice is never nil, even when no key
// matched; getNodes relies on nil meaning "not loaded yet". It does not touch
// nm.nodeList or nm.watched, so it is safe to call without nm.mutex.
func (nm *NodeManager) fetchNodeList(path string) ([]*Node, error) {
	ctx, cancel := context.WithTimeout(context.Background(), nm.timeout)
	defer cancel()

	resp, err := nm.cli.Get(ctx, path, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	nodeList := make([]*Node, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		node := &Node{}
		if err := json.Unmarshal(kv.Value, node); err != nil {
			elog.Error("decode child data fail:", err)
			continue
		}
		nodeList = append(nodeList, node)
	}
	return nodeList, nil
}

// watchServiceNode watches path and replaces nm.nodeList after every change
// notification. It runs for the lifetime of the process: the watch uses
// context.Background() and the loop has no exit.
func (nm *NodeManager) watchServiceNode(path string) {
	// Pause between watch attempts so a permanently failing watch does not
	// spin the CPU.
	const retryDelay = time.Second

	for {
		rch := nm.cli.Watch(context.Background(), path, clientv3.WithPrefix())
		for range rch {
			elog.Info(path, "node reload")

			// Fetch outside the lock so getNodes callers are not blocked on
			// the network; keep the previous list if the reload fails.
			nodeList, err := nm.fetchNodeList(path)
			if err != nil {
				elog.Error(path, "node reload failed:", err)
				continue
			}

			// Publish under the same mutex getNodes reads with.
			nm.mutex.Lock()
			nm.nodeList = nodeList
			nm.mutex.Unlock()
		}

		// The watch channel was closed; log it and try again after a pause.
		elog.Error(path, "watch failed")
		time.Sleep(retryDelay)
	}
}