Skip to content

Commit 59495ba

Browse files
authored
fix: fix kafka writer batch conf (#3498)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent df82939 commit 59495ba

File tree

1 file changed

+30
-9
lines changed

1 file changed

+30
-9
lines changed

extensions/sinks/kafka/ext/kafka.go

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/json"
2020
"fmt"
2121
"strings"
22+
"time"
2223

2324
kafkago "github.com/segmentio/kafka-go"
2425

@@ -46,10 +47,17 @@ type sinkConf struct {
4647
}
4748

4849
type kafkaConf struct {
49-
MaxAttempts int `json:"maxAttempts"`
50-
RequiredACKs int `json:"requiredACKs"`
51-
Key string `json:"key"`
52-
Headers interface{} `json:"headers"`
50+
MaxAttempts int `json:"maxAttempts"`
51+
RequiredACKs int `json:"requiredACKs"`
52+
Key string `json:"key"`
53+
Headers interface{} `json:"headers"`
54+
WriterConf kafkaWriterConf `json:"writerConf"`
55+
}
56+
57+
type kafkaWriterConf struct {
58+
BatchSize int `json:"batchSize"`
59+
BatchTimeout time.Duration `json:"batchTimeout"`
60+
BatchBytes int64 `json:"batchBytes"`
5361
}
5462

5563
func (m *kafkaSink) Ping(_ string, props map[string]interface{}) error {
@@ -92,10 +100,7 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
92100
return err
93101
}
94102
m.tlsConfig = tlsConfig
95-
kc := &kafkaConf{
96-
RequiredACKs: -1,
97-
MaxAttempts: 1,
98-
}
103+
kc := getDefaultKafkaConf()
99104
if err := cast.MapToStruct(props, kc); err != nil {
100105
return err
101106
}
@@ -122,12 +127,15 @@ func (m *kafkaSink) buildKafkaWriter() error {
122127
AllowAutoTopicCreation: true,
123128
MaxAttempts: m.kc.MaxAttempts,
124129
RequiredAcks: kafkago.RequiredAcks(m.kc.RequiredACKs),
125-
BatchSize: 1,
130+
BatchSize: m.kc.WriterConf.BatchSize,
131+
BatchBytes: m.kc.WriterConf.BatchBytes,
132+
BatchTimeout: m.kc.WriterConf.BatchTimeout,
126133
Transport: &kafkago.Transport{
127134
SASL: mechanism,
128135
TLS: m.tlsConfig,
129136
},
130137
}
138+
conf.Log.Infof("kafka writer batchSize:%v, batchTimeout:%v", m.kc.WriterConf.BatchSize, m.kc.WriterConf.BatchTimeout.String())
131139
m.writer = w
132140
return nil
133141
}
@@ -310,3 +318,16 @@ func (m *kafkaSink) ping(address string) error {
310318
defer c.Close()
311319
return nil
312320
}
321+
322+
func getDefaultKafkaConf() *kafkaConf {
323+
c := &kafkaConf{
324+
RequiredACKs: -1,
325+
MaxAttempts: 1,
326+
WriterConf: kafkaWriterConf{
327+
BatchSize: 5000,
328+
BatchTimeout: 200 * time.Millisecond,
329+
BatchBytes: 1048576 * 10, // 10MB
330+
},
331+
}
332+
return c
333+
}

0 commit comments

Comments
 (0)