-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent.go
106 lines (92 loc) · 2.52 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package google_pubsub
import (
"context"
"errors"
"time"
"github.com/cenkalti/backoff"
)
// Context is the message that Dispatch places on Event.Channel and that the
// workers consume.
type Context struct {
// Metadata is forwarded unchanged to the matching listener's Caller,
// OnSuccess and OnError callbacks.
Metadata map[string]string
// EventName is compared against Listener.EventName to select listeners.
EventName string
// Data is the raw payload handed to the listener's Caller.
Data []byte
}
// Notifier is the publishing side of the event bus. *Event in this file
// provides both methods.
type Notifier interface {
// Dispatch publishes an event with the given name, payload and metadata.
Dispatch(name string, data []byte, metadata map[string]string) error
// Close releases the notifier's resources.
Close()
}
type (
// Listener handles events whose name equals EventName.
//
// Deprecated: use AdvancedPubSubListener instead. To allow compatibility,
// use NewAdvancedPubSubListener(oldListener Listener, true).
Listener interface {
// EventName is the event name this listener wants to receive.
EventName() string
// GroupID is not used anywhere in this file.
// NOTE(review): presumably a subscription/consumer group id - confirm.
GroupID() string
// Caller processes the event payload and metadata; a non-nil error
// makes the call eligible for another attempt.
Caller([]byte, map[string]string) error
// OnSuccess is invoked after Caller finally returns nil.
OnSuccess(metadata map[string]string)
// OnError is invoked with the last error once retrying gives up.
OnError(err error, metadata map[string]string)
}
// AdvancedPubSubListener extends Listener with an early-ack switch.
AdvancedPubSubListener interface {
Listener
// EarlyAckEnabled is not used anywhere in this file.
// NOTE(review): presumably controls acking before Caller runs - confirm.
EarlyAckEnabled() bool
}
)
// Event is an in-process event bus: Dispatch queues messages on Channel and
// worker goroutines fan them out to the subscribed listeners.
type Event struct {
// MaxRetries is handed to retry as the retry limit for each listener
// call; NewEvent also uses it as the buffer size of Channel.
MaxRetries uint64
// listeners holds everything registered through Subscribe.
listeners []Listener
// Channel is the queue between Dispatch and the workers.
Channel chan Context
}
// NewEvent returns an Event configured with maxRetries.
//
// The listener list starts empty with room for ten entries. Note that
// maxRetries is also used as the buffer size of Channel, so a value of zero
// yields an unbuffered channel.
func NewEvent(maxRetries uint64) Event {
	var ev Event
	ev.MaxRetries = maxRetries
	ev.listeners = make([]Listener, 0, 10)
	ev.Channel = make(chan Context, maxRetries)
	return ev
}
// Subscribe registers the given listeners by appending them to the ones that
// are already subscribed. Workers consult this list for every event.
func (e *Event) Subscribe(listeners ...Listener) {
	combined := append(e.listeners, listeners...)
	e.listeners = combined
}
// Dispatch queues an event carrying name, data and metadata on Channel.
//
// It waits up to five seconds for the channel to accept the message. It
// returns nil once the message is queued, or an error if the timeout elapses
// first.
func (e *Event) Dispatch(name string, data []byte, metadata map[string]string) error {
	timeoutCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
	defer release()

	msg := Context{EventName: name, Data: data, Metadata: metadata}

	select {
	case <-timeoutCtx.Done():
		return errors.New("there is not listeners available. Channel is busy")
	case e.Channel <- msg:
		return nil
	}
}
// Start launches the four worker goroutines that consume Channel. It should
// be called once on start-up and always returns nil.
func (e *Event) Start() error {
	const numWorkers = 4

	for remaining := numWorkers; remaining > 0; remaining-- {
		go e.worker()
	}

	return nil
}
// Close closes Channel, which lets every worker's receive loop finish once
// the already queued events have been drained.
func (e *Event) Close() {
	queue := e.Channel
	close(queue)
}
// worker receives events from Channel until it is closed. For each event it
// walks the subscribed listeners and, for every one whose EventName matches,
// starts a goroutine that runs the listener's Caller through retry with
// e.MaxRetries.
func (e *Event) worker() {
	for msg := range e.Channel {
		for _, candidate := range e.listeners {
			if candidate.EventName() != msg.EventName {
				continue
			}

			go func(attempts uint64, l Listener, meta map[string]string, payload []byte) {
				call := func() error {
					return l.Caller(payload, meta)
				}
				// The outcome is already reported to the listener through
				// OnSuccess/OnError inside retry, so the error is dropped here.
				_ = retry(attempts, call, l, meta)
			}(e.MaxRetries, candidate, msg.Metadata, msg.Data)
		}
	}
}
// retry runs fn through backoff.Retry using an exponential back-off policy
// wrapped with backoff.WithMaxRetries(maxRetries).
//
// If backoff.Retry returns an error, that error is passed to
// listener.OnError together with metadata and then returned. Otherwise
// listener.OnSuccess is called with metadata and nil is returned.
func retry(maxRetries uint64, fn func() error, listener Listener, metadata map[string]string) error {
	policy := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), maxRetries)

	err := backoff.Retry(fn, policy)
	if err == nil {
		listener.OnSuccess(metadata)
		return nil
	}

	listener.OnError(err, metadata)
	return err
}