Open
Description
Before Creating the Enhancement Request
- I have confirmed that this should be classified as an enhancement rather than a bug/feature.
Summary
public synchronized void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        ...
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException ignored) {
        }
        ...
After receiving a reset-offset request from the broker, the consumer always waits a fixed 10 seconds, which is neither elegant nor efficient. With many closely spaced requests, the current implementation can cause long consumption pauses. Instead, we could check each process queue's status, bounded by a maximum waiting time, until all of them are clear.
Motivation
To polish the implementation and avoid a possible performance side effect.
Describe the Solution You'd Like
Implement simple polling of every process queue: return as soon as all of them are clear, or when a maximum waiting time is reached.
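A minimal sketch of what such bounded polling might look like. It is deliberately decoupled from RocketMQ: the class `ResetOffsetWait`, the method `awaitAllCleared`, and the per-queue `BooleanSupplier` checks are all hypothetical names introduced for illustration. How a real `ProcessQueue` would report "clear" is an assumption left to the caller.

```java
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class ResetOffsetWait {

    /**
     * Polls until every check reports true or the deadline passes.
     * Each BooleanSupplier is a hypothetical stand-in for
     * "this process queue is clear"; it is not a RocketMQ API.
     *
     * @return true if all checks passed in time, false on timeout or interrupt
     */
    public static boolean awaitAllCleared(Collection<BooleanSupplier> clearedChecks,
                                          long maxWaitMillis,
                                          long pollIntervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        // Guard against a zero or negative interval, which would busy-spin.
        long intervalNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(1, pollIntervalMillis));
        while (true) {
            boolean allCleared = true;
            for (BooleanSupplier check : clearedChecks) {
                if (!check.getAsBoolean()) {
                    allCleared = false;
                    break;
                }
            }
            if (allCleared) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // maximum waiting time reached
            }
            try {
                // Never sleep past the deadline.
                TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, intervalNanos));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

With a 10-second maximum, the worst case matches the current fixed sleep, while the common case returns as soon as the queues drain. Unlike the snippet in the summary, this sketch restores the interrupt flag rather than swallowing the exception.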
Describe Alternatives You've Considered
More complicated logic, such as a CountDownLatch, may help too.
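For comparison, a rough sketch of the latch-based alternative. The class `ResetOffsetLatch` and its methods are hypothetical, and the sketch assumes each process queue has some hook where it can signal that it is clear, which the existing code may not provide.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ResetOffsetLatch {

    private final CountDownLatch latch;

    /** @param queueCount number of process queues that must report clear */
    public ResetOffsetLatch(int queueCount) {
        this.latch = new CountDownLatch(queueCount);
    }

    /** Called once per queue when that queue is clear (hypothetical hook). */
    public void queueCleared() {
        latch.countDown();
    }

    /**
     * Blocks until every queue has reported clear or the timeout elapses.
     *
     * @return true if all queues reported in time, false on timeout or interrupt
     */
    public boolean awaitAll(long maxWaitMillis) {
        try {
            return latch.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

The latch avoids repeated wake-ups, but it requires the consuming threads to signal completion, which is the extra complexity mentioned above.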
Additional Context
No response