-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbell.go
201 lines (167 loc) · 4.91 KB
/
bell.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// Package bell implements a simple event system (bell ringing and listening)
//
// Several listeners can be added for each ringing (handlerFunc).
// Listeners are called in a separate goroutine through an established channel.
// When the bell rings, a message is sequentially transmitted to each listener.
//
// If a channel is closed, the goroutine for that event is terminated.
//
// Example usage:
//
//	// Add a listener to the bell named "event_name".
//	Listen("event_name", func(message Message) { fmt.Println(message) })
//	// Ring the bell (trigger the event "event_name").
//	Ring("event_name", "some_data")
package bell
import (
"fmt"
"sync"
)
// Message is the payload passed to an event handler. It is the empty
// interface, so a value of any type can be sent.
type Message interface{}
// Events stores the registered events and the channels that feed their
// handlers. Its methods guard this state with an internal mutex.
type Events struct {
	// mutex guards channels and queueSize.
	mutex sync.RWMutex
	// wg counts messages that were rung but whose handler has not returned yet.
	wg sync.WaitGroup
	// channels maps an event name to one channel per ListenN/Listen call.
	channels map[string][]chan Message
	// queueSize is the buffer capacity given to channels created by ListenN.
	queueSize uint
}
// New returns an empty Events with an initialised listener map and no
// registered events.
func New() *Events {
	events := new(Events)
	events.channels = make(map[string][]chan Message)
	return events
}
// Queue sets the buffer size used for listener channels and returns e so
// that calls can be chained.
//
// Only channels created by later ListenN/Listen calls use the new size;
// channels that already exist keep the capacity they were created with.
func (e *Events) Queue(size uint) *Events {
	e.mutex.Lock()
	e.queueSize = size
	e.mutex.Unlock()

	return e
}
// ListenN subscribes handlerFunc to event and serves it with copiesCount
// worker goroutines.
//
//   - event: the event name
//   - handlerFunc: function called with every message rung for the event
//   - copiesCount: number of worker goroutines running handlerFunc
//
// Each call creates one channel (buffered with the current queue size) that
// all copiesCount workers read from, so a given message is handled by exactly
// one of the copies. Workers exit when the channel is closed (see Remove).
//
// A copiesCount of zero is a no-op: registering a channel that no goroutine
// drains would make Ring block forever (or leave Wait waiting for a Done that
// never comes). A zero-value Events is accepted; its listener map is created
// on first use.
func (e *Events) ListenN(event string, handlerFunc func(message Message), copiesCount uint) {
	if copiesCount == 0 {
		// Nothing would ever consume from the channel; do not register it.
		return
	}

	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.channels == nil {
		// Writing to a nil map panics; make the zero value usable.
		e.channels = make(map[string][]chan Message)
	}

	channel := make(chan Message, e.queueSize)
	for i := uint(0); i < copiesCount; i++ {
		go func(c chan Message, wg *sync.WaitGroup) {
			// The range ends once c is closed and drained.
			for message := range c {
				handlerFunc(message)
				// Balances the wg.Add(1) done by Ring for this message.
				wg.Done()
			}
		}(channel, &e.wg)
	}

	e.channels[event] = append(e.channels[event], channel)
}
// Listen subscribes handlerFunc to event. It is shorthand for ListenN with a
// copies count of one.
//
//   - event: the event name
//   - handlerFunc: function called with every message rung for the event
func (e *Events) Listen(event string, handlerFunc func(message Message)) {
	const singleCopy uint = 1
	e.ListenN(event, handlerFunc, singleCopy)
}
// Ring sends message to every listener channel registered for event.
//
//   - event: the event name
//   - message: data passed to the event handlers
//
// It returns an error when nothing is registered under event. For each
// listener the pending-work counter is incremented before the send; the send
// blocks while that listener's queue is full, and the read lock stays held
// until all sends have completed.
func (e *Events) Ring(event string, message Message) error {
	e.mutex.RLock()
	defer e.mutex.RUnlock()

	listeners, found := e.channels[event]
	if !found {
		return fmt.Errorf("channel %s not found", event)
	}

	for i := range listeners {
		e.wg.Add(1)
		listeners[i] <- message
	}
	return nil
}
// Has reports whether event is currently registered, i.e. whether the
// listener map contains an entry for it.
func (e *Events) Has(event string) bool {
	e.mutex.RLock()
	_, found := e.channels[event]
	e.mutex.RUnlock()

	return found
}
// List returns the names of all events that currently have listeners
// registered. The result is a new, non-nil slice; its order follows map
// iteration and is therefore unspecified.
func (e *Events) List() []string {
	e.mutex.RLock()
	defer e.mutex.RUnlock()

	names := make([]string, len(e.channels))
	next := 0
	for name := range e.channels {
		names[next] = name
		next++
	}
	return names
}
// Remove unregisters the listeners of the named events.
//
// For each name, every listener channel is closed and the event is deleted
// from the listener map. Closing a channel makes the goroutines reading from
// it stop. Names that are not registered are ignored.
//
// When called without any names, the listeners of all events are removed.
func (e *Events) Remove(names ...string) {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	// drop closes every channel of one event and forgets the event.
	drop := func(name string) {
		for _, listener := range e.channels[name] {
			close(listener)
		}
		delete(e.channels, name)
	}

	if len(names) == 0 {
		// Deleting the current key while ranging over a map is permitted.
		for name := range e.channels {
			drop(name)
		}
		return
	}

	for _, name := range names {
		drop(name)
	}
}
// Wait blocks until every message passed to Ring has been processed by its
// handler, i.e. until the internal WaitGroup counter drops to zero.
//
// The write lock is held for the whole wait, so concurrent calls to the other
// Events methods block until Wait returns.
// NOTE(review): a handler that itself calls a method on the same Events
// (e.g. Ring) while Wait is in progress looks like it would deadlock — confirm.
func (e *Events) Wait() {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	e.wg.Wait()
}
// globalState is the package-level Events instance that the package-level
// functions (ListenN, Listen, Ring, Has, List, Remove, Wait, Queue) operate on.
var globalState = New()
// ListenN subscribes handlerFunc to event on the package-level Events
// instance (globalState).
//
//   - event: the event name
//   - handlerFunc: function called with every message rung for the event
//   - copiesCount: number of handler copies to run
func ListenN(event string, handlerFunc func(message Message), copiesCount uint) {
	globalState.ListenN(event, handlerFunc, copiesCount)
}
// Listen subscribes handlerFunc to event on the package-level Events
// instance (globalState).
//
//   - event: the event name
//   - handlerFunc: function called with every message rung for the event
func Listen(event string, handlerFunc func(message Message)) {
	globalState.Listen(event, handlerFunc)
}
// Ring triggers event on the package-level Events instance (globalState) and
// returns whatever error that instance's Ring reports.
//
//   - event: the event name
//   - message: data passed to the event handlers
func Ring(event string, message Message) error {
	return globalState.Ring(event, message)
}
// Has reports whether the package-level Events instance (globalState) has
// listeners registered for event.
func Has(event string) bool {
	return globalState.Has(event)
}
// List returns the names of the events that have listeners registered on the
// package-level Events instance (globalState).
func List() []string {
	return globalState.List()
}
// Remove removes the listeners of the named events from the package-level
// Events instance (globalState). Removing listeners closes their channels and
// stops the goroutines reading from them.
//
// When called without any names, the listeners of all events are removed.
func Remove(names ...string) {
	globalState.Remove(names...)
}
// Wait blocks until all events rung on the package-level Events instance
// (globalState) have been processed by their handlers.
func Wait() {
	globalState.Wait()
}
// Queue sets the events queue size on the package-level Events instance
// (globalState). The *Events value returned by the underlying method is
// discarded.
func Queue(size uint) {
	globalState.Queue(size)
}