From 50e6867641c80b887638fbd7c0a0c5b93bfc62c6 Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Thu, 8 Aug 2024 13:51:34 +0530 Subject: [PATCH 1/2] Fallback to fetch v12 (#10) If any toppars have a zero topic id --- src/rdkafka_fetcher.c | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_fetcher.c b/src/rdkafka_fetcher.c index 98f5e72f92..b7e0b17fac 100644 --- a/src/rdkafka_fetcher.c +++ b/src/rdkafka_fetcher.c @@ -943,7 +943,21 @@ static void rd_kafka_broker_fetch_reply(rd_kafka_t *rk, } } +/** + * @brief Check if any toppars have a non-zero topic id. + * + */ +rd_bool_t can_use_topic_id(rd_kafka_broker_t *rkb) { + rd_kafka_toppar_t *rktp = rkb->rkb_active_toppar_next; + do { + if (RD_KAFKA_UUID_IS_ZERO(rktp->rktp_rkt->rkt_topic_id)) + return rd_false; + } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp, + rktp_activelink)) != + rkb->rkb_active_toppar_next); + return rd_true; +} /** * @brief Build and send a Fetch request message for all underflowed toppars @@ -979,7 +993,12 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) { ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_Fetch, 0, 16, NULL); - rkbuf = rd_kafka_buf_new_flexver_request( + + /* Fallback to version 12 if topic id is null which can happen if + * inter.broker.protocol.version is old */ + ApiVersion = ApiVersion > 12 && can_use_topic_id(rkb) ? ApiVersion : 12; + + rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_Fetch, 1, /* MaxWaitTime+MinBytes+MaxBytes+IsolationLevel+ * SessionId+Epoch+TopicCnt */ From d7a7d788db2fa938a1eacafaa47ccf5dbbdf48c9 Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Tue, 24 Sep 2024 18:49:33 +0530 Subject: [PATCH 2/2] PR Feedback (#41) --- src/rdkafka_fetcher.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rdkafka_fetcher.c b/src/rdkafka_fetcher.c index b7e0b17fac..835271a4b0 100644 --- a/src/rdkafka_fetcher.c +++ b/src/rdkafka_fetcher.c @@ -944,10 +944,10 @@ static void rd_kafka_broker_fetch_reply(rd_kafka_t *rk, } /** - * @brief Check if any toppars have a non-zero topic id. + * @brief Check if any toppars have a zero topic id. * */ -rd_bool_t can_use_topic_id(rd_kafka_broker_t *rkb) { +static rd_bool_t can_use_topic_ids(rd_kafka_broker_t *rkb) { rd_kafka_toppar_t *rktp = rkb->rkb_active_toppar_next; do { if (RD_KAFKA_UUID_IS_ZERO(rktp->rktp_rkt->rkt_topic_id)) @@ -995,8 +995,9 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) { 0, 16, NULL); /* Fallback to version 12 if topic id is null which can happen if - * inter.broker.protocol.version is old */ - ApiVersion = ApiVersion > 12 && can_use_topic_id(rkb) ? ApiVersion : 12; + * inter.broker.protocol.version is < 2.8 */ + ApiVersion = + ApiVersion > 12 && can_use_topic_ids(rkb) ? ApiVersion : 12; rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_Fetch, 1,