-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP 848 ListGroups API #4777
KIP 848 ListGroups API #4777
Conversation
- Mock handler implementation - Rename current consumer protocol from generic to classic - Mock handler with automatic or manual assignment - More consumer group metadata getters - Test helpers - Expedite next HB after FindCoordinator doing it with an exponential backoff to avoid tight loops - Configurable session timeout and HB interval - Fix mock handler ListOffsets response LeaderEpoch instead of CurrentLeaderEpoch - Integration tests passing with AK trunk - Improve documentation and KIP 848 specific mock tests - Add mock tests for unknown topic id in metadata request and partial reconciliation - Make test 0147 more reliable - Fix test 0106 after HB timeout change - Exclude test case with AK trunk - Rename rd_kafka_buf_write_tags to rd_kafka_buf_write_tags_empty - Trivup 0.12.5 can run a KafkaCluster directly with KRaft and AK trunk - Trivup 0.12.6 build with a specific commit
3b45306
to
aa99639
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First comments, on second commit only, about implementation except tests and example
* [KIP-848] integration tests passing - Mock handler implementation - Rename current consumer protocol from generic to classic - Mock handler with automatic or manual assignment - More consumer group metadata getters - Test helpers - Expedite next HB after FindCoordinator doing it with an exponential backoff to avoid tight loops - Configurable session timeout and HB interval - Fix mock handler ListOffsets response LeaderEpoch instead of CurrentLeaderEpoch - Integration tests passing with AK trunk - Improve documentation and KIP 848 specific mock tests - Add mock tests for unknown topic id in metadata request and partial reconciliation - Make test 0147 more reliable - Fix test 0106 after HB timeout change - Exclude test case with AK trunk * Trivup 0.12.5 can run a KafkaCluster directly with KRaft and AK trunk * Trivup 0.12.6 build with a specific commit * rebase commit * Rebased * change * changes * changes * Style fix * PR comments * changes * minor * whitespace --------- Co-authored-by: Emanuele Sabellico <esabellico@confluent.io> Co-authored-by: Anchit Jain <anjain@confluent.io>
* [KIP-848] integration tests passing - Mock handler implementation - Rename current consumer protocol from generic to classic - Mock handler with automatic or manual assignment - More consumer group metadata getters - Test helpers - Expedite next HB after FindCoordinator doing it with an exponential backoff to avoid tight loops - Configurable session timeout and HB interval - Fix mock handler ListOffsets response LeaderEpoch instead of CurrentLeaderEpoch - Integration tests passing with AK trunk - Improve documentation and KIP 848 specific mock tests - Add mock tests for unknown topic id in metadata request and partial reconciliation - Make test 0147 more reliable - Fix test 0106 after HB timeout change - Exclude test case with AK trunk * Trivup 0.12.5 can run a KafkaCluster directly with KRaft and AK trunk * Trivup 0.12.6 build with a specific commit * rebase commit * Rebased * change * changes * changes * Style fix * PR comments * changes * minor * whitespace * pull and push changes --------- Co-authored-by: Emanuele Sabellico <esabellico@confluent.io> Co-authored-by: Anchit Jain <anjain@confluent.io>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Second pass about tests and example
tests/0081-admin.c
Outdated
rd_kafka_consumer_group_type_t consumer_type = | ||
RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same: when test_consumer_group_protocol_classic()
it should search for CONSUMER, othewise CLASSIC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, these are true but since the test is similar for other clients which do not have has_match_types functionality like in librdkafka, so only these were agreed upon. Otherwise, we would diverge a lot !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @emasab. This test will fail when the consumer is created with CLASSIC protocol. Please fix this.
examples/list_consumer_groups.c
Outdated
@@ -296,7 +313,7 @@ int main(int argc, char **argv) { | |||
/* | |||
* Parse common options | |||
*/ | |||
while ((opt = getopt(argc, argv, "b:X:d:")) != -1) { | |||
while ((opt = getopt(argc, argv, "b:X:d")) != -1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this change as d
parameter requires an arg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the usage message with state count and group types arguments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass without tests
src/rdkafka_admin.h
Outdated
@@ -445,6 +451,7 @@ struct rd_kafka_ConsumerGroupListing_s { | |||
/** Is it a simple consumer group? That means empty protocol_type. */ | |||
rd_bool_t is_simple_consumer_group; | |||
rd_kafka_consumer_group_state_t state; /**< Consumer group state. */ | |||
rd_kafka_consumer_group_type_t group_type; /**< Consumer group type. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would name it type
instead of group_type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually this is to maintain parity with the convention such as rk_type etc.. Also in other clients such as go type is a keyword.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is type
in java as well. Please change it.
Build is failing |
Do you have the warnings flag on, if so the warnings are not for my changes atleast. The build runs fine for me. |
I am talking about Semaphore build in the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix the failing tests in the build as well.
@@ -184,24 +187,30 @@ int64_t parse_int(const char *what, const char *str) { | |||
static void | |||
cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Running list_consumer_groups
without any argument is causing Segmentation Fault. It should show usage instead. Fix this.
tests/0081-admin.c
Outdated
while (1) { | ||
rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); | ||
TEST_SAY("ListConsumerGroups: got %s in %.3fms\n", | ||
rd_kafka_event_name(rkev), | ||
TIMING_DURATION(&timing) / 1000.0f); | ||
if (rkev == NULL) | ||
continue; | ||
if (rd_kafka_event_error(rkev)) | ||
TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), | ||
rd_kafka_event_error_string(rkev)); | ||
|
||
if (rd_kafka_event_type(rkev) == | ||
RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) { | ||
break; | ||
} | ||
|
||
rd_kafka_event_destroy(rkev); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extract out this part to a separate function. This is repeated below as well.
tests/0081-admin.c
Outdated
if (has_match_types) { | ||
/* With the options of the consumer protocol we should get 0 | ||
* valid cnt */ | ||
rd_kafka_AdminOptions_t *optionwithconsumer = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
option_with_consumer
. Same at other places as well.
tests/0081-admin.c
Outdated
quickexit: | ||
rd_kafka_queue_destroy(mainq); | ||
|
||
rd_kafka_destroy(rk); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not even used. Remove this change.
tests/0081-admin.c
Outdated
@@ -2825,6 +3035,72 @@ static void do_test_ListConsumerGroups(const char *what, | |||
|
|||
rd_kafka_event_destroy(rkev); | |||
|
|||
if (has_match_types) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are we testing in this block?
tests/0081-admin.c
Outdated
rd_kafka_consumer_group_type_t consumer_type = | ||
RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @emasab. This test will fail when the consumer is created with CLASSIC protocol. Please fix this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why we need alot of new code to test only the newly added field type. It should have been very simple. @emasab was also suggesting the same thing in another comment.
if broker version doesn't support type, then we should have UNKNOWN type.
else if (test_consumer_group_protocol_consumer()) type should be CONSUMER
else type should be CLASSIC.
Even if we need this much code, there is alot of duplication that can be optimized.
Let's discuss this over a call about these test changes.
Apart from this the test 0081 is crashing in my local as well when running with broker v3.7.0 with classic protocol. |
Librdkafka KIP 848 ListGroups API Running the example : ./list_consumer_groups -b localhost:9092 state_cnt_int type_cnt_int [ state_1_int state_2_int ] [ type_1_int type_2_int ] Running the integration test(do_test_ListConsumerGroups) : TESTS=0081 make Running the unit tests(do_test_ListConsumerGroups) : TESTS=0080 make Any warnings via configure, make or make install are not subject to my changes. Formatting done via, Integration Testing done with :
If any test fail, it is either because any consumer_group did not get destroy and had its information rolled over in the next spawn, or the Apache Kafka Version (trunk) instead of 3.8 or 3.7 ! I can discuss the same in the office hours as well ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor comments. Checking more.
tests/0081-admin.c
Outdated
|
||
/* match_types would not work if the broker version is below 3.8.0.0 */ | ||
if (has_match_types && match_types) | ||
match_types = rd_true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't reuse the same variable.
tests/0081-admin.c
Outdated
} else { | ||
TEST_SAY( | ||
"'TEST_CONSUMER_GROUP_PROTOCOL' Environment " | ||
"variable not set properly, cane be consumer or " | ||
"classic\n"); | ||
match_types = rd_false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not possible. Remove this.
test_ListConsumerGroups_helper( | ||
rk, option_group_protocol_not_in_use, q, | ||
list_consumer_groups, TEST_LIST_CONSUMER_GROUPS_CNT, 0, | ||
group_protocol_not_in_use, rd_false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use has_match_states
here as well.
src/rdkafka_admin.h
Outdated
@@ -445,6 +451,7 @@ struct rd_kafka_ConsumerGroupListing_s { | |||
/** Is it a simple consumer group? That means empty protocol_type. */ | |||
rd_bool_t is_simple_consumer_group; | |||
rd_kafka_consumer_group_state_t state; /**< Consumer group state. */ | |||
rd_kafka_consumer_group_type_t group_type; /**< Consumer group type. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is type
in java as well. Please change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more minor comments.
src/rdkafka_admin.c
Outdated
"Only a known group type is allowed, UNKNOWN Group " | ||
"Type is not allowed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just say "UNKNOWN group type is not allowed."
All contributors need to sign the Contributor License Agreement here before this PR can be approved. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code and example part seems fine. Just a few minor comments.
rd_kafka_consumer_group_type_name(rd_kafka_consumer_group_type_t type); | ||
|
||
/** | ||
* @brief Returns a code for a group type name. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should specify that we compare ignoring the case.
aa99639
to
19ab07f
Compare
19ab07f
to
3d70ed0
Compare
53ea4ba
to
50c4130
Compare
Closed with #4860 |
No description provided.