-
Notifications
You must be signed in to change notification settings - Fork 106
/
inputkafka.go
234 lines (202 loc) · 6.97 KB
/
inputkafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package inputkafka
import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
)
// ModuleName is the name used in config file
const ModuleName = "kafka"
// InputConfig holds the configuration json fields and internal objects
type InputConfig struct {
config.InputConfig
Version string `json:"version"` // Kafka cluster version, eg: 0.10.2.0
Brokers []string `json:"brokers"` // Kafka bootstrap brokers to connect to, as a comma separated list
Topics []string `json:"topics"` // Kafka topics to be consumed, as a comma separated list
Group string `json:"group"` // Kafka consumer group definition
OffsetOldest bool `json:"offset_oldest"` // Kafka consumer consume initial offset from oldest
Assignor string `json:"assignor"` // Consumer group partition assignment strategy (range, roundrobin)
SecurityProtocol string `json:"security_protocol,omitempty"` // use SASL authentication
SaslMechanism string `json:"sasl_mechanism,omitempty"` // use SASL mechanism
User string `json:"sasl_username,omitempty"` // SASL authentication username
Password string `json:"sasl_password,omitempty"` // SASL authentication password
saConf *sarama.Config
}
// DefaultInputConfig returns an InputConfig struct with default values
func DefaultInputConfig() InputConfig {
return InputConfig{
InputConfig: config.InputConfig{
CommonConfig: config.CommonConfig{
Type: ModuleName,
},
},
SecurityProtocol: "",
SaslMechanism: "",
User: "",
Password: "",
}
}
// InitHandler initialize the input plugin
func InitHandler(
ctx context.Context,
raw config.ConfigRaw,
control config.Control,
) (config.TypeInputConfig, error) {
conf := DefaultInputConfig()
err := config.ReflectConfig(raw, &conf)
if err != nil {
return nil, err
}
sarama.Logger = goglog.Logger
version, err := sarama.ParseKafkaVersion(conf.Version)
if err != nil {
goglog.Logger.Errorf("Error parsing Kafka version: %v", err)
return nil, err
}
/**
* Construct a new Sarama configuration.
* The Kafka cluster version has to be defined before the consumer/producer is initialized.
*/
sarConfig := sarama.NewConfig()
sarConfig.Version = version
sarConfig.Consumer.MaxProcessingTime = 500 * time.Millisecond
sarConfig.Consumer.Group.Session.Timeout = 20 * time.Second
sarConfig.Consumer.Group.Heartbeat.Interval = 6 * time.Second
switch conf.Assignor {
case "roundrobin":
sarConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "range":
sarConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
default:
goglog.Logger.Errorf("Unrecognized consumer group partition assignor: %s", conf.Assignor)
}
if conf.OffsetOldest {
sarConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
}
if len(conf.Topics) < 1 {
goglog.Logger.Error("topics should not be empty")
return nil, err
}
if conf.Group == "" {
goglog.Logger.Error("group should not be empty")
return nil, err
}
if len(conf.Brokers) == 0 {
goglog.Logger.Error("topics should not be empty")
return nil, err
}
if conf.SecurityProtocol == "SASL" {
sarConfig.Net.SASL.Enable = true
sarConfig.Net.SASL.User = conf.User
sarConfig.Net.SASL.Password = conf.Password
if conf.SaslMechanism == "SCRAM-SHA-512" {
sarConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &SCRAMClient{HashGeneratorFcn: SHA512} }
sarConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} else if conf.SaslMechanism == "SCRAM-SHA-256" {
sarConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &SCRAMClient{HashGeneratorFcn: SHA256} }
sarConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
}
}
conf.saConf = sarConfig
conf.Codec, err = config.GetCodecOrDefault(ctx, raw["codec"])
if err != nil {
return nil, err
}
return &conf, nil
}
// Start wraps the actual function starting the plugin
func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) (err error) {
/**
* Setup a new Sarama consumer group
*/
cum := consumerHandle{
i: t,
ch: msgChan,
ready: make(chan bool),
ctx: ctx,
}
ct, cancel := context.WithCancel(ctx)
defer cancel()
client, err := sarama.NewConsumerGroup(t.Brokers, t.Group, t.saConf)
if err != nil {
goglog.Logger.Errorf("Error creating consumer group client: %v", err)
return err
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ct, t.Topics, &cum); err != nil {
goglog.Logger.Errorf("Error from consumer: %v", err)
}
// check if context was canceled, signaling that the consumer should stop
if ct.Err() != nil {
return
}
cum.ready = make(chan bool)
}
}()
<-cum.ready // Await till the consumer has been set up
goglog.Logger.Println("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ct.Done():
goglog.Logger.Println("terminating: context canceled")
case <-sigterm:
goglog.Logger.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
goglog.Logger.Errorf("Error closing client: %v", err)
return err
}
return nil
}
// consumerHandle represents a Sarama consumer group consumer
type consumerHandle struct {
i *InputConfig
ch chan<- logevent.LogEvent
ready chan bool
ctx context.Context
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *consumerHandle) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *consumerHandle) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *consumerHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
var extra = map[string]any{
"topic": message.Topic,
"timestamp": message.Timestamp,
}
ok, err := c.i.Codec.Decode(c.ctx, string(message.Value), extra, []string{}, c.ch)
if !ok {
goglog.Logger.Errorf("decode message to msg chan error : %v", err)
}
session.MarkMessage(message, "")
}
return nil
}