diff --git a/src/rdkafka_fetcher.c b/src/rdkafka_fetcher.c index 98f5e72f92..835271a4b0 100644 --- a/src/rdkafka_fetcher.c +++ b/src/rdkafka_fetcher.c @@ -943,7 +943,21 @@ static void rd_kafka_broker_fetch_reply(rd_kafka_t *rk, } } +/** + * @brief Check if any toppars have a zero topic id. + * + */ +static rd_bool_t can_use_topic_ids(rd_kafka_broker_t *rkb) { + rd_kafka_toppar_t *rktp = rkb->rkb_active_toppar_next; + do { + if (RD_KAFKA_UUID_IS_ZERO(rktp->rktp_rkt->rkt_topic_id)) + return rd_false; + } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp, + rktp_activelink)) != + rkb->rkb_active_toppar_next); + return rd_true; +} /** * @brief Build and send a Fetch request message for all underflowed toppars @@ -979,7 +993,13 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) { ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_Fetch, 0, 16, NULL); - rkbuf = rd_kafka_buf_new_flexver_request( + + /* Fallback to version 12 if topic id is null which can happen if + * inter.broker.protocol.version is < 2.8 */ + ApiVersion = + ApiVersion > 12 && can_use_topic_ids(rkb) ? ApiVersion : 12; + + rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_Fetch, 1, /* MaxWaitTime+MinBytes+MaxBytes+IsolationLevel+ * SessionId+Epoch+TopicCnt */