-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsnailx.go
147 lines (138 loc) · 3.33 KB
/
snailx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package snailx
import (
"errors"
"fmt"
"github.com/nats-io/nuid"
"runtime"
"sync"
)
// New constructs a running standalone SnailX.
//
// It creates a service group via newLocalServiceGroup, wraps it in the bus
// returned by newServiceEventLoopBus, and starts that bus before returning.
// New panics if the bus fails to start.
func New() (x SnailX) {
	group := newLocalServiceGroup()
	bus := newServiceEventLoopBus(group)
	if startErr := bus.start(); startErr != nil {
		panic(startErr)
	}
	return &standaloneSnailX{
		serviceBus: bus,
		services:   group,
		snailMap:   make(map[string]Snail),
		runMutex:   &sync.Mutex{},
		run:        true,
	}
}
// SnailX is the container API for deploying, undeploying and stopping snails.
//
// The per-method notes below describe the standaloneSnailX implementation in
// this file; other implementations may differ.
type SnailX interface {
	// Deploy starts snail on its own service bus and returns the id it was
	// registered under.
	Deploy(snail Snail) (id string)
	// DeployWithOptions is like Deploy, but the kind and sizing of the snail's
	// service bus are taken from options.
	DeployWithOptions(snail Snail, options SnailOptions) (id string)
	// UnDeploy stops and forgets the snail registered under snailId. An unknown
	// id is ignored.
	UnDeploy(snailId string)
	// ServiceBus returns the container's own service bus.
	ServiceBus() (bus ServiceBus)
	// Stop stops every deployed snail and shuts the container down. It returns
	// an error if the container has already been stopped.
	Stop() (err error)
}
// standaloneSnailX is the SnailX implementation returned by New.
type standaloneSnailX struct {
	// serviceBus is the container-level bus handed out by ServiceBus().
	serviceBus ServiceBus
	// services is the service group passed to every bus this container creates.
	services ServiceGroup
	// snailMap holds the deployed snails keyed by their generated id.
	snailMap map[string]Snail
	// runMutex is held by Stop, Deploy, DeployWithOptions and UnDeploy for
	// their whole duration.
	runMutex *sync.Mutex
	// run is true from construction until Stop succeeds.
	run bool
}
// Stop shuts the container down.
//
// It marks the container as stopped, stops every deployed snail concurrently
// and waits for all of them, empties the snail registry, undeploys all
// services and finally stops the container's service bus, returning that
// bus's stop error. If the container is already stopped, Stop returns an
// error and does nothing else.
//
// runMutex is held for the whole call.
func (x *standaloneSnailX) Stop() (err error) {
	x.runMutex.Lock()
	defer x.runMutex.Unlock()
	if !x.run {
		err = errors.New("stop failed, cause it is stopped")
		return
	}
	x.run = false

	// Snapshot the snails up front so the goroutines below never touch the map.
	snails := make([]Snail, 0, len(x.snailMap))
	for _, snail := range x.snailMap {
		snails = append(snails, snail)
	}

	var wg sync.WaitGroup
	wg.Add(len(snails))
	for _, snail := range snails {
		go func(s Snail) {
			// Deferred so the counter is always released; a Done that is
			// skipped on any path would block wg.Wait forever.
			defer wg.Done()
			s.Stop()
		}(snail)
	}
	wg.Wait()

	// Deleting during range is safe in Go and leaves the map empty.
	for id := range x.snailMap {
		delete(x.snailMap, id)
	}
	x.services.UnDeployAll()
	return x.serviceBus.stop()
}
// Deploy starts snail on a newly created bus from newServiceEventLoopBus and
// registers it under a generated id of the form "snail-<n>-<nuid>", where n
// is the number of registered snails plus one.
//
// It panics if the container has been stopped or if the new bus fails to
// start. runMutex is held for the whole call.
func (x *standaloneSnailX) Deploy(snail Snail) (id string) {
	x.runMutex.Lock()
	defer x.runMutex.Unlock()

	if !x.run {
		panic("deploy failed, cause it is stopped")
	}

	seq := len(x.snailMap) + 1
	id = fmt.Sprintf("snail-%d-%s", seq, nuid.Next())

	bus := newServiceEventLoopBus(x.services)
	if startErr := bus.start(); startErr != nil {
		panic(startErr)
	}

	snail.SetServiceBus(bus)
	snail.Start()
	x.snailMap[id] = snail
	return id
}
// DeployWithOptions starts snail on a service bus selected by options and
// registers it under a generated id of the form "snail-<n>-<nuid>".
//
// Bus selection, by options.ServiceBusKind:
//   - empty or EventServiceBus: newServiceEventLoopBus.
//   - WorkerServiceBus: newServiceWorkBus with options.WorkersNum workers,
//     or runtime.NumCPU()*2 when WorkersNum is not positive.
//   - FlyServiceBus: newServiceEventFlyBus with capacity
//     options.FlyServiceBusCapacity, or runtime.NumCPU()*2*64 when that is
//     not positive.
//
// It panics if the container has been stopped, if the kind is not one of the
// above, or if the bus fails to start. runMutex is held for the whole call.
func (x *standaloneSnailX) DeployWithOptions(snail Snail, options SnailOptions) (id string) {
	x.runMutex.Lock()
	defer x.runMutex.Unlock()
	if !x.run {
		panic("deploy failed, cause it is stopped")
	}

	id = fmt.Sprintf("snail-%d-%s", len(x.snailMap)+1, nuid.Next())

	// An unset kind falls back to the event bus. The dispatch below must use
	// this defaulted value rather than options.ServiceBusKind, otherwise the
	// fallback is ignored and an empty kind is rejected as unknown.
	serviceKind := options.ServiceBusKind
	if serviceKind == "" {
		serviceKind = EventServiceBus
	}

	var serviceBus ServiceBus
	switch serviceKind {
	case EventServiceBus:
		serviceBus = newServiceEventLoopBus(x.services)
	case WorkerServiceBus:
		workers := options.WorkersNum
		if workers <= 0 {
			workers = runtime.NumCPU() * 2
		}
		serviceBus = newServiceWorkBus(workers, x.services)
	case FlyServiceBus:
		capacity := options.FlyServiceBusCapacity
		if capacity <= 0 {
			capacity = runtime.NumCPU() * 2 * 64
		}
		serviceBus = newServiceEventFlyBus(capacity, x.services)
	default:
		panic("snailx: unknown service kind")
	}

	if err := serviceBus.start(); err != nil {
		panic(err)
	}
	snail.SetServiceBus(serviceBus)
	snail.Start()
	x.snailMap[id] = snail
	return id
}
// UnDeploy stops the snail registered under snailId and removes it from the
// registry. An id that is not registered is ignored.
//
// It panics if the container has been stopped. runMutex is held for the
// whole call.
func (x *standaloneSnailX) UnDeploy(snailId string) {
	x.runMutex.Lock()
	defer x.runMutex.Unlock()
	if !x.run {
		// The message names this operation; it previously said "deploy
		// failed", copied from Deploy.
		panic("undeploy failed, cause it is stopped")
	}
	if snail, has := x.snailMap[snailId]; has {
		snail.Stop()
		delete(x.snailMap, snailId)
	}
}
// ServiceBus returns the container-level service bus stored in x.serviceBus.
// The field is read without taking runMutex.
func (x *standaloneSnailX) ServiceBus() (bus ServiceBus) {
	return x.serviceBus
}