From 60208860266271d76e743148f5e6a01abef9896a Mon Sep 17 00:00:00 2001 From: childe Date: Tue, 9 Jul 2024 10:51:42 +0800 Subject: [PATCH] add: sleep backoff ms --- simple_consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/simple_consumer.go b/simple_consumer.go index b432369..198cf1a 100644 --- a/simple_consumer.go +++ b/simple_consumer.go @@ -273,7 +273,6 @@ func (c *SimpleConsumer) Stop() { c.cancel() c.stop = true - // c.stopWG.Wait() close(c.messages) if c.leaderBroker != nil { @@ -344,6 +343,7 @@ func (c *SimpleConsumer) Consume(offset int64, messageChan chan *FullMessage) (< for !c.stop { if err = c.getLeaderBroker(); err != nil { logger.Error(err, "get leader broker of [%s/%d] error: %s", "topic", c.topic, "partitionID", c.partitionID) + time.Sleep(time.Millisecond * time.Duration(c.config.RetryBackOffMS)) } else { break } @@ -462,6 +462,7 @@ func (c *SimpleConsumer) consumeMessages(innerMessages chan *FullMessage, messag for !c.stop { if err = c.getLeaderBroker(); err != nil { logger.Error(err, "failer to get leader", "topic", c.topic, "partitionID", c.partitionID) + time.Sleep(time.Millisecond * time.Duration(c.config.RetryBackOffMS)) } else { break }