Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848 ListGroups API #4777

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,28 @@ blocks:
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
jobs:
- name: 'Build configuration checks'
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
- name: 'Build and integration tests with "classic" protocol'
commands:
- wget -O rapidjson-dev.deb https://launchpad.net/ubuntu/+archive/primary/+files/rapidjson-dev_1.1.0+dfsg2-3_all.deb
- sudo dpkg -i rapidjson-dev.deb
- python3 -m pip install -U pip
- ./packaging/tools/build-configurations-checks.sh
- name: 'Build and integration tests'
- python3 -m pip -V
- (cd tests && python3 -m pip install -r requirements.txt)
- ./configure --install-deps
# split these up
- ./packaging/tools/rdutcoverage.sh
- make copyright-check
- make -j all examples check
- echo "Verifying that CONFIGURATION.md does not have manual changes"
- git diff --exit-code CONFIGURATION.md
- examples/rdkafka_example -X builtin.features
- ldd src/librdkafka.so.1
- ldd src-cpp/librdkafka++.so.1
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.7.0 --cpversion 7.6.0 --cmd 'make quick')
- name: 'Build and integration tests with "consumer" protocol'
commands:
- wget -O rapidjson-dev.deb https://launchpad.net/ubuntu/+archive/primary/+files/rapidjson-dev_1.1.0+dfsg2-3_all.deb
- sudo dpkg -i rapidjson-dev.deb
Expand All @@ -137,10 +152,14 @@ blocks:
- examples/rdkafka_example -X builtin.features
- ldd src/librdkafka.so.1
- ldd src-cpp/librdkafka++.so.1
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.4.0 --cmd 'make quick')
- export TEST_KAFKA_VERSION=3.7.0
- (cd tests && python3 -m trivup.clusters.KafkaCluster --kraft --version 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07'
--cpversion 7.6.0 --conf '["group.coordinator.rebalance.protocols=classic,consumer"]'
--cmd 'TEST_CONSUMER_GROUP_PROTOCOL=consumer make quick')


- name: 'Linux x64: release artifact docker builds'
Expand Down
1 change: 0 additions & 1 deletion examples/alter_consumer_group_offsets.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ cmd_alter_consumer_group_offsets(rd_kafka_conf_t *conf, int argc, char **argv) {
fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
exit(1);
}

mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
/* Read passed partition-offsets */
rd_kafka_topic_partition_list_t *partitions =
rd_kafka_topic_partition_list_new(num_partitions);
Expand Down
7 changes: 7 additions & 0 deletions examples/consumer.c
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this files changes.

Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ int main(int argc, char **argv) {
return 1;
}

// if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
// sizeof(errstr)) != RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// rd_kafka_conf_destroy(conf);
// return 1;
// }

