From bd8861cb04b456693af7dff9c55c8c5451cad12b Mon Sep 17 00:00:00 2001 From: "muyun.cyt" <921148484@qq.com> Date: Fri, 20 Dec 2024 11:04:31 +0800 Subject: [PATCH] fix bug:seek offset won't work due to wrong map key type (#1184) Co-authored-by: muyun.cyt --- consumer/pull_consumer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index 32587790..95920615 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -208,12 +208,17 @@ func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, origin if pc.SubType != Assign { return originOffset } - value, exist := pc.mq2seekOffset.LoadAndDelete(mq) + value, exist := pc.mq2seekOffset.LoadAndDelete(*mq) if !exist { return originOffset } else { nextOffset := value.(int64) _ = pc.updateOffset(mq, nextOffset) + rlog.Info("pull consumer assign new offset", map[string]interface{}{ + "group": pc.GroupName, + "mq": mq, + "offset": nextOffset, + }) return nextOffset } } @@ -711,7 +716,7 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes } func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) { - pc.mq2seekOffset.Store(mq, offset) + pc.mq2seekOffset.Store(*mq, offset) rlog.Info("pull consumer seek offset", map[string]interface{}{ "mq": mq, "offset": offset, @@ -881,6 +886,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) { pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag) } + rlog.Debug(fmt.Sprintf("defaultPullConsumer pull message from broker: %s, request: %+v", brokerResult.BrokerAddr, pullRequest), nil) + result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest) if err != nil { rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{