-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathServer.go
111 lines (95 loc) · 2.66 KB
/
Server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package workercluster
import (
"context"
"time"
"github.com/wangyuche/workercluster/core/k8s"
"github.com/wangyuche/workercluster/core/leader"
"github.com/wangyuche/workercluster/core/worker"
wk "github.com/wangyuche/workercluster/core/worker/protos"
clientset "k8s.io/client-go/kubernetes"
)
// WorkerCluster holds the state of one cluster member: the Kubernetes
// client, the leader-election handle and its channels, the worker server,
// and the worker client that talks to the current master.
type WorkerCluster struct {
	// k8s is the Kubernetes clientset, assigned by connectk8s.
	k8s *clientset.Clientset

	// leader is the election handle created in Run.
	leader *leader.LeaderElection
	// leadercancel cancels the context that Run hands to leader.New.
	leadercancel context.CancelFunc
	// bemastercb is signalled by LeaderCallBack.BeMaster.
	bemastercb chan bool
	// masterchangecb carries the value passed to LeaderCallBack.MasterChange.
	masterchangecb chan string

	// wkserver is created in Run via worker.NewServer.
	wkserver *worker.Server
	// stream is not read or written anywhere in this part of the file.
	stream chan wk.WorkerGRPC_StreamWorkerServer
	// cb is the caller-supplied callback set passed to Run.
	cb iWorkerCallBack

	// worker is the most recent worker built by leaderElectionEvent.
	worker *worker.Worker
	// reconnect is passed to worker.NewWorker; a receive on it makes
	// leaderElectionEvent rebuild the worker after a delay.
	reconnect chan bool
	// m is the last value received on masterchangecb.
	m string
}
// iWorkerCallBack is the set of callbacks a caller passes to Run. The same
// value is forwarded to worker.NewServer and worker.NewWorker.
type iWorkerCallBack interface {
	// BeMaster is invoked by leaderElectionEvent when bemastercb fires.
	BeMaster()
	// WorkerIdleEvent is not invoked in this file; presumably the worker
	// server reports an idle worker through it (confirm in package worker).
	WorkerIdleEvent(info *worker.WorkerInfo)
	// MasterRecv is not invoked in this file; presumably delivers a payload
	// received on the master side (confirm in package worker).
	MasterRecv(data []byte)
	// WorkerRecv is not invoked in this file; presumably delivers a payload
	// received on the worker side (confirm in package worker).
	WorkerRecv(data []byte)
}
// New allocates a WorkerCluster and connects its Kubernetes client using
// path, which is forwarded unchanged to k8s.ConnectKubernetes (presumably a
// kubeconfig location; confirm in package k8s).
func New(path string) *WorkerCluster {
	wc := new(WorkerCluster)
	wc.connectk8s(path)
	return wc
}
// connectk8s stores the clientset returned by k8s.ConnectKubernetes(path)
// on the receiver.
func (wc *WorkerCluster) connectk8s(path string) {
	client := k8s.ConnectKubernetes(path)
	wc.k8s = client
}
// Run wires the cluster member together and returns immediately:
//
//   - creates the worker server on port with cb,
//   - creates the unbuffered event channels and starts leaderElectionEvent
//     in its own goroutine,
//   - creates the leader election (name, namespace and id are forwarded to
//     leader.New unchanged) and starts it in a second goroutine.
//
// The cancel function of the election context is kept so LeaderCancel can
// use it later.
func (wc *WorkerCluster) Run(name string, namespace string, id string, port string, cb iWorkerCallBack) {
	ctx, cancel := context.WithCancel(context.Background())
	wc.wkserver = worker.NewServer(port, cb)
	wc.cb = cb
	wc.leadercancel = cancel

	// The event loop receives on these, so they must exist before it starts.
	wc.bemastercb, wc.masterchangecb = make(chan bool), make(chan string)
	wc.reconnect = make(chan bool)
	go wc.leaderElectionEvent()

	callbacks := &LeaderCallBack{
		bemastercb:     wc.bemastercb,
		masterchangecb: wc.masterchangecb,
	}
	election := leader.New(wc.k8s, ctx, name, namespace, id, callbacks)
	wc.leader = election
	go election.LeaderElection()
}
// leaderElectionEvent is the event loop started by Run. It reacts to three
// channels:
//
//   - bemastercb: forwards the notification to the user callback's BeMaster.
//   - masterchangecb: records the new master value and starts a fresh worker
//     for it.
//   - reconnect: waits reconnectDelay, then starts a fresh worker for the
//     last recorded master.
//
// The select has no default case, so the goroutine parks until one of the
// channels is ready instead of spinning. The loop has no exit path; it runs
// for the lifetime of the process.
func (wc *WorkerCluster) leaderElectionEvent() {
	// reconnectDelay is the pause before a worker is rebuilt after a
	// reconnect signal.
	const reconnectDelay = 5 * time.Second

	for {
		select {
		case <-wc.bemastercb:
			wc.cb.BeMaster()
		case m := <-wc.masterchangecb:
			wc.m = m
			wc.worker = worker.NewWorker(wc.m, wc.cb, wc.reconnect)
			wc.worker.Run()
		case <-wc.reconnect:
			// While sleeping, this loop does not receive from the other
			// channels, so senders on the unbuffered bemastercb and
			// masterchangecb wait until the new worker has been started.
			time.Sleep(reconnectDelay)
			wc.worker = worker.NewWorker(wc.m, wc.cb, wc.reconnect)
			wc.worker.Run()
		}
	}
}
// LeaderCancel cancels the context that Run passed to the leader election
// by invoking the stored cancel function. It is a no-op when Run has not
// been called yet, because no cancel function exists at that point.
func (wc *WorkerCluster) LeaderCancel() {
	// Call the stored cancel func (lower-case field), not this method:
	// calling LeaderCancel here would recurse without end.
	if wc.leadercancel != nil {
		wc.leadercancel()
	}
}
// GetWorkers returns whatever the worker server's GetWorkers reports: a map
// from worker info to worker status. The server is created in Run, so Run
// must have been called first.
func (wc *WorkerCluster) GetWorkers() map[*worker.WorkerInfo]wk.WorkerStatus {
	workers := wc.wkserver.GetWorkers()
	return workers
}
// ChangeStatusIdle forwards to ChangeStatusIdle on the current worker. The
// worker field is only assigned by the event loop after a master-change or
// reconnect event.
func (wc *WorkerCluster) ChangeStatusIdle() {
	current := wc.worker
	current.ChangeStatusIdle()
}
// ChangeStatusBusy forwards to ChangeStatusBusy on the current worker. The
// worker field is only assigned by the event loop after a master-change or
// reconnect event.
func (wc *WorkerCluster) ChangeStatusBusy() {
	current := wc.worker
	current.ChangeStatusBusy()
}
// GetWorker returns the worker most recently created by the event loop. The
// result is nil until the first master-change event has been handled.
func (wc *WorkerCluster) GetWorker() *worker.Worker {
	current := wc.worker
	return current
}
// LeaderCallBack turns leader-election callbacks into channel sends. Run
// builds one from the WorkerCluster's channels and hands it to leader.New.
type LeaderCallBack struct {
	// bemastercb receives true on every BeMaster call.
	bemastercb chan bool
	// masterchangecb receives the argument of every MasterChange call.
	masterchangecb chan string
}

// BeMaster sends true on bemastercb. With an unbuffered channel the call
// blocks until a receiver takes the value.
func (cb *LeaderCallBack) BeMaster() {
	const becameMaster = true
	cb.bemastercb <- becameMaster
}

// MasterChange sends master on masterchangecb. With an unbuffered channel
// the call blocks until a receiver takes the value.
func (cb *LeaderCallBack) MasterChange(master string) {
	out := cb.masterchangecb
	out <- master
}