forked from francescopepe/formigo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.go
124 lines (99 loc) · 3.46 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package formigo
import (
"log"
"time"
"github.com/Pod-Point/go-queue-worker/internal/client"
)
// Package defaults. setWorkerConfigValues substitutes these for any
// Configuration option that the caller left unset.
const (
	// Default for ErrorConfiguration.Threshold.
	defaultErrorThreshold = 3
	// Default for ErrorConfiguration.Period (120 seconds).
	defaultErrorPeriod = 2 * time.Minute
	// Default for Configuration.Concurrency.
	defaultConcurrency = 100
	// Default for Configuration.Retrievers.
	defaultRetrievers = 1
	// Default for DeleterConfiguration.BufferSize.
	defaultDeleterBufferSize = 10
	// Default for DeleterConfiguration.BufferTimeout.
	defaultDeleterBufferTimeout = 500 * time.Millisecond
)
// DeleterConfiguration holds the buffering options of the worker's deleter
// (see Configuration.DeleterConfig).
//
// NOTE(review): the deleter is not defined in this file; the field
// descriptions below are inferred from the field names and should be
// confirmed against its implementation.
type DeleterConfiguration struct {
	// Presumably the number of messages buffered before they are deleted together.
	// Default: 10 (applied when left at zero).
	BufferSize int
	// Presumably the longest time messages wait in the buffer before being deleted.
	// Default: 500ms (applied when left at zero).
	BufferTimeout time.Duration
}
// ErrorConfiguration defines the threshold at which the worker stops. If the number
// of errors that occur during the worker execution passes the given Threshold within
// the specified Period, the worker stops.
type ErrorConfiguration struct {
	// Number of errors that must occur in the Period before the worker stops.
	// Default: 3.
	Threshold int
	// Duration of the period for which, if the number of errors passes the Threshold, the worker stops.
	// Default: 120s.
	Period time.Duration
	// The error report function.
	// Default: a function that logs the error through log.Println, prefixed with "ERROR".
	ReportFunc func(err error)
}
// MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either
// the buffer is full or the timeout has passed since the first message got added.
//
// NOTE(review): the defaults documented below are not applied anywhere in this
// file — confirm where they are set.
type MultiMessageBufferConfiguration struct {
	// Max number of messages that the buffer can contain.
	// Default: 10.
	Size int
	// Time after which the buffer gets processed, no matter whether it is full or not.
	// This value MUST be smaller than VisibilityTimeout in the
	// RetrieveMessageConfiguration + the maximum processing time of the handler.
	// If this is not set correctly, the same message could be processed multiple times.
	// Default: 1s.
	Timeout time.Duration
}
// SingleMessageConsumerConfiguration holds the options of a consumer built
// around a singleMessageHandler.
//
// NOTE(review): singleMessageHandler is declared outside this file;
// presumably it handles one message per call — confirm.
type SingleMessageConsumerConfiguration struct {
	// The handler invoked by the consumer.
	Handler singleMessageHandler
}
// MultiMessageConsumerConfiguration holds the options of a consumer built
// around a multiMessageHandler and a message buffer.
//
// NOTE(review): multiMessageHandler is declared outside this file;
// presumably it receives the buffered messages as a batch — confirm.
type MultiMessageConsumerConfiguration struct {
	// The handler invoked by the consumer.
	Handler multiMessageHandler
	// The configuration of the buffer that collects messages for the handler.
	BufferConfig MultiMessageBufferConfiguration
}
// Configuration collects the options of a worker. Optional fields left at
// their zero value are replaced with the documented defaults by
// setWorkerConfigValues; Client and Consumer have no default.
type Configuration struct {
	// A queue client
	Client client.Client
	// Number of Go routines that process the messages from the Queue.
	// The higher this value, the more Go routines are spawned to process the messages.
	// Using a high value can be useful when the Handler of the consumer perform slow I/O operations.
	// Default: 100.
	Concurrency int
	// Number of Go routines that retrieve messages from the Queue.
	// The higher this value, the more Go routines are spawned to read the messages from the
	// queue and provide them to the worker's consumers.
	// Using a high value can be useful when the network is slow or when consumers are quicker
	// than retrievers.
	// Default: 1.
	Retrievers int
	// The ErrorConfiguration.
	ErrorConfig ErrorConfiguration
	// The messages Consumer.
	Consumer consumer
	// Configuration for the deleter.
	// Default: BufferSize 10, BufferTimeout 500ms.
	DeleterConfig DeleterConfiguration
}
// setWorkerConfigValues returns config with every unset option replaced by
// its package default.
//
// config is received by value, so the caller's Configuration is not modified;
// the completed copy is returned. Client and Consumer are never touched.
//
// Goroutine counts and buffer sizes (Retrievers, Concurrency,
// DeleterConfig.BufferSize) are considered unset when they are zero or
// negative, because a negative count or size has no valid meaning. All other
// options are considered unset only when they hold their zero value.
func setWorkerConfigValues(config Configuration) Configuration {
	if config.Retrievers <= 0 {
		config.Retrievers = defaultRetrievers
	}

	if config.Concurrency <= 0 {
		config.Concurrency = defaultConcurrency
	}

	// How negative thresholds and periods are interpreted depends on code
	// outside this function, so only the zero value is replaced here.
	if config.ErrorConfig.Threshold == 0 {
		config.ErrorConfig.Threshold = defaultErrorThreshold
	}

	if config.ErrorConfig.Period == 0 {
		config.ErrorConfig.Period = defaultErrorPeriod
	}

	// Fall back to the standard logger so that errors are never dropped
	// silently when no reporter is supplied.
	if config.ErrorConfig.ReportFunc == nil {
		config.ErrorConfig.ReportFunc = func(err error) {
			log.Println("ERROR", err)
		}
	}

	if config.DeleterConfig.BufferSize <= 0 {
		config.DeleterConfig.BufferSize = defaultDeleterBufferSize
	}

	if config.DeleterConfig.BufferTimeout == 0 {
		config.DeleterConfig.BufferTimeout = defaultDeleterBufferTimeout
	}

	return config
}