-
Notifications
You must be signed in to change notification settings - Fork 2
/
worker.go
372 lines (315 loc) · 10.5 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
package worker
import (
"errors"
"fmt"
"math/rand"
"os"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/transcovo/go-chpr-logger"
"github.com/transcovo/go-chpr-metrics"
)
// errDefaultRecover is the error reported by handleSingleMessage when a handler
// panics with a value that is not itself an error.
var errDefaultRecover = errors.New("AMQP handler panicked")
/*
AmqpHandler is implemented by anything able to process an AMQP message.

HandleMessage receives the raw body of the delivery. When it returns nil the
worker acks the message. When it returns an error (or panics) the worker nacks
the message, requeuing it only if it was not already a redelivery.
*/
type AmqpHandler interface {
HandleMessage([]byte) error
}
/*
AmqpConsumer pairs an AMQP routing key with the AmqpHandler to call when a
message is received for that key.
*/
type AmqpConsumer struct {
/*
RoutingKey is the AMQP routing key on which to bind the queue.
It is also used as the suffix of the queue name (see formatQueueName).
*/
RoutingKey string
/*
Handler is called for every message matching the RoutingKey.
It should return an error when the message needs to be re-queued for a second
try; a message that fails again after being redelivered is not re-queued.
*/
Handler AmqpHandler
}
/*
AmqpWorker struct contains the necessary information to initialize, connect
and consume messages on AMQP.
Because it embeds a mutex, it must be used through a pointer and never copied
once Start or Stop has been called.
*/
type AmqpWorker struct {
/*
AmqpURL is the AMQP connection string.
*/
AmqpURL string
/*
Exchange is the AMQP exchange name. Start declares it as a topic exchange.
*/
Exchange string
/*
Queue is the base AMQP queue name.
Each handler consumes its own queue, named "<Queue>.<RoutingKey>".
*/
Queue string
/*
ConsumerTag is the AMQP consumer tag.
It must be unique per consumer. If an empty string is given, Start generates
one from the program name and a random number and stores it in this field.
*/
ConsumerTag string
/*
ChannelPrefetchCount is the AMQP prefetch count setting, applied to every
channel opened by Start.
It can be set to 0 to disable the setting.
*/
ChannelPrefetchCount int
/*
Handlers is the list of AmqpConsumer to consume on AMQP.
Start opens one channel and one queue per entry.
*/
Handlers []AmqpConsumer
/*
ChannelCloseTimeout is the extra duration Start waits, after the consumers
have been cancelled and the running handlers have returned, before it returns
and closes the connection.
The zero value (the default) means no extra wait.
*/
ChannelCloseTimeout time.Duration
// mu guards requestCloseChannel, which is written by Start and read by Stop.
mu sync.RWMutex
// requestCloseChannel carries stop requests from Stop to the running Start
// call. Each request is itself a channel that Start closes on return to
// acknowledge the stop. It is nil until Start has been called.
requestCloseChannel chan chan struct{}
}
// consumerTagData holds the pieces a generated consumer tag is built from.
type consumerTagData struct {
	// ProgramName is the name of the running program.
	ProgramName string
	// UID is the number that distinguishes this consumer from the others.
	UID int64
}

