Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848 ListGroups API #4777

Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ blocks:
- sudo dpkg -i rapidjson-dev.deb
- python3 -m pip install -U pip
- ./packaging/tools/build-configurations-checks.sh
- name: 'Build and integration tests'
- name: 'Build and integration tests with "classic" protocol'
commands:
- wget -O rapidjson-dev.deb https://launchpad.net/ubuntu/+archive/primary/+files/rapidjson-dev_1.1.0+dfsg2-3_all.deb
- sudo dpkg -i rapidjson-dev.deb
Expand All @@ -140,7 +140,32 @@ blocks:
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.4.0 --cmd 'make quick')
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.7.0 --cpversion 7.6.0 --cmd 'make quick')
- name: 'Build and integration tests with "consumer" protocol'
commands:
- wget -O rapidjson-dev.deb https://launchpad.net/ubuntu/+archive/primary/+files/rapidjson-dev_1.1.0+dfsg2-3_all.deb
- sudo dpkg -i rapidjson-dev.deb
- python3 -m pip install -U pip
- python3 -m pip -V
- (cd tests && python3 -m pip install -r requirements.txt)
- ./configure --install-deps
# split these up
- ./packaging/tools/rdutcoverage.sh
- make copyright-check
- make -j all examples check
- echo "Verifying that CONFIGURATION.md does not have manual changes"
- git diff --exit-code CONFIGURATION.md
- examples/rdkafka_example -X builtin.features
- ldd src/librdkafka.so.1
- ldd src-cpp/librdkafka++.so.1
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- export TEST_KAFKA_VERSION=3.7.0
- (cd tests && python3 -m trivup.clusters.KafkaCluster --kraft --version 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07'
--cpversion 7.6.0 --conf '["group.coordinator.rebalance.protocols=classic,consumer"]'
--cmd 'TEST_CONSUMER_GROUP_PROTOCOL=consumer make quick')


- name: 'Linux x64: release artifact docker builds'
Expand Down
75 changes: 66 additions & 9 deletions examples/consumer.c
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this files changes.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include <signal.h>
#include <string.h>
#include <ctype.h>

#include <stdlib.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
Expand All @@ -54,7 +54,21 @@ static void stop(int sig) {
run = 0;
}

/**
* @brief Parse an integer or fail.
*/
int64_t parse_int(const char *what, const char *str) {
char *end;
unsigned long n = strtoull(str, &end, 0);

if (end != str + strlen(str)) {
fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n",
what, str);
stop(1);
}

return (int64_t)n;
}

/**
* @returns 1 if all bytes are printable, else 0.
Expand All @@ -77,26 +91,40 @@ int main(int argc, char **argv) {
char errstr[512]; /* librdkafka API error reporting buffer */
const char *brokers; /* Argument: broker list */
const char *groupid; /* Argument: Consumer group id */
char **topics; /* Argument: list of topics to subscribe to */
int topic_cnt; /* Number of topics to subscribe to */
rd_kafka_consumer_group_type_t
grouptype; /* Argument: Consumer group type */
const char *grouptypestr; /* The string value of the chosen Consumer
Group Type */
char **topics; /* Argument: list of topics to subscribe to */
int topic_cnt; /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
int i;

/*
* Argument validation
*/
if (argc < 4) {
fprintf(stderr,
"%% Usage: "
"%s <broker> <group.id> <topic1> <topic2>..\n",
argv[0]);
fprintf(
stderr,
"%% Usage: "
"%s <broker> <group.id> <group.type> <topic1> <topic2>..\n",
argv[0]);
return 1;
}

brokers = argv[1];
groupid = argv[2];
topics = &argv[3];
topic_cnt = argc - 3;
grouptype = parse_int("Consumer Group Protocol Type", argv[3]);
if (grouptype < RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN ||
grouptype >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) {
fprintf(
stderr,
"The Consumer Group Type specified should take values from "
"0(UNKNOWN) to RD_KAFKA_CONSUMER_GROUP_TYPE__CNT(3) - 1\n");
return 1;
}
topics = &argv[4];
topic_cnt = argc - 4;


