Skip to content

Commit

Permalink
Variable modified in read method
Browse files Browse the repository at this point in the history
  • Loading branch information
guan46 committed Jan 31, 2025
1 parent cdab2d6 commit 58dd563
Showing 1 changed file with 12 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,17 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
if (permits.intValue() > 0) {
boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash);
if (!canDispatchEntry) {
if(blockedByHash != null){
blockedByHash.setTrue();
}
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
}
}
}
}
Expand Down Expand Up @@ -507,27 +512,18 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {

// checks if the entry can be dispatched to the consumer
private boolean canDispatchEntry(Consumer consumer, Entry entry,
ReadType readType, int stickyKeyHash,
MutableBoolean blockedByHash) {
ReadType readType, int stickyKeyHash) {
// If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched
// do not send those messages for order guarantee
if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}

if (drainingHashesRequired) {
// If the hash is draining, do not send the message
if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}
}

return true;
}

Expand Down

0 comments on commit 58dd563

Please sign in to comment.