Fix for an infinite loop in cooperative sticky assignor #4800

Merged · 1 commit · Oct 16, 2024
20 changes: 15 additions & 5 deletions CHANGELOG.md
@@ -2,17 +2,27 @@

librdkafka v2.6.1 is a maintenance release:

* Fix for a Fetch regression when connecting to Apache Kafka < 2.7 (#4871).
* Fix for an infinite loop in the cooperative-sticky assignor
  under some particular conditions (#4800).


## Fixes

### Consumer fixes

* Issues: #4870
Fix for a Fetch regression when connecting to Apache Kafka < 2.7, causing
fetches to fail.
Happening since v2.6.0 (#4871)
* Issues: #4783.
A consumer configured with the `cooperative-sticky` partition assignment
strategy could get stuck in an infinite loop, with a corresponding spike
in main thread CPU usage.
This happened with particular orderings of members and potentially
assignable partitions.
Solved by removing the cause of the infinite loop.
Happening since: 1.6.0 (#4800).



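For context on the changelog entry above: the affected assignor is selected through the consumer's `partition.assignment.strategy` configuration property. Below is a minimal sketch of a librdkafka consumer opting into `cooperative-sticky`, i.e. the setup under which the bug could be hit. It is not part of this PR; the broker address ("localhost:9092"), group id ("demo-group") and topic name ("demo-topic") are placeholders, and error handling is kept minimal.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder broker list and consumer group id. */
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "demo-group",
                          errstr, sizeof(errstr));

        /* The assignor this PR fixes. */
        if (rd_kafka_conf_set(conf, "partition.assignment.strategy",
                              "cooperative-sticky", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }
        rd_kafka_poll_set_consumer(rk);

        /* Subscribing as part of a group is what triggers the assignor
         * on each rebalance. */
        rd_kafka_topic_partition_list_t *topics =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, "demo-topic",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        /* ... rd_kafka_consumer_poll() loop would go here ... */

        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);
        return 0;
}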
5 changes: 3 additions & 2 deletions src/rdkafka_sticky_assignor.c
@@ -819,6 +819,7 @@ isBalanced(rd_kafka_t *rk,
* currentAssignment's element we get both the consumer
* and partition list in elem here. */
RD_LIST_FOREACH(elem, sortedCurrentSubscriptions, i) {
+        int j;
const char *consumer = (const char *)elem->key;
const rd_kafka_topic_partition_list_t *potentialTopicPartitions;
const rd_kafka_topic_partition_list_t *consumerPartitions;
@@ -836,9 +837,9 @@

/* Otherwise make sure it can't get any more partitions */

-        for (i = 0; i < potentialTopicPartitions->cnt; i++) {
+        for (j = 0; j < potentialTopicPartitions->cnt; j++) {
                 const rd_kafka_topic_partition_t *partition =
-                     &potentialTopicPartitions->elems[i];
+                     &potentialTopicPartitions->elems[j];
const char *otherConsumer;
int otherConsumerPartitionCount;

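The two hunks above fix the bug by giving the inner scan over potentialTopicPartitions its own index `j` instead of reusing `i`, the cursor of the enclosing RD_LIST_FOREACH. The following is a simplified, self-contained sketch of the underlying pattern (illustrative only, not the assignor code) showing why sharing the index can keep the outer loop from ever completing; a guard counter is added so the demo terminates.

#include <stdio.h>

int main(void) {
        const int outer_cnt = 5, inner_cnt = 2;
        int i, guard = 0;

        for (i = 0; i < outer_cnt; i++) {
                /* BUG: the inner loop should use its own index `j`.
                 * Reusing `i` leaves it equal to inner_cnt (2) after each
                 * inner pass, the outer i++ then moves it to 3, and the
                 * loop repeats that same step forever instead of ever
                 * reaching outer_cnt. */
                for (i = 0; i < inner_cnt; i++)
                        ; /* examine candidate elements */

                if (++guard > 20) { /* safety net so the demo terminates */
                        printf("outer loop stuck at i=%d after %d passes\n",
                               i, guard);
                        return 1;
                }
        }

        printf("outer loop finished normally\n");
        return 0;
}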