From 9fe78b267ca56fc1d1d0f7efb9b28569f095cc0c Mon Sep 17 00:00:00 2001 From: Jeremy Yen Date: Thu, 14 Dec 2023 12:39:35 -0800 Subject: [PATCH] Add CommitInterval to kafka new reader --- internal/kafka.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/kafka.go b/internal/kafka.go index 40e43e9..70860a1 100644 --- a/internal/kafka.go +++ b/internal/kafka.go @@ -89,11 +89,12 @@ func RunKafkaReader( // XXX this infinite loop is so we reconnect if we get dropped. for { r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: config.KafkaBrokers, - GroupID: uuid.New().String(), - StartOffset: kafka.LastOffset, - Topic: config.KafkaCommandTopic, - Dialer: getDialer(config), + Brokers: config.KafkaBrokers, + GroupID: uuid.New().String(), + StartOffset: kafka.LastOffset, + Topic: config.KafkaCommandTopic, + Dialer: getDialer(config), + CommitInterval: time.Second * 10, }) defer r.Close()