Skip to content

Commit

Permalink
Add CommitInterval to kafka new reader
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremy5189 committed Dec 14, 2023
1 parent 2f06def commit 9fe78b2
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions internal/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ func RunKafkaReader(
// XXX this infinite loop is so we reconnect if we get dropped.
for {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.KafkaBrokers,
GroupID: uuid.New().String(),
StartOffset: kafka.LastOffset,
Topic: config.KafkaCommandTopic,
Dialer: getDialer(config),
Brokers: config.KafkaBrokers,
GroupID: uuid.New().String(),
StartOffset: kafka.LastOffset,
Topic: config.KafkaCommandTopic,
Dialer: getDialer(config),
CommitInterval: time.Second * 10,
})
defer r.Close()

Expand Down

0 comments on commit 9fe78b2

Please sign in to comment.