Skip to content

Commit 75a5066

Browse files
authored
kafka(ticdc): sarama do not retry if produce message failed to prevent out of order (#11870) (#12026)
close #11935
1 parent 79bd0c0 commit 75a5066

File tree

2 files changed

+6
-9
lines changed

2 files changed

+6
-9
lines changed

cdc/sink/mq/producer/kafka/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,14 +455,14 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
455455
// For kafka cluster with a bad network condition, producer should not try to
456456
// waster too much time on sending a message, get response no matter success
457457
// or fail as soon as possible is preferred.
458-
config.Producer.Retry.Max = 3
459-
config.Producer.Retry.Backoff = 100 * time.Millisecond
458+
config.Producer.Retry.Max = 0
460459

461460
// make sure sarama producer flush messages as soon as possible.
462461
config.Producer.Flush.Bytes = 0
463462
config.Producer.Flush.Messages = 0
464463
config.Producer.Flush.Frequency = time.Duration(0)
465464

465+
config.Net.MaxOpenRequests = 1
466466
config.Net.DialTimeout = c.DialTimeout
467467
config.Net.WriteTimeout = c.WriteTimeout
468468
config.Net.ReadTimeout = c.ReadTimeout

pkg/logutil/log.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,11 @@ func ZapErrorFilter(err error, filterErrors ...error) zap.Field {
195195

196196
// initSaramaLogger hacks logger used in sarama lib
197197
func initSaramaLogger(level zapcore.Level) error {
198-
// only available less than info level
199-
if !zapcore.InfoLevel.Enabled(level) {
200-
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
201-
if err != nil {
202-
return errors.Trace(err)
203-
}
204-
sarama.Logger = logger
198+
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
199+
if err != nil {
200+
return errors.Trace(err)
205201
}
202+
sarama.Logger = logger
206203
return nil
207204
}
208205

0 commit comments

Comments
 (0)