// uniqueConsumerTag renders data as "ctag-<ProgramName>-<UID>", the UID being
// written in decimal.
func uniqueConsumerTag(data *consumerTagData) string {
	uid := fmt.Sprint(data.UID)
	return "ctag-" + data.ProgramName + "-" + uid
}
func failOnError(err error, msg string) {
if err != nil {
logger.WithFields(logrus.Fields{
"msg": msg,
"err": err,
}).Error("[lib.amqp#failOnError] Error")
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
/*
getChannel calls channelGetter and returns the channel it opened.
It panics through failOnError when channelGetter returns an error.
*/
func getChannel(channelGetter func() (*amqp.Channel, error)) *amqp.Channel {
	opened, openErr := channelGetter()
	// failOnError returns immediately when openErr is nil
	failOnError(openErr, "Failed to open a channel")
	return opened
}
// exchangeDeclareFunc is the signature of the function used to declare an
// exchange; Start passes amqp.Channel.ExchangeDeclare.
type exchangeDeclareFunc func(string, string, bool, bool, bool, bool, amqp.Table) error

/*
declareExchange declares exchangeName as a topic exchange through
exchangeDeclare. Only the first boolean option is enabled.
It panics through failOnError when the declaration fails.
*/
func declareExchange(exchangeDeclare exchangeDeclareFunc, exchangeName string) {
	if declareErr := exchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); declareErr != nil {
		failOnError(declareErr, "Failed to declare an exchange")
	}
}
// queueDeclareFunc is the signature of the function used to declare a queue;
// Start passes amqp.Channel.QueueDeclare.
type queueDeclareFunc func(string, bool, bool, bool, bool, amqp.Table) (amqp.Queue, error)

/*
declareQueue declares queueName with the given args through queueDeclare and
returns the declared queue. Only the first boolean option is enabled.
It panics through failOnError when the declaration fails.
*/
func declareQueue(queueDeclare queueDeclareFunc, queueName string, args amqp.Table) amqp.Queue {
	declared, declareErr := queueDeclare(queueName, true, false, false, false, args)
	// failOnError returns immediately when declareErr is nil
	failOnError(declareErr, "Failed to declare a queue")
	return declared
}
// channelQosFunc is the signature of the function used to set the QoS of a
// channel; Start passes amqp.Channel.Qos.
type channelQosFunc func(int, int, bool) error

/*
setChannelQos applies prefetchCount through qosSet, passing 0 and false for
the two remaining arguments.
It panics through failOnError when the setting cannot be applied.
*/
func setChannelQos(qosSet channelQosFunc, prefetchCount int) {
	if qosErr := qosSet(prefetchCount, 0, false); qosErr != nil {
		failOnError(qosErr, "Failed to set channel QoS")
	}
}
// queueBindFunc is the signature of the function used to bind a queue to an
// exchange; Start passes amqp.Channel.QueueBind.
type queueBindFunc func(string, string, string, bool, amqp.Table) error

/*
bindQueue binds queueName to exchangeName for routingKey through queueBind.
Note that queueBind receives its arguments in the order queue, routing key,
exchange, which differs from the parameter order of this function.
It panics through failOnError when the binding fails.
*/
func bindQueue(queueBind queueBindFunc, exchangeName, queueName, routingKey string) {
	if err := queueBind(queueName, routingKey, exchangeName, false, nil); err != nil {
		failOnError(err, "Failed to bind a queue")
	}
}
// consumeChannelFunc is the signature of the function used to start consuming
// a queue; Start passes amqp.Channel.Consume.
type consumeChannelFunc func(string, string, bool, bool, bool, bool, amqp.Table) (<-chan amqp.Delivery, error)

/*
consumeChannel starts consuming queueName under consumerTag through consume and
returns the channel on which the deliveries arrive. All boolean options are
disabled; handleDeliveryWithRetry acks or nacks every delivery explicitly.
It panics through failOnError when the consumer cannot be started.
*/
func consumeChannel(consume consumeChannelFunc, queueName, consumerTag string) <-chan amqp.Delivery {
	messages, err := consume(queueName, consumerTag, false, false, false, false, nil)
	if err != nil {
		failOnError(err, "Failed to consume a queue")
	}
	return messages
}
/*
Start connects to AMQP, starts consuming for every configured handler and
blocks until the worker is asked to stop.

The steps are the following:
  - generate a consumer tag when ConsumerTag is empty
  - connect to AMQP
  - for each entry of Handlers: open a channel, declare the exchange, set the
    QoS settings (prefetch count), declare the queue "<Queue>.<RoutingKey>",
    bind it and start consuming it
  - block until an OS signal is received on signals, the AMQP connection is
    closed, or Stop is called
  - cancel every consumer, wait for the running handlers to return, sleep for
    ChannelCloseTimeout and return, which closes the connection

Any failure during the connection and setup steps panics (see failOnError).

The caller has to declare a configuration structure (AmqpWorker) and define
a channel to listen on system signals:

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	w := worker.AmqpWorker{
		AmqpURL:              "amqp://localhost",
		Exchange:             "exchange_name",
		Queue:                "queue_name",
		ChannelPrefetchCount: 0,
		ConsumerTag:          "",
		Handlers:             []worker.AmqpConsumer{{RoutingKey: "routing_key", Handler: someHandler}},
		ChannelCloseTimeout:  1 * time.Second,
	}
	w.Start(signals)

The AMQP consumers must all have a unique consumer tag. It can be provided in
the AmqpWorker given as parameter, or if it is an empty string, a random one is
generated and stored in the worker.
*/
func (worker *AmqpWorker) Start(signals <-chan os.Signal) {
	if worker.ConsumerTag == "" {
		// Use a time-seeded generator: before Go 1.20 the package-level
		// math/rand functions are deterministically seeded, which would give
		// every process the very same "random" tag.
		seeded := rand.New(rand.NewSource(time.Now().UnixNano()))
		data := consumerTagData{
			ProgramName: os.Args[0],
			UID:         seeded.Int63(),
		}
		worker.ConsumerTag = uniqueConsumerTag(&data)
	}

	closeRequests := make(chan chan struct{})
	worker.mu.Lock()
	worker.requestCloseChannel = closeRequests
	worker.mu.Unlock()

	// unregister makes Stop return directly once nobody listens on
	// closeRequests any more; otherwise its send would block forever.
	unregister := func() {
		worker.mu.Lock()
		if worker.requestCloseChannel == closeRequests {
			worker.requestCloseChannel = nil
		}
		worker.mu.Unlock()
	}
	// also covers the case where one of the setup steps below panics
	defer unregister()

	conn, err := amqp.Dial(worker.AmqpURL)
	if err != nil {
		failOnError(err, "Failed to connect to RabbitMQ")
	}
	defer conn.Close()
	notifyClose := conn.NotifyClose(make(chan *amqp.Error))

	channels := make([]*amqp.Channel, 0, len(worker.Handlers))
	var waitGroup sync.WaitGroup
	for _, handler := range worker.Handlers {
		formattedQueueName := formatQueueName(worker.Queue, handler.RoutingKey)
		channel := getChannel(conn.Channel)
		declareExchange(channel.ExchangeDeclare, worker.Exchange)
		setChannelQos(channel.Qos, worker.ChannelPrefetchCount)
		declareQueue(channel.QueueDeclare, formattedQueueName, nil)
		bindQueue(channel.QueueBind, worker.Exchange, formattedQueueName, handler.RoutingKey)
		messages := consumeChannel(channel.Consume, formattedQueueName, worker.ConsumerTag)
		channels = append(channels, channel)
		waitGroup.Add(1)
		go handleMessages(messages, handler.Handler, &waitGroup)
	}

	requestClose := waitAnyEndSignal(signals, notifyClose, closeRequests)
	// NOTE(review): a Stop call that read the channel just before this point
	// can still block on its send; closing that window needs state shared
	// with Stop.
	unregister()
	// acknowledge the Stop call (if any) once the shutdown below is done
	defer func() {
		if requestClose != nil {
			close(requestClose)
		}
	}()

	// after this call, messages channels will be closed soon
	for _, channel := range channels {
		if cancelErr := channel.Cancel(worker.ConsumerTag, false); cancelErr != nil {
			// best effort: keep shutting down the remaining consumers
			logger.WithFields(logrus.Fields{
				"err": cancelErr,
			}).Warn("[lib.amqp#Start] Failed to cancel a consumer")
		}
	}
	// wait for the dispatch loops and the handlers they started
	waitGroup.Wait()
	// then give ChannelCloseTimeout of extra time before the deferred
	// connection close
	time.Sleep(worker.ChannelCloseTimeout)
}
// Stop asks a running worker to stop and blocks until Start has acknowledged
// the request by closing the channel sent to it. If Start was never called
// (no request channel has been registered), it returns directly.
func (worker *AmqpWorker) Stop() {
	// read the request channel under the lock shared with Start
	closeRequests := func() chan chan struct{} {
		worker.mu.RLock()
		defer worker.mu.RUnlock()
		return worker.requestCloseChannel
	}()
	if closeRequests != nil {
		done := make(chan struct{})
		// hand the acknowledgement channel over to Start...
		closeRequests <- done
		// ...and wait until Start closes it
		<-done
	}
}
/*
handleMessages dispatches every amqp.Delivery received on messages to its own
handleDeliveryWithRetry goroutine, until messages is closed.

Each started goroutine is registered on waitGroup, and handleMessages itself
calls waitGroup.Done when it returns, so the caller is expected to have added
1 to waitGroup for it beforehand.

The outcome of each message is decided by handleDeliveryWithRetry:
  - no error from the handler: the message is acked
  - error on a message delivered for the first time: nacked and requeued
  - error on a redelivered message: nacked without requeue, to avoid the queue
    filling up
*/
func handleMessages(messages <-chan amqp.Delivery, handler AmqpHandler, waitGroup *sync.WaitGroup) {
	defer waitGroup.Done()
	for {
		delivery, open := <-messages
		if !open {
			return
		}
		waitGroup.Add(1)
		go handleDeliveryWithRetry(delivery, handler, waitGroup)
	}
}
/*
handleDeliveryWithRetry is meant to be called as a goroutine looped over a channel.
It wraps all the re-queuing mechanism:
  - the handler succeeded: the message is acked
  - the handler failed (or panicked) on a first delivery: the message is nacked
    and requeued, and "amqp.msg.nack.requeue" is incremented
  - the handler failed on a redelivered message: the message is nacked without
    requeue, and "amqp.msg.nack.no_requeue" is incremented

A failure of the ack or nack itself is logged; there is nothing more that can
be done with the delivery at that point.
It calls waitGroup.Done when it returns.
*/
func handleDeliveryWithRetry(msg amqp.Delivery, handler AmqpHandler, waitGroup *sync.WaitGroup) {
	defer waitGroup.Done()

	// built lazily: copying the body into a string is only worth it when
	// something actually has to be logged
	logFields := func() logrus.Fields {
		return logrus.Fields{
			"msg":     msg,
			"msgBody": string(msg.Body),
		}
	}

	err := handleSingleMessage(msg, handler)
	if err == nil {
		if ackErr := msg.Ack(false); ackErr != nil {
			logger.WithFields(logFields()).WithError(ackErr).Error("[lib.amqp#handleDeliveryWithRetry] Failed to ack a message")
		}
		return
	}

	scopeLogger := logger.WithFields(logFields()).WithError(err)
	// only give a message a second chance once
	requeue := !msg.Redelivered
	if requeue {
		metrics.Increment("amqp.msg.nack.requeue")
		scopeLogger.Warn("[lib.amqp#handleDeliveryWithRetry] First error handling on a message, requeuing it")
	} else {
		metrics.Increment("amqp.msg.nack.no_requeue")
		scopeLogger.Error("[lib.amqp#handleDeliveryWithRetry] Second handling error on a message, nacking it")
	}
	if nackErr := msg.Nack(false, requeue); nackErr != nil {
		logger.WithFields(logFields()).WithError(nackErr).Error("[lib.amqp#handleDeliveryWithRetry] Failed to nack a message")
	}
}
/*
handleSingleMessage runs handler on the body of msg and converts a panic of the
handler into a returned error.

Without panic, the result of handler.HandleMessage is returned unchanged.
On panic, the panic is logged and the returned error is the panic value itself
when it is an error, errDefaultRecover otherwise.

The result is a named return value so that the deferred function can overwrite
it after the recover.
*/
func handleSingleMessage(msg amqp.Delivery, handler AmqpHandler) (err error) {
	defer func() {
		recovered := recover()
		if recovered != nil {
			logger.WithFields(logrus.Fields{
				"err": recovered,
				"msg": msg,
			}).Error("[AMQP Recovery] Recovered panic from handler")
			if panicErr, isErr := recovered.(error); isErr {
				err = panicErr
			} else {
				err = errDefaultRecover
			}
		}
	}()
	err = handler.HandleMessage(msg.Body)
	return err
}
/*
waitAnyEndSignal blocks until the first of the following events happens, so
that Start can shut down cleanly:
  - an OS signal is received on signals
  - something is received on amqpCloseConnection (the AMQP connection ended)
  - a close request is received on requestCloseChannel

For a close request, the received acknowledgement channel is returned so the
caller can close it once the shutdown is complete. For the two other events nil
is returned.
*/
func waitAnyEndSignal(signals <-chan os.Signal, amqpCloseConnection <-chan *amqp.Error, requestCloseChannel <-chan chan struct{}) chan struct{} {
	// stays nil unless the wait is ended by an explicit close request
	var acknowledge chan struct{}
	select {
	case osSignal := <-signals:
		logger.WithField("signal", osSignal).Info("[lib.amqp#waitAnyEndSignal] Received signal from OS")
		metrics.Increment("amqp.signals.os_exit")
	case connectionErr := <-amqpCloseConnection:
		logger.WithField("connectionError", connectionErr).Error("[lib.amqp#waitAnyEndSignal] Received error from AMQP connection")
		metrics.Increment("amqp.signals.amqp_lost_connection")
	case acknowledge = <-requestCloseChannel:
		logger.Info("[lib.amqp#waitAnyEndSignal] Close requested")
	}
	return acknowledge
}
/*
formatQueueName builds the name of the queue consumed for a routing key:
the base queue name and the routing key joined by a dot.
*/
func formatQueueName(queueName string, routingKey string) string {
	const separator = "."
	// all operands are strings, so Sprint inserts no space between them
	return fmt.Sprint(queueName, separator, routingKey)
}