diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e2f77e0b..ce601e8ec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -63,7 +63,7 @@ Starting with **librdkafka 2.12.0**, the next generation consumer group rebalanc
big-endian architectures (#5183, @paravoid).
### Consumer fixes
-
+
* Issues: #5199
Fixed an issue where topic partition errors were not cleared after a successful
commit. Previously, a partition could retain a stale error state even though the
@@ -165,7 +165,7 @@ librdkafka v2.11.0 is a feature release:
librdkafka v2.10.1 is a maintenance release:
-* Fix to add locks when updating the metadata cache for the consumer
+* Fix to add locks when updating the metadata cache for the consumer
after no broker connection is available (@marcin-krystianc, #5066).
* Fix to the re-bootstrap case when `bootstrap.servers` is `NULL` and
brokers were added manually through `rd_kafka_brokers_add` (#5067).
@@ -259,7 +259,7 @@ librdkafka v2.10.0 is a feature release:
- Deprecated and disallowed the following properties for the `consumer` group protocol:
- `session.timeout.ms`
- `heartbeat.interval.ms`
- - `group.protocol.type`
+ - `group.protocol.type`
Attempting to set any of these will result in an error.
- Enhanced handling for `subscribe()` and `unsubscribe()` edge cases.
@@ -274,7 +274,7 @@ librdkafka v2.10.0 is a feature release:
removed along with their threads. Brokers and their threads are added back
when they appear in a Metadata RPC response again. When no brokers are left
or they're not reachable, the client will start a re-bootstrap sequence
- by default. `metadata.recovery.strategy` controls this,
+ by default. `metadata.recovery.strategy` controls this,
which defaults to `rebootstrap`.
Setting `metadata.recovery.strategy` to `none` avoids any re-bootstrapping and
leaves only the broker received in last successful metadata response.
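
For context, opting out of re-bootstrap can be sketched with the standard librdkafka configuration API (a minimal sketch assuming librdkafka >= 2.10.0, where `metadata.recovery.strategy` exists; error handling abbreviated):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* "none" disables re-bootstrap; the default is "rebootstrap". */
        if (rd_kafka_conf_set(conf, "metadata.recovery.strategy", "none",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "%s\n", errstr);

        rd_kafka_conf_destroy(conf);
        return 0;
}
```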
@@ -399,7 +399,7 @@ librdkafka v2.10.0 is a feature release:
during a rebalance.
Happening since v1.6.0 (#4908)
* Issues: #4970
- When switching to a different leader a consumer could wait 500ms
+ When switching to a different leader a consumer could wait 500ms
(`fetch.error.backoff.ms`) before starting to fetch again. The fetch backoff wasn't reset when joining the new broker.
Solved by resetting it, given it's not needed to backoff
the first fetch on a different node. This way faster leader switches are
@@ -512,7 +512,7 @@ librdkafka v2.6.1 is a maintenance release:
### General fixes
* SASL/SCRAM authentication fix: avoid concatenating
- client side nonce once more, as it's already prepended in
+ client side nonce once more, as it's already prepended in
server sent nonce.
librdkafka was incorrectly concatenating the client side nonce again, leading to [this fix](https://github.com/apache/kafka/commit/0a004562b8475d48a9961d6dab3a6aa24021c47f) being made on AK side, released with 3.8.1, with `endsWith` instead of `equals`.
Happening since v0.0.99 (#4895).
@@ -527,7 +527,7 @@ librdkafka v2.6.1 is a maintenance release:
A consumer configured with the `cooperative-sticky` partition assignment
strategy could get stuck in an infinite loop, with corresponding spike of
main thread CPU usage.
- That happened with some particular orders of members and potential
+ That happened with some particular orders of members and potential
assignable partitions.
Solved by removing the infinite loop cause.
Happening since: 1.6.0 (#4800).
@@ -575,7 +575,7 @@ librdkafka v2.6.0 is a feature release:
### Consumer fixes
* Issues: #4806
- Fix for permanent fetch errors when brokers support a Fetch RPC version greater than 12
+ Fix for permanent fetch errors when brokers support a Fetch RPC version greater than 12
but cluster is configured to use an inter broker protocol that is less than 2.8.
In this case returned topic ids are zero valued and Fetch has to fall back
to version 12, using topic names.
@@ -603,7 +603,7 @@ Happening since 2.5.0 (#4826).
# librdkafka v2.5.0
> [!WARNING]
-This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
+This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
>
> You won't face any problem if:
> * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability).
@@ -611,7 +611,7 @@ This version has introduced a regression in which an assert is triggered during
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client.
->
+>
> Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all.
librdkafka v2.5.0 is a feature release.
@@ -622,7 +622,7 @@ librdkafka v2.5.0 is a feature release.
* Fix for an idempotent producer error, with a message batch not reconstructed
identically when retried (#4750)
* Removed support for CentOS 6 and CentOS 7 (#4775).
-* [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) Client
+* [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) Client
metrics and observability (#4721).
## Upgrade considerations
diff --git a/src/rdkafka_ssl.c b/src/rdkafka_ssl.c
index 6747d346e..67b342f46 100644
--- a/src/rdkafka_ssl.c
+++ b/src/rdkafka_ssl.c
@@ -449,6 +449,45 @@ static int rd_kafka_transport_ssl_cert_verify_cb(int preverify_ok,
return 1; /* verification successful */
}
+/**
+ * @brief Normalize hostname for SSL certificate verification.
+ *
+ * Strips trailing dot from hostname as X.509 certificates (per RFC 5280)
+ * don't include them in Subject Alternative Names (SANs).
+ * The trailing dot is used in DNS to indicate an absolute FQDN,
+ * but certificate SANs use a different representation without it.
+ *
+ * @param hostname Input hostname (may have trailing dot)
+ * @param normalized Output buffer for normalized hostname
+ * @param size Size of output buffer
+ *
+ * @returns The normalized hostname (same as \p normalized)
+ *
+ * @remark This function is exposed for testing via ENABLE_DEVEL.
+ */
+#if ENABLE_DEVEL
+RD_EXPORT
+#else
+static
+#endif
+const char *
+rd_kafka_ssl_normalize_hostname(const char *hostname,
+ char *normalized,
+ size_t size) {
+ size_t len;
+
+ rd_snprintf(normalized, size, "%s", hostname);
+ len = strlen(normalized);
+
+ /* Strip trailing dot (unless it's a single dot) */
+ if (len > 1 && normalized[len - 1] == '.') {
+ normalized[len - 1] = '\0';
+ }
+
+ return normalized;
+}
+
+
/**
* @brief Set TLSEXT hostname for SNI and optionally enable
* SSL endpoint identification verification.
@@ -459,6 +498,7 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
char *errstr,
size_t errstr_size) {
char name[RD_KAFKA_NODENAME_SIZE];
+ char name_for_verify[RD_KAFKA_NODENAME_SIZE];
char *t;
rd_kafka_broker_lock(rktrans->rktrans_rkb);
@@ -470,13 +510,17 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
if ((t = strrchr(name, ':')))
*t = '\0';
+        /* Normalize hostname (strip trailing dot) for both SNI and
+         * certificate verification. */
+        rd_kafka_ssl_normalize_hostname(name, name_for_verify,
+                                        sizeof(name_for_verify));
+
#if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT)
/* If non-numerical hostname, send it for SNI */
- if (!(/*ipv6*/ (strchr(name, ':') &&
- strspn(name, "0123456789abcdefABCDEF:.[]%") ==
- strlen(name)) ||
- /*ipv4*/ strspn(name, "0123456789.") == strlen(name)) &&
- !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name))
+ if (!(/*ipv6*/ (strchr(name_for_verify, ':') &&
+ strspn(name_for_verify, "0123456789abcdefABCDEF:.[]%") ==
+ strlen(name_for_verify)) ||
+ /*ipv4*/ strspn(name_for_verify, "0123456789.") == strlen(name_for_verify)) &&
+ !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name_for_verify))
goto fail;
#endif
@@ -484,18 +528,61 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
RD_KAFKA_SSL_ENDPOINT_ID_NONE)
return 0;
+ /* Log if we stripped a trailing dot */
+ if (strcmp(name, name_for_verify) != 0) {
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "ENDPOINT",
+ "Stripped trailing dot from hostname for "
+ "certificate verification: %s -> %s",
+ name, name_for_verify);
+ }
+
+ /* Check if connecting to an IP address */
+ rd_bool_t is_ip = rd_false;
+ if (/*ipv6*/ (strchr(name_for_verify, ':') &&
+ strspn(name_for_verify, "0123456789abcdefABCDEF:.[]%") ==
+ strlen(name_for_verify)) ||
+ /*ipv4*/ strspn(name_for_verify, "0123456789.") == strlen(name_for_verify)) {
+ is_ip = rd_true;
+ }
+
#if OPENSSL_VERSION_NUMBER >= 0x10100000 && !defined(OPENSSL_IS_BORINGSSL)
-        if (!SSL_set1_host(rktrans->rktrans_ssl, name))
-                goto fail;
+        if (is_ip) {
+                /* SSL_set1_host() performs hostname matching; IP
+                 * addresses must be matched with the IP-specific
+                 * verify parameter instead. */
+                X509_VERIFY_PARAM *param =
+                    SSL_get0_param(rktrans->rktrans_ssl);
+                if (!X509_VERIFY_PARAM_set1_ip_asc(param, name_for_verify))
+                        goto fail;
+        } else if (!SSL_set1_host(rktrans->rktrans_ssl, name_for_verify))
+                goto fail;
#elif OPENSSL_VERSION_NUMBER >= 0x1000200fL /* 1.0.2 */
{
X509_VERIFY_PARAM *param;
param = SSL_get0_param(rktrans->rktrans_ssl);
-                if (!X509_VERIFY_PARAM_set1_host(param, name,
-                                                 strnlen(name, sizeof(name))))
-                        goto fail;
+                /* IP addresses must be matched with the IP-specific
+                 * verify parameter, hostnames with set1_host(). */
+                if (is_ip) {
+                        if (!X509_VERIFY_PARAM_set1_ip_asc(param,
+                                                           name_for_verify))
+                                goto fail;
+                } else if (!X509_VERIFY_PARAM_set1_host(
+                               param, name_for_verify,
+                               strnlen(name_for_verify,
+                                       sizeof(name_for_verify))))
+                        goto fail;
}
#else
rd_snprintf(errstr, errstr_size,
@@ -506,7 +593,8 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
#endif
rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "ENDPOINT",
- "Enabled endpoint identification using hostname %s", name);
+ "Enabled endpoint identification using hostname %s",
+ name_for_verify);
return 0;
@@ -608,10 +696,20 @@ static int rd_kafka_transport_ssl_verify(rd_kafka_transport_t *rktrans) {
}
if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) {
+ char subject[256] = "";
+ char issuer[256] = "";
+ if (cert) {
+ X509_NAME_oneline(X509_get_subject_name(cert), subject,
+ sizeof(subject));
+ X509_NAME_oneline(X509_get_issuer_name(cert), issuer,
+ sizeof(issuer));
+ }
rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
RD_KAFKA_RESP_ERR__SSL,
- "Failed to verify broker certificate: %s",
- X509_verify_cert_error_string(rl));
+ "Failed to verify broker certificate: %s "
+ "(subject=%s, issuer=%s, openssl=0x%lx)",
+ X509_verify_cert_error_string(rl),
+ subject, issuer, OPENSSL_VERSION_NUMBER);
return -1;
}
@@ -1215,6 +1313,11 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,
return -1;
}
+ rd_kafka_dbg(rk, SECURITY, "SSL",
+ "Loaded CA certificates from %s: %s",
+ is_dir ? "directory" : "file",
+ rk->rk_conf.ssl.ca_location);
+
ca_probe = rd_false;
}
@@ -1910,6 +2013,12 @@ int rd_kafka_ssl_ctx_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) {
}
/* Set up broker certificate verification. */
+ rd_kafka_dbg(rk, SECURITY, "SSL",
+ "Setting up verification: enable_verify=%d, "
+ "cert_verify_cb=%s, security_level=%d",
+ rk->rk_conf.ssl.enable_verify,
+ rk->rk_conf.ssl.cert_verify_cb ? "set" : "NULL",
+ SSL_CTX_get_security_level(ctx));
SSL_CTX_set_verify(ctx,
rk->rk_conf.ssl.enable_verify ? SSL_VERIFY_PEER
: SSL_VERIFY_NONE,
diff --git a/tests/0154-ssl_hostname_normalize.c b/tests/0154-ssl_hostname_normalize.c
new file mode 100644
index 000000000..f33c0ecdc
--- /dev/null
+++ b/tests/0154-ssl_hostname_normalize.c
@@ -0,0 +1,133 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2025, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Unit test for SSL hostname normalization (trailing dot stripping).
+ *
+ * Tests the fix for issue #4348 where brokers advertising hostnames with
+ * trailing dots (absolute FQDNs) would fail SSL certificate verification
+ * because X.509 certificates don't include trailing dots in SANs.
+ *
+ * This test calls the actual rd_kafka_ssl_normalize_hostname() function
+ * from rdkafka_ssl.c to verify it handles various edge cases correctly.
+ */
+
+#include "test.h"
+
+#if WITH_SSL && ENABLE_DEVEL
+
+/* Function under test (exposed via ENABLE_DEVEL) */
+extern const char *rd_kafka_ssl_normalize_hostname(const char *hostname,
+ char *normalized,
+ size_t size);
+
+
+/**
+ * @brief Test hostname normalization with edge cases.
+ *
+ * Tests rd_kafka_ssl_normalize_hostname() from rdkafka_ssl.c with various
+ * inputs to verify the trailing dot stripping logic works correctly.
+ *
+ * This test verifies:
+ * - Hostname with trailing dot should have dot removed
+ * - Hostname without trailing dot should remain unchanged
+ * - Single dot should remain unchanged (edge case: len > 1 check)
+ * - Empty string should remain unchanged (edge case)
+ */
+static void test_hostname_normalize(void) {
+ /* Test data: input hostname and expected output after normalization */
+ struct {
+ const char *input;
+ const char *expected;
+ const char *description;
+ } test_cases[] = {
+ {"broker.example.com.", "broker.example.com",
+ "FQDN with trailing dot"},
+ {"broker.example.com", "broker.example.com",
+ "FQDN without trailing dot"},
+ {"localhost.", "localhost", "localhost with trailing dot"},
+ {"localhost", "localhost", "localhost without trailing dot"},
+ {".", ".", "single dot (edge case - should remain unchanged)"},
+ {"", "", "empty string (edge case)"},
+ {"broker-1.example.com.", "broker-1.example.com",
+ "hostname with dash and trailing dot"},
+ {"192.168.1.1", "192.168.1.1", "IP address (no trailing dot)"},
+ {NULL, NULL, NULL}};
+
+ int i;
+
+ SUB_TEST_QUICK();
+
+ TEST_SAY("Testing hostname normalization edge cases\n");
+
+ for (i = 0; test_cases[i].input != NULL; i++) {
+ char normalized[256];
+ const char *input = test_cases[i].input;
+ const char *expected = test_cases[i].expected;
+ const char *desc = test_cases[i].description;
+ const char *result;
+
+ /* Call the actual function under test */
+ result = rd_kafka_ssl_normalize_hostname(input, normalized,
+ sizeof(normalized));
+
+ TEST_SAYL(3, "Test case %d: %s\n", i + 1, desc);
+ TEST_SAYL(3, " Input: \"%s\"\n", input);
+ TEST_SAYL(3, " Expected: \"%s\"\n", expected);
+ TEST_SAYL(3, " Got: \"%s\"\n", result);
+
+ TEST_ASSERT(result == normalized,
+ "Function should return the normalized buffer");
+
+ TEST_ASSERT(!strcmp(result, expected),
+ "Hostname normalization failed for %s: "
+ "expected \"%s\" but got \"%s\"",
+ desc, expected, result);
+
+ TEST_SAY("✓ Test case %d passed: %s\n", i + 1, desc);
+ }
+
+ TEST_SAY("All %d hostname normalization edge cases passed\n", i);
+
+ SUB_TEST_PASS();
+}
+
+#endif /* WITH_SSL && ENABLE_DEVEL */
+
+
+int main_0154_ssl_hostname_normalize(int argc, char **argv) {
+#if WITH_SSL && ENABLE_DEVEL
+        test_hostname_normalize();
+        return 0;
+#else
+        TEST_SKIP("Test requires SSL support and ENABLE_DEVEL\n");
+        return 0;
+#endif
+}
diff --git a/tests/test.c b/tests/test.c
index 42e525a9c..6b4982a41 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -272,6 +272,7 @@ _TEST_DECL(0150_telemetry_mock);
_TEST_DECL(0151_purge_brokers_mock);
_TEST_DECL(0152_rebootstrap_local);
_TEST_DECL(0153_memberid);
+_TEST_DECL(0154_ssl_hostname_normalize);
/* Manual tests */
_TEST_DECL(8000_idle);
@@ -540,6 +541,7 @@ struct test tests[] = {
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
_TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)),
+ _TEST(0154_ssl_hostname_normalize, TEST_F_LOCAL),
/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index 812a2674d..17aba21bc 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -234,6 +234,7 @@
+