-
Notifications
You must be signed in to change notification settings - Fork 0
/
memory_test.go
328 lines (274 loc) · 8.81 KB
/
memory_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
package emitter
import (
"errors"
"sync"
"testing"
"time"
)
// TestNewMemoryEmitter checks that the constructor hands back a usable
// (non-nil) MemoryEmitter.
func TestNewMemoryEmitter(t *testing.T) {
	if em := NewMemoryEmitter(); em == nil {
		t.Fatal("NewMemoryEmitter() should not return nil")
	}
}
// TestOnOff tests subscribing to and unsubscribing from a topic.
//
// It registers a no-op listener with On, checks that a non-empty listener ID
// is returned, and then checks that Off accepts that ID without error.
func TestOnOff(t *testing.T) {
	emitter := NewMemoryEmitter()

	listener := func(e Event) error {
		return nil
	}

	// Subscribe to a topic.
	id, err := emitter.On("testTopic", listener)
	if err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}
	if id == "" {
		t.Fatal("On() returned an empty ID")
	}

	// Unsubscribe using the ID handed out by On; this must succeed.
	if err := emitter.Off("testTopic", id); err != nil {
		t.Fatalf("Off() failed with error: %v", err)
	}
}
// TestEmitAsyncSuccess exercises the asynchronous Emit path with a listener
// that succeeds, and expects no errors on the returned channel.
func TestEmitAsyncSuccess(t *testing.T) {
	em := NewMemoryEmitter()

	// A listener that takes a little while and then reports success.
	slowOK := func(e Event) error {
		time.Sleep(10 * time.Millisecond)
		return nil
	}

	if _, err := em.On("testTopic", slowOK); err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}

	// Drain the error channel returned by the asynchronous emit, keeping
	// only the non-nil entries.
	var failures []error
	for e := range em.Emit("testTopic", "testPayload") {
		if e == nil {
			continue
		}
		failures = append(failures, e)
	}

	// A successful listener must not produce any error.
	if len(failures) != 0 {
		t.Errorf("Emit() resulted in errors: %v", failures)
	}
}
// TestEmitAsyncFailure exercises the asynchronous Emit path with a listener
// that fails, and expects at least one error on the returned channel.
func TestEmitAsyncFailure(t *testing.T) {
	em := NewMemoryEmitter()

	// A listener that takes a little while and then reports a failure.
	slowFail := func(e Event) error {
		time.Sleep(10 * time.Millisecond)
		return errors.New("listener error")
	}

	if _, err := em.On("testTopic", slowFail); err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}

	// Drain the error channel returned by the asynchronous emit, keeping
	// only the non-nil entries.
	var failures []error
	for e := range em.Emit("testTopic", "testPayload") {
		if e == nil {
			continue
		}
		failures = append(failures, e)
	}

	// The listener's error has to surface through the channel.
	if len(failures) == 0 {
		t.Error("Emit() should have resulted in errors, but none were returned")
	}
}
// TestEmitSyncSuccess tests the synchronous EmitSync method for successful
// event handling: the subscribed listener must receive the event on the
// emitted topic and EmitSync must report no errors.
func TestEmitSyncSuccess(t *testing.T) {
	emitter := NewMemoryEmitter()

	// Buffered with capacity one so the listener's send cannot block the emit.
	received := make(chan string, 1)

	// Subscribe a listener that forwards the event's topic to received.
	listener := createTestListener(received)
	_, err := emitter.On("testTopic", listener)
	if err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}

	// Emit through the synchronous API (the asynchronous Emit is covered by
	// TestEmitAsyncSuccess) and make sure no listener error is reported.
	if errs := emitter.EmitSync("testTopic", "testPayload"); len(errs) != 0 {
		t.Fatalf("EmitSync() resulted in errors: %v", errs)
	}

	// The timeout is kept as a safety net so a lost event fails the test
	// instead of hanging it.
	select {
	case topic := <-received:
		if topic != "testTopic" {
			t.Fatalf("Expected to receive event on 'testTopic', got '%v'", topic)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("Test timed out waiting for the event to be received")
	}
}
// TestEmitSyncFailure tests the synchronous EmitSync method for event handling
// that returns an error: a failing listener must cause EmitSync to return a
// non-empty result.
func TestEmitSyncFailure(t *testing.T) {
	emitter := NewMemoryEmitter()

	// Create a listener that returns an error.
	listener := func(e Event) error {
		return errors.New("listener error")
	}

	// Subscribe the listener to the "testTopic".
	_, err := emitter.On("testTopic", listener)
	if err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}

	// Emit the event synchronously and collect errors. The result is named
	// errs so it does not shadow the imported errors package.
	errs := emitter.EmitSync("testTopic", "testPayload")

	// The listener's error has to be reported back to the caller.
	if len(errs) == 0 {
		t.Error("EmitSync() should have resulted in errors, but none were returned")
	}
}
// TestGetTopic checks that a topic created by subscribing to it can be looked
// up again through GetTopic.
func TestGetTopic(t *testing.T) {
	em := NewMemoryEmitter()

	// The topic is created as a side effect of subscribing to it.
	noop := func(e Event) error { return nil }
	if _, err := em.On("testTopic", noop); err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}

	found, err := em.GetTopic("testTopic")
	switch {
	case err != nil:
		t.Fatalf("GetTopic() failed with error: %v", err)
	case found == nil:
		t.Fatal("GetTopic() returned nil")
	}
}
// TestEnsureTopic checks that EnsureTopic yields a non-nil topic and that a
// later GetTopic for the same name returns that very instance.
func TestEnsureTopic(t *testing.T) {
	em := NewMemoryEmitter()

	// First call: the topic is fetched or created.
	created := em.EnsureTopic("newTopic")
	if created == nil {
		t.Fatal("EnsureTopic() should not return nil")
	}

	// Second lookup goes through GetTopic and must hand back the same value.
	fetched, err := em.GetTopic("newTopic")
	switch {
	case err != nil:
		t.Fatalf("GetTopic() failed with error: %v", err)
	case fetched != created:
		t.Fatal("EnsureTopic() did not return the same instance of the topic")
	}
}
// TestWildcardSubscriptionAndEmiting verifies that listeners subscribed with
// wildcard patterns are notified for the concrete topics they are expected to
// match.
//
// Every pattern in topics gets its own listener. Each key of expectedMatches is
// then emitted with the key itself as payload, and the test asserts that every
// pattern listed for that key saw the event. Only missing notifications are
// reported; notifications from patterns not listed are not checked.
func TestWildcardSubscriptionAndEmiting(t *testing.T) {
	emitter := NewMemoryEmitter()

	topics := []string{
		"event.some.*.*",
		"event.some.*.run",
		"event.some.**",
		"**.thing.run",
	}

	expectedMatches := map[string][]string{
		"event.some.thing.run": {"event.some.*.*", "event.some.*.run", "event.some.**", "**.thing.run"},
		"event.some.thing.do":  {"event.some.*.*", "event.some.**"},
		"event.some.thing":     {"event.some.**"},
	}

	// receivedEvents maps an event payload to a nested *sync.Map whose keys are
	// the subscription patterns whose listener received that payload.
	receivedEvents := new(sync.Map)

	// Subscribe one recording listener per pattern.
	for _, topic := range topics {
		topicName := topic // per-iteration copy captured by the closure below
		_, err := emitter.On(topicName, func(e Event) error {
			// Record which pattern saw which payload.
			eventPayload := e.Payload().(string)
			t.Logf("Listener received event on topic: %s with payload: %s", topicName, eventPayload)
			payloadEvents, _ := receivedEvents.LoadOrStore(eventPayload, new(sync.Map))
			payloadEvents.(*sync.Map).Store(topicName, struct{}{})
			return nil
		})
		if err != nil {
			t.Fatalf("Failed to subscribe to topic %s: %s", topic, err)
		}
	}

	// Emit every event and drain its error channel instead of sleeping for a
	// fixed time. The other async tests in this file range over the channel
	// returned by Emit and collect listener results from it, so reading until
	// it is closed waits for the listeners of that emit. It also keeps the
	// t.Logf calls above from running after the test has returned.
	for eventKey := range expectedMatches {
		t.Logf("Emitting event: %s", eventKey)
		// The event key doubles as the payload so listeners can identify it.
		for err := range emitter.Emit(eventKey, eventKey) {
			if err != nil {
				t.Errorf("Emit(%q) reported an error: %v", eventKey, err)
			}
		}
	}

	// Verify that every expected pattern was notified for every event.
	for eventKey, expectedTopics := range expectedMatches {
		notified, ok := receivedEvents.Load(eventKey)
		if !ok {
			t.Errorf("No events received for eventKey %s", eventKey)
			continue
		}

		// Flatten the nested set of patterns into a slice for lookup.
		var receivedTopics []string
		notified.(*sync.Map).Range(func(key, _ interface{}) bool {
			receivedTopics = append(receivedTopics, key.(string))
			return true
		})

		for _, expectedTopic := range expectedTopics {
			if !contains(receivedTopics, expectedTopic) {
				t.Errorf("Expected topic %s to be notified for event %s, but it was not", expectedTopic, eventKey)
			}
		}
	}
}
// TestMemoryEmitterClose verifies the state of a MemoryEmitter after Close:
// Close itself must succeed, previously registered topics must no longer be
// retrievable, the attached pool must report no running workers, and a
// subsequent Emit must deliver an error on its channel.
func TestMemoryEmitterClose(t *testing.T) {
	emitter := NewMemoryEmitter()

	// Set up topics and listeners. A failed subscription would make the
	// assertions below meaningless, so setup errors abort the test.
	topic1 := "topic1"
	listener1 := func(e Event) error { return nil }
	if _, err := emitter.On(topic1, listener1); err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}

	topic2 := "topic2"
	listener2 := func(e Event) error { return nil }
	if _, err := emitter.On(topic2, listener2); err != nil {
		t.Fatalf("On() failed with error: %v", err)
	}

	// Use a Pool
	pool := NewPondPool(10, 1000)
	emitter.SetPool(pool)

	// Close the emitter
	err := emitter.Close()
	if err != nil {
		t.Errorf("Close() should not return an error: %v", err)
	}

	// Verify topics have been removed
	_, err = emitter.GetTopic(topic1)
	if err == nil {
		t.Errorf("GetTopic() should return an error after Close()")
	}
	_, err = emitter.GetTopic(topic2)
	if err == nil {
		t.Errorf("GetTopic() should return an error after Close()")
	}

	// Verify the pool has been released
	if pool.Running() > 0 {
		t.Errorf("Pool should be released and have no running workers after Close()")
	}

	// Verify that no new events can be emitted. The timeout turns a channel
	// that never yields a value into a test failure instead of a hang.
	errChan := emitter.Emit(topic1, "payload")
	select {
	case err := <-errChan:
		if err == nil {
			t.Errorf("Emit() should return an error after Close()")
		}
	case <-time.After(5 * time.Second):
		t.Fatal("Test timed out waiting for the error to be received")
	}
}
// contains reports whether item occurs in slice. A nil or empty slice
// contains nothing.
func contains(slice []string, item string) bool {
	found := false
	// Stop scanning as soon as a match has been seen.
	for idx := 0; idx < len(slice) && !found; idx++ {
		found = slice[idx] == item
	}
	return found
}
// createTestListener builds a listener that forwards the topic of every event
// it handles to the received channel and always returns a nil error.
// The send is a plain channel send, so it blocks until received can accept it.
func createTestListener(received chan<- string) func(Event) error {
	forward := func(evt Event) error {
		received <- evt.Topic()
		return nil
	}
	return forward
}