/*
Expand Down Expand Up @@ -127,6 +155,35 @@ int main(int argc, char **argv) {
return 1;
}

/* Set the consumer group type(if broker version supports it).
* The following group type should be set:
* classic
* consumer
* unknown will give the skip the property */
switch (grouptype) {
case RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN:
break;
case RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC:
printf("The Consumer Group Protocol chosen is classic.\n");
grouptypestr = "classic";
break;
case RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER:
printf("The Consumer Group Protocol chosen is consumer.\n");
grouptypestr = "consumer";
break;
default:
break;
}
if (grouptype != RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN) {
if (rd_kafka_conf_set(conf, "group.protocol", grouptypestr,
errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
}

/* If there is no previously committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
* in the partition to start fetching messages.
Expand Down
57 changes: 44 additions & 13 deletions examples/list_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ static void usage(const char *reason, ...) {
fprintf(stderr,
"List groups usage examples\n"
"\n"
"Usage: %s <options> <state1> <state2> ...\n"
"Usage: %s <options> state_cnt group_type_cnt [<state1> "
"<state2>] [<group_type1> <group_type2>] ...\n"
"\n"
"Options:\n"
" -b <brokers> Bootstrap server list to connect to.\n"
Expand Down Expand Up @@ -145,12 +146,15 @@ static int print_groups_info(const rd_kafka_ListConsumerGroups_result_t *list) {
int is_simple_consumer_group =
rd_kafka_ConsumerGroupListing_is_simple_consumer_group(
group);
rd_kafka_consumer_group_type_t group_type =
rd_kafka_ConsumerGroupListing_type(group);

printf("Group \"%s\", is simple %" PRId32
", "
"state %s",
"state %s, type %s",
group_id, is_simple_consumer_group,
rd_kafka_consumer_group_state_name(state));
rd_kafka_consumer_group_state_name(state),
rd_kafka_consumer_group_type_name(group_type));
printf("\n");
}
for (i = 0; i < result_error_cnt; i++) {
Expand All @@ -171,7 +175,7 @@ int64_t parse_int(const char *what, const char *str) {
if (end != str + strlen(str)) {
fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n",
what, str);
exit(1);
usage("Not a valid Integer");
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
}

return (int64_t)n;
Expand All @@ -184,24 +188,37 @@ int64_t parse_int(const char *what, const char *str) {
static void
cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running list_consumer_groups without any argument is causing Segmentation Fault. It should show usage instead. Fix this.

rd_kafka_t *rk;
const char **states_str = NULL;
char errstr[512];
rd_kafka_AdminOptions_t *options;
rd_kafka_event_t *event = NULL;
rd_kafka_error_t *error = NULL;
int i;
int retval = 0;
int states_cnt = 0;
int retval = 0;
int states_cnt = 0;
int group_types_cnt = 0;
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_consumer_group_state_t *states;
rd_kafka_consumer_group_type_t *group_types;

states_cnt = parse_int("state count", argv[0]);
group_types_cnt = parse_int("group type count", argv[1]);
if (states_cnt < 0 || group_types_cnt < 0)
usage("Length of attributes/options cannot be negative");
if (states_cnt + group_types_cnt + 2 != argc)
usage(
"Number of arguments do not match states_cnt and "
"types_cnt");

if (argc >= 1) {
states_str = (const char **)&argv[0];
states_cnt = argc;
}
states = calloc(states_cnt, sizeof(rd_kafka_consumer_group_state_t));
for (i = 0; i < states_cnt; i++) {
states[i] = parse_int("state code", states_str[i]);
states[i] = (rd_kafka_consumer_group_state_t)parse_int(
"state code", argv[2 + i]);
}

group_types =
calloc(group_types_cnt, sizeof(rd_kafka_consumer_group_type_t));
for (i = 0; i < group_types_cnt; i++) {
group_types[i] = (rd_kafka_consumer_group_type_t)parse_int(
"group type code", argv[i + states_cnt + 2]);
}

/*
Expand Down Expand Up @@ -235,10 +252,22 @@ cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
options, states, states_cnt))) {
fprintf(stderr, "%% Failed to set states: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
goto exit;
}
free(states);
if ((error = rd_kafka_AdminOptions_set_match_consumer_group_types(
options, group_types, group_types_cnt))) {
fprintf(stderr, "%% Failed to set group types: %s\n",
rd_kafka_error_string(error));
goto exit;
}
free(group_types);

printf(
"The response depends on the specific broker version used, "
"all request attributes may not be used if the broker version"
"does not support them.\n"
"==============================\n");
pranavrth marked this conversation as resolved.
Show resolved Hide resolved

rd_kafka_ListConsumerGroups(rk, options, queue);
rd_kafka_AdminOptions_destroy(options);
Expand Down Expand Up @@ -273,6 +302,8 @@ cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {


exit:
if (error)
rd_kafka_error_destroy(error);
if (event)
rd_kafka_event_destroy(event);
rd_kafka_queue_destroy(queue);
Expand Down
22 changes: 21 additions & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4703,6 +4703,26 @@ rd_kafka_consumer_group_state_code(const char *name) {
return RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN;
}

static const char *rd_kafka_consumer_group_type_names[] = {
"Unknown", "Consumer", "Classic"};

const char *
rd_kafka_consumer_group_type_name(rd_kafka_consumer_group_type_t type) {
if (type < 0 || type >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT)
return NULL;
return rd_kafka_consumer_group_type_names[type];
}

rd_kafka_consumer_group_type_t
rd_kafka_consumer_group_type_code(const char *name) {
size_t i;
for (i = 0; i < RD_KAFKA_CONSUMER_GROUP_TYPE__CNT; i++) {
if (!rd_strcasecmp(rd_kafka_consumer_group_type_names[i], name))
return i;
}
return RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN;
}

static void rd_kafka_DescribeGroups_resp_cb(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
Expand Down Expand Up @@ -4968,7 +4988,7 @@ rd_kafka_list_groups(rd_kafka_t *rk,
state.wait_cnt++;
rkb_cnt++;
error = rd_kafka_ListGroupsRequest(
rkb, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
rkb, 0, NULL, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
rd_kafka_ListGroups_resp_cb, &state);
if (error) {
rd_kafka_ListGroups_resp_cb(rk, rkb,
Expand Down
Loading