/*
* Create consumer instance.
*
Expand Down
41 changes: 29 additions & 12 deletions examples/list_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@ static int print_groups_info(const rd_kafka_ListConsumerGroups_result_t *list) {
int is_simple_consumer_group =
rd_kafka_ConsumerGroupListing_is_simple_consumer_group(
group);
rd_kafka_consumer_group_type_t group_type =
rd_kafka_ConsumerGroupListing_type(group);

printf("Group \"%s\", is simple %" PRId32
", "
"state %s",
"state %s, type %s",
group_id, is_simple_consumer_group,
rd_kafka_consumer_group_state_name(state));
rd_kafka_consumer_group_state_name(state),
rd_kafka_consumer_group_type_name(group_type));
printf("\n");
}
for (i = 0; i < result_error_cnt; i++) {
Expand Down Expand Up @@ -184,24 +187,29 @@ int64_t parse_int(const char *what, const char *str) {
static void
cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running list_consumer_groups without any argument is causing Segmentation Fault. It should show usage instead. Fix this.

rd_kafka_t *rk;
const char **states_str = NULL;
char errstr[512];
rd_kafka_AdminOptions_t *options;
rd_kafka_event_t *event = NULL;
rd_kafka_error_t *error = NULL;
int i;
int retval = 0;
int states_cnt = 0;
int retval = 0;
int states_cnt = 0;
int group_types_cnt = 0;
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_consumer_group_state_t *states;
rd_kafka_consumer_group_type_t *group_types;


if (argc >= 1) {
states_str = (const char **)&argv[0];
states_cnt = argc;
}
states_cnt = parse_int("state count", argv[0]);
states = calloc(states_cnt, sizeof(rd_kafka_consumer_group_state_t));
for (i = 0; i < states_cnt; i++) {
states[i] = parse_int("state code", states_str[i]);
states[i] = parse_int("state code", argv[i + 1]);
}
group_types_cnt = parse_int("group type count", argv[states_cnt + 1]);
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved

group_types =
calloc(group_types_cnt, sizeof(rd_kafka_consumer_group_type_t));
for (i = 0; i < group_types_cnt; i++) {
group_types[i] =
parse_int("group type code", argv[i + states_cnt + 2]);
}

/*
Expand Down Expand Up @@ -239,6 +247,15 @@ cmd_list_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
goto exit;
}
free(states);
if ((error = rd_kafka_AdminOptions_set_match_consumer_group_types(
options, group_types, group_types_cnt))) {
fprintf(stderr, "%% Failed to set group types: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
goto exit;
}
free(group_types);


rd_kafka_ListConsumerGroups(rk, options, queue);
rd_kafka_AdminOptions_destroy(options);
Expand Down Expand Up @@ -296,7 +313,7 @@ int main(int argc, char **argv) {
/*
* Parse common options
*/
while ((opt = getopt(argc, argv, "b:X:d:")) != -1) {
while ((opt = getopt(argc, argv, "b:X:d")) != -1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this change as d parameter requires an arg

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the usage message with state count and group types arguments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay.

switch (opt) {
case 'b':
conf_set(conf, "bootstrap.servers", optarg);
Expand Down
38 changes: 36 additions & 2 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4703,6 +4703,26 @@ rd_kafka_consumer_group_state_code(const char *name) {
return RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN;
}

static const char *rd_kafka_consumer_group_type_names[] = {
"Unknown", "Consumer", "Classic"};

const char *
rd_kafka_consumer_group_type_name(rd_kafka_consumer_group_type_t type) {
if (type < 0 || type >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT)
return NULL;
return rd_kafka_consumer_group_type_names[type];
}

rd_kafka_consumer_group_type_t
rd_kafka_consumer_group_type_code(const char *name) {
size_t i;
for (i = 0; i < RD_KAFKA_CONSUMER_GROUP_TYPE__CNT; i++) {
if (!rd_strcasecmp(rd_kafka_consumer_group_type_names[i], name))
return i;
}
return RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN;
}

static void rd_kafka_DescribeGroups_resp_cb(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
Expand Down Expand Up @@ -4831,8 +4851,10 @@ static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,
struct list_groups_state *state;
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode;
int32_t ThrottleTimeMs;
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
char **grps = NULL;
int cnt, grpcnt, i = 0;
int16_t ApiVersion = request->rkbuf_reqhdr.ApiVersion;

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
/* 'state' is no longer in scope because
Expand All @@ -4848,6 +4870,10 @@ static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,
if (err)
goto err;

if (ApiVersion >= 1) {
rd_kafka_buf_read_i32(reply, &ThrottleTimeMs);
}

rd_kafka_buf_read_i16(reply, &ErrorCode);
if (ErrorCode) {
err = ErrorCode;
Expand All @@ -4867,11 +4893,19 @@ static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,
grps = rd_malloc(sizeof(*grps) * grpcnt);

while (cnt-- > 0) {
rd_kafkap_str_t grp, proto;
rd_kafkap_str_t grp, proto, grp_state, grp_type;

rd_kafka_buf_read_str(reply, &grp);
rd_kafka_buf_read_str(reply, &proto);

if (ApiVersion >= 4) {
rd_kafka_buf_read_str(reply, &grp_state);
}

if (ApiVersion >= 5) {
rd_kafka_buf_read_str(reply, &grp_type);
}

if (state->desired_group &&
rd_kafkap_str_cmp_str(&grp, state->desired_group))
continue;
Expand Down Expand Up @@ -4968,7 +5002,7 @@ rd_kafka_list_groups(rd_kafka_t *rk,
state.wait_cnt++;
rkb_cnt++;
error = rd_kafka_ListGroupsRequest(
rkb, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
rkb, 0, NULL, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
rd_kafka_ListGroups_resp_cb, &state);
if (error) {
rd_kafka_ListGroups_resp_cb(rk, rkb,
Expand Down
100 changes: 100 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -4590,6 +4590,32 @@ rd_kafka_consumer_group_metadata_new_with_genid(const char *group_id,
const char *member_id,
const char *group_instance_id);

/**
* @brief Get group id of a group metadata.
*
* @param group_metadata The group metadata
*
* @returns The group id contained in the passed \p group_metadata.
*
* @remark The returned pointer has the same lifetime as \p group_metadata.
*/
RD_EXPORT
const char *rd_kafka_consumer_group_metadata_group_id(
const rd_kafka_consumer_group_metadata_t *group_metadata);

/**
* @brief Get group instance id of a group metadata.
*
* @param group_metadata The group metadata
*
* @returns The group instance id contained in the passed \p group_metadata.
* or NULL.
*
* @remark The returned pointer has the same lifetime as \p group_metadata.
*/
RD_EXPORT
const char *rd_kafka_consumer_group_metadata_group_instance_id(
const rd_kafka_consumer_group_metadata_t *group_metadata);

/**
* @brief Get member id of a group metadata.
Expand All @@ -4604,6 +4630,17 @@ RD_EXPORT
const char *rd_kafka_consumer_group_metadata_member_id(
const rd_kafka_consumer_group_metadata_t *group_metadata);

/**
* @brief Get member epoch of a group metadata.
* Corresponds to the generation id in consumer protocol classic;
*
* @param group_metadata The group metadata
*
* @returns The member epoch id contained in the passed \p group_metadata.
*/
RD_EXPORT
int32_t rd_kafka_consumer_group_metadata_member_epoch(
const rd_kafka_consumer_group_metadata_t *group_metadata);

/**
* @brief Frees the consumer group metadata object as returned by
Expand Down Expand Up @@ -5158,6 +5195,18 @@ typedef enum {
RD_KAFKA_CONSUMER_GROUP_STATE__CNT
} rd_kafka_consumer_group_state_t;

/**
* @enum rd_kafka_consumer_group_type_t
*
* @brief Consumer group type.
*/
typedef enum {
RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN = 0,
RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER = 1,
RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC = 2,
RD_KAFKA_CONSUMER_GROUP_TYPE__CNT
} rd_kafka_consumer_group_type_t;

/**
* @brief Group information
*/
Expand Down Expand Up @@ -5242,6 +5291,28 @@ RD_EXPORT
rd_kafka_consumer_group_state_t
rd_kafka_consumer_group_state_code(const char *name);

/**
* @brief Returns a name for a group type code.
*
* @param type The group type value.
*
* @return The group type name corresponding to the provided group type value.
*/
RD_EXPORT
const char *
rd_kafka_consumer_group_type_name(rd_kafka_consumer_group_type_t type);

/**
* @brief Returns a code for a group type name.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should specify that we compare ignoring the case.

*
* @param name The group type name.
*
* @return The group type value corresponding to the provided group type name.
*/
RD_EXPORT
rd_kafka_consumer_group_type_t
rd_kafka_consumer_group_type_code(const char *name);

/**
* @brief Release list memory
*/
Expand Down Expand Up @@ -7208,6 +7279,24 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_states(
const rd_kafka_consumer_group_state_t *consumer_group_states,
size_t consumer_group_states_cnt);

/**
* @brief Set consumer groups types to query for.
*
* @param options Admin options.
* @param consumer_group_types Array of consumer group types.
* @param consumer_group_types_cnt Size of the \p consumer_group_types array.
*
* @return NULL on success, a new error instance that must be
* released with rd_kafka_error_destroy() in case of error.
*
* @remark This option is valid for ListConsumerGroups.
*/
RD_EXPORT
rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_types(
rd_kafka_AdminOptions_t *options,
const rd_kafka_consumer_group_type_t *consumer_group_types,
size_t consumer_group_types_cnt);

/**
* @brief Set Isolation Level to an allowed `rd_kafka_IsolationLevel_t` value.
*/
Expand Down Expand Up @@ -8532,6 +8621,17 @@ RD_EXPORT
rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupListing_state(
const rd_kafka_ConsumerGroupListing_t *grplist);

/**
* @brief Gets group_type for the \p grplist group.
*
* @param grplist The group listing.
*
* @return A group type.
*/
RD_EXPORT
rd_kafka_consumer_group_type_t rd_kafka_ConsumerGroupListing_type(
const rd_kafka_ConsumerGroupListing_t *grplist);
pranavrth marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Get an array of valid list groups from a ListConsumerGroups result.
*
Expand Down
Loading