-
-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathoutboxer.go
164 lines (139 loc) · 4.37 KB
/
outboxer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Package outboxer is an implementation of the outbox pattern.
// The producer of messages can durably store those messages in a local outbox before sending to a Message Endpoint.
// The durable local storage may be implemented in the Message Channel directly, especially when combined
// with Idempotent Messages.
package outboxer
import (
"context"
"database/sql"
"errors"
"time"
)
const (
messageBatchSize = 100
cleanUpBatchSize = 100
)
var (
// ErrMissingEventStream is used when no event stream is provided.
ErrMissingEventStream = errors.New("an event stream is required for the outboxer to work")
// ErrMissingDataStore is used when no data store is provided.
ErrMissingDataStore = errors.New("a data store is required for the outboxer to work")
)
// ExecerContext defines the exec context method that is used within a transaction.
type ExecerContext interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// DataStore defines the data store methods.
type DataStore interface {
// Tries to find the given message in the outbox.
GetEvents(ctx context.Context, batchSize int32) ([]*OutboxMessage, error)
Add(ctx context.Context, m *OutboxMessage) error
AddWithinTx(ctx context.Context, m *OutboxMessage, fn func(ExecerContext) error) error
SetAsDispatched(ctx context.Context, id int64) error
Remove(ctx context.Context, since time.Time, batchSize int32) error
}
// EventStream defines the event stream methods.
type EventStream interface {
Send(context.Context, *OutboxMessage) error
}
// Outboxer implements the outbox pattern.
type Outboxer struct {
cleanUpBefore time.Time
ds DataStore
es EventStream
errChan chan error
okChan chan struct{}
checkInterval time.Duration
cleanUpInterval time.Duration
cleanUpBatchSize int32
messageBatchSize int32
}
// New creates a new instance of Outboxer.
func New(opts ...Option) (*Outboxer, error) {
o := Outboxer{
errChan: make(chan error),
okChan: make(chan struct{}),
messageBatchSize: messageBatchSize,
cleanUpBatchSize: cleanUpBatchSize,
}
for _, opt := range opts {
opt(&o)
}
if o.ds == nil {
return nil, ErrMissingDataStore
}
if o.es == nil {
return nil, ErrMissingEventStream
}
return &o, nil
}
// ErrChan returns the error channel.
func (o *Outboxer) ErrChan() <-chan error {
return o.errChan
}
// OkChan returns the ok channel that is used when each message is successfully delivered.
func (o *Outboxer) OkChan() <-chan struct{} {
return o.okChan
}
// Send sends a message.
func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error {
return o.ds.Add(ctx, m)
}
// SendWithinTx encapsulate any database call within a transaction.
func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error {
return o.ds.AddWithinTx(ctx, evt, fn)
}
// Start encapsulates two go routines. Starts the dispatcher, which is responsible for getting the messages
// from the data store and sending to the event stream.
// Starts the cleanup process, that makes sure old messages are removed from the data store.
func (o *Outboxer) Start(ctx context.Context) {
go o.StartDispatcher(ctx)
go o.StartCleanup(ctx)
}
// StartDispatcher starts the dispatcher, which is responsible for getting the messages
// from the data store and sending to the event stream.
func (o *Outboxer) StartDispatcher(ctx context.Context) {
ticker := time.NewTicker(o.checkInterval)
for {
select {
case <-ticker.C:
evts, err := o.ds.GetEvents(ctx, o.messageBatchSize)
if err != nil {
o.errChan <- err
break
}
for _, e := range evts {
if err := o.es.Send(ctx, e); err != nil {
o.errChan <- err
} else {
if err := o.ds.SetAsDispatched(ctx, e.ID); err != nil {
o.errChan <- err
} else {
o.okChan <- struct{}{}
}
}
}
case <-ctx.Done():
return
}
}
}
// StartCleanup starts the cleanup process, that makes sure old messages are removed from the data store.
func (o *Outboxer) StartCleanup(ctx context.Context) {
ticker := time.NewTicker(o.cleanUpInterval)
for {
select {
case <-ticker.C:
if err := o.ds.Remove(ctx, o.cleanUpBefore, o.cleanUpBatchSize); err != nil {
o.errChan <- err
}
case <-ctx.Done():
return
}
}
}
// Stop closes all channels.
func (o *Outboxer) Stop() {
close(o.errChan)
close(o.okChan)
}