forked from twitchscience/kinsumer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconfig.go
203 lines (164 loc) · 6.18 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Copyright (c) 2016 Twitch Interactive
package kinsumer
import (
"time"
)
//TODO: Update documentation to include the defaults
//TODO: Update the 'with' methods' comments to be less ridiculous
// Config carries every tunable setting for one Kinsumer instance. All fields
// are unexported: obtain a value from NewConfig and adjust it through the
// With* methods.
type Config struct {
	stats               StatReceiver
	logger              Logger
	manualCheckpointing bool

	// ---------- [ Per Shard Worker ] ----------

	// throttleDelay is how long to sleep when no records are found.
	throttleDelay time.Duration

	// commitFrequency is the delay between commits to the checkpoint database.
	commitFrequency time.Duration

	// shardCheckFrequency is the delay between checks for a change in the
	// number of clients or shards.
	shardCheckFrequency time.Duration

	// clientRecordMaxAge is the maximum age of a client record before it is
	// considered stale. It is optional; nil means it has not been set.
	clientRecordMaxAge *time.Duration

	// iteratorStartTimestamp is the starting timestamp of the shard iterator
	// when "AT_TIMESTAMP" is the desired iterator type. It is optional; nil
	// means it has not been set.
	iteratorStartTimestamp *time.Time

	// ---------- [ For the leader (first client alphabetically) ] ----------

	// leaderActionFrequency is the time between leader actions.
	leaderActionFrequency time.Duration

	// ---------- [ For the entire Kinsumer ] ----------

	// bufferSize is the size of the buffer for the combined records channel.
	// Once the channel is full the workers stop adding new elements to the
	// queue, so a slow client can fall behind the kinesis stream.
	bufferSize int

	// ---------- [ For the Dynamo DB tables ] ----------

	// dynamoReadCapacity and dynamoWriteCapacity are the read and write
	// capacity given to the Dynamo DB tables when they are created by a
	// CreateRequiredTables() call. They are not used when the tables already
	// exist, whether from a previous run or because they were created
	// manually.
	dynamoReadCapacity, dynamoWriteCapacity int64

	// dynamoWaiterDelay is the time to wait between attempts to verify that
	// tables were created/deleted completely.
	dynamoWaiterDelay time.Duration

	// useListShardsForKinesisStreamReady selects ListShards, which avoids the
	// LimitExceedException that DescribeStream can return.
	useListShardsForKinesisStreamReady bool
}
// NewConfig builds a Config populated with the package defaults:
//
//	stats                  &NoopStatReceiver{}
//	logger                 &DefaultLogger{}
//	throttleDelay          250ms
//	commitFrequency        1s
//	shardCheckFrequency    1m
//	leaderActionFrequency  1m
//	bufferSize             100
//	dynamoReadCapacity     10
//	dynamoWriteCapacity    10
//	dynamoWaiterDelay      3s
//
// Every field not listed above is left at its zero value.
func NewConfig() Config {
	var cfg Config

	cfg.stats = &NoopStatReceiver{}
	cfg.logger = &DefaultLogger{}

	cfg.throttleDelay = 250 * time.Millisecond
	cfg.commitFrequency = time.Second
	cfg.shardCheckFrequency = time.Minute
	cfg.leaderActionFrequency = time.Minute

	cfg.bufferSize = 100

	cfg.dynamoReadCapacity = 10
	cfg.dynamoWriteCapacity = 10
	cfg.dynamoWaiterDelay = 3 * time.Second

	return cfg
}
// WithManualCheckpointing returns a copy of c with the manual checkpointing
// flag set to v; the receiver itself is not modified.
//
// When v is false, records are checkpointed automatically upon calls to
// Next(). When v is true, NextWithCheckpointer() must be used and the
// checkpointer function it returns must be called once the record has been
// fully processed.
func (c Config) WithManualCheckpointing(v bool) Config {
	updated := c
	updated.manualCheckpointing = v
	return updated
}
// WithThrottleDelay returns a copy of c whose throttle delay (the time to
// sleep if no records are found) is set to delay. The receiver itself is not
// modified. validateConfig rejects a delay below 200ms.
func (c Config) WithThrottleDelay(delay time.Duration) Config {
	updated := c
	updated.throttleDelay = delay
	return updated
}
// WithCommitFrequency returns a copy of c whose commit frequency (the delay
// between commits to the checkpoint database) is set to commitFrequency. The
// receiver itself is not modified. validateConfig rejects a zero value.
func (c Config) WithCommitFrequency(commitFrequency time.Duration) Config {
	updated := c
	updated.commitFrequency = commitFrequency
	return updated
}
// WithShardCheckFrequency returns a copy of c whose shard check frequency (the
// delay between tests for the client or shard numbers changing) is set to
// shardCheckFrequency. The receiver itself is not modified.
//
// validateConfig rejects a zero value, a value greater than the leader action
// frequency, and a value greater than the client record max age when that is
// set.
func (c Config) WithShardCheckFrequency(shardCheckFrequency time.Duration) Config {
	updated := c
	updated.shardCheckFrequency = shardCheckFrequency
	return updated
}
// WithClientRecordMaxAge returns a copy of c whose client record max age (the
// age after which a client record is considered stale) is set to
// clientRecordMaxAge. The receiver itself is not modified.
//
// The pointer is stored as given, not copied; nil leaves the setting unset.
// When it is non-nil, validateConfig requires the pointed-to duration to be at
// least the shard check frequency.
func (c Config) WithClientRecordMaxAge(clientRecordMaxAge *time.Duration) Config {
	updated := c
	updated.clientRecordMaxAge = clientRecordMaxAge
	return updated
}
// WithLeaderActionFrequency returns a copy of c whose leader action frequency
// (the time between leader actions) is set to leaderActionFrequency. The
// receiver itself is not modified.
//
// validateConfig rejects a zero value and a value smaller than the shard check
// frequency.
func (c Config) WithLeaderActionFrequency(leaderActionFrequency time.Duration) Config {
	updated := c
	updated.leaderActionFrequency = leaderActionFrequency
	return updated
}
// WithBufferSize returns a copy of c whose buffer size (the size of the buffer
// for the combined records channel) is set to bufferSize. The receiver itself
// is not modified. validateConfig rejects a zero value.
func (c Config) WithBufferSize(bufferSize int) Config {
	updated := c
	updated.bufferSize = bufferSize
	return updated
}
// WithStats returns a copy of c that uses stats as its StatReceiver. The
// receiver itself is not modified. validateConfig rejects a nil value.
func (c Config) WithStats(stats StatReceiver) Config {
	updated := c
	updated.stats = stats
	return updated
}
// WithIteratorStartTimestamp returns a copy of c whose iteratorStartTimestamp
// (the starting timestamp of the shard iterator when "AT_TIMESTAMP" is the
// desired iterator type) is set to timestamp. The receiver itself is not
// modified.
//
// The pointer is stored as given, not copied; nil leaves the setting unset.
func (c Config) WithIteratorStartTimestamp(timestamp *time.Time) Config {
	updated := c
	updated.iteratorStartTimestamp = timestamp
	return updated
}
// WithDynamoReadCapacity returns a copy of c whose dynamo read capacity is set
// to readCapacity. The receiver itself is not modified.
//
// The capacity applies to Dynamo DB tables created by a CreateRequiredTables()
// call and is not used when the tables already exist. validateConfig rejects a
// zero value.
func (c Config) WithDynamoReadCapacity(readCapacity int64) Config {
	updated := c
	updated.dynamoReadCapacity = readCapacity
	return updated
}
// WithDynamoWriteCapacity returns a copy of c whose dynamo write capacity is
// set to writeCapacity. The receiver itself is not modified.
//
// The capacity applies to Dynamo DB tables created by a CreateRequiredTables()
// call and is not used when the tables already exist. validateConfig rejects a
// zero value.
func (c Config) WithDynamoWriteCapacity(writeCapacity int64) Config {
	updated := c
	updated.dynamoWriteCapacity = writeCapacity
	return updated
}
// WithDynamoWaiterDelay returns a copy of c whose dynamo waiter delay (the time
// to wait between attempts to verify tables were created/deleted completely)
// is set to delay. The receiver itself is not modified.
func (c Config) WithDynamoWaiterDelay(delay time.Duration) Config {
	updated := c
	updated.dynamoWaiterDelay = delay
	return updated
}
// WithLogger returns a copy of c that uses logger as its Logger. The receiver
// itself is not modified. validateConfig rejects a nil value.
func (c Config) WithLogger(logger Logger) Config {
	updated := c
	updated.logger = logger
	return updated
}
// WithUseListShardsForKinesisStreamReady returns a copy of c with the
// useListShardsForKinesisStreamReady toggle set to shouldUse. The receiver
// itself is not modified.
//
// Turning the toggle on selects ListShards, which avoids the
// LimitExceedException that DescribeStream can return.
func (c Config) WithUseListShardsForKinesisStreamReady(shouldUse bool) Config {
	updated := c
	updated.useListShardsForKinesisStreamReady = shouldUse
	return updated
}
// validateConfig verifies that a Config has sane and valid values. The rules
// are evaluated in the order listed and the error for the first one that is
// violated is returned; nil means every rule passed.
//
//   - throttleDelay must be at least 200ms (ErrConfigInvalidThrottleDelay)
//   - commitFrequency must be positive (ErrConfigInvalidCommitFrequency)
//   - shardCheckFrequency must be positive
//     (ErrConfigInvalidShardCheckFrequency)
//   - leaderActionFrequency must be positive
//     (ErrConfigInvalidLeaderActionFrequency)
//   - clientRecordMaxAge, when set, must not be shorter than
//     shardCheckFrequency (ErrConfigInvalidClientRecordMaxAge)
//   - shardCheckFrequency must not exceed leaderActionFrequency
//     (ErrConfigInvalidLeaderActionFrequency)
//   - bufferSize must be positive (ErrConfigInvalidBufferSize)
//   - stats must not be nil (ErrConfigInvalidStats)
//   - dynamoReadCapacity and dynamoWriteCapacity must both be positive
//     (ErrConfigInvalidDynamoCapacity)
//   - logger must not be nil (ErrConfigInvalidLogger)
//
// Fields not mentioned above (for example dynamoWaiterDelay and
// iteratorStartTimestamp) are not inspected. c must be non-nil; it is
// dereferenced without a check.
//
// The frequency, buffer size and capacity rules reject negative values as well
// as zero, since a negative interval, channel size or capacity is no more
// usable than a zero one.
// NOTE(review): the consumers of these fields are not visible from this file;
// standard-library calls such as time.NewTicker and make(chan T, n) panic on
// such values, so confirm against the call sites.
func validateConfig(c *Config) error {
	switch {
	case c.throttleDelay < 200*time.Millisecond:
		return ErrConfigInvalidThrottleDelay
	case c.commitFrequency <= 0:
		return ErrConfigInvalidCommitFrequency
	case c.shardCheckFrequency <= 0:
		return ErrConfigInvalidShardCheckFrequency
	case c.leaderActionFrequency <= 0:
		return ErrConfigInvalidLeaderActionFrequency
	case c.clientRecordMaxAge != nil && *c.clientRecordMaxAge < c.shardCheckFrequency:
		return ErrConfigInvalidClientRecordMaxAge
	case c.shardCheckFrequency > c.leaderActionFrequency:
		// Reported against the leader setting, which is the one that is
		// too small relative to the shard check interval.
		return ErrConfigInvalidLeaderActionFrequency
	case c.bufferSize <= 0:
		return ErrConfigInvalidBufferSize
	case c.stats == nil:
		return ErrConfigInvalidStats
	case c.dynamoReadCapacity <= 0 || c.dynamoWriteCapacity <= 0:
		return ErrConfigInvalidDynamoCapacity
	case c.logger == nil:
		return ErrConfigInvalidLogger
	}

	return nil
}