Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 99 additions & 7 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static int partition_cnt = 0;
static int eof_cnt = 0;
static int with_dr = 1;
static int read_hdrs = 0;
static int test_duration = 0;


static void stop(int sig) {
Expand All @@ -84,6 +85,21 @@ static void stop(int sig) {
run = 0;
}

/**
* @brief Logger callback to capture and display TLS version information
*/
static void
logger_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
/* Display TLS connection info */
if (strstr(fac, "SSLVERIFY") && strstr(buf, "TLS version:")) {
fprintf(stderr, "\n[INFO] TLS Connection Established: %s\n\n",
buf);
}

/* Print warnings and errors to stderr */
fprintf(stderr, "%%%d|%s|%s: %s\n", level, rd_kafka_name(rk), fac, buf);
Comment on lines +99 to +100
Copy link

Copilot AI Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logger callback always prints all log messages to stderr, which will produce excessive output. This should be conditional based on log level or verbosity settings to avoid flooding the console with debug messages. Consider adding a level check (e.g., if (level <= LOG_WARNING)) or respecting the global verbosity setting.

Suggested change
/* Print warnings and errors to stderr */
fprintf(stderr, "%%%d|%s|%s: %s\n", level, rd_kafka_name(rk), fac, buf);
/* Only print warnings and errors by default, or more if verbosity is high */
/* Map verbosity: 1 = warnings/errors, 2 = info, 3 = debug, 4+ = all */
int min_level;
if (verbosity >= 4)
min_level = 7; /* RD_KAFKA_LOG_PRINT, print all */
else if (verbosity == 3)
min_level = 6; /* RD_KAFKA_LOG_DEBUG */
else if (verbosity == 2)
min_level = 5; /* RD_KAFKA_LOG_INFO */
else
min_level = 4; /* RD_KAFKA_LOG_WARNING */
if (level <= min_level)
fprintf(stderr, "%%%d|%s|%s: %s\n", level, rd_kafka_name(rk), fac, buf);

Copilot uses AI. Check for mistakes.
}

static long int msgs_wait_cnt = 0;
static long int msgs_wait_produce_cnt = 0;
static rd_ts_t t_end;
Expand Down Expand Up @@ -594,6 +610,22 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) {
}

if (otype & _OTYPE_SUMMARY) {
/* Add time remaining for duration-based tests */
if (test_duration > 0 && cnt.t_start > 0) {
rd_ts_t elapsed_us = now - cnt.t_start;
rd_ts_t duration_us =
(rd_ts_t)test_duration * 1000 * 1000;
if (elapsed_us < duration_us) {
int remaining_secs =
(int)((duration_us - elapsed_us) /
1000000);
extra_of += rd_snprintf(
extra + extra_of,
sizeof(extra) - extra_of,
", %ds remaining", remaining_secs);
}
}

printf("%% %" PRIu64
" messages produced "
"(%" PRIu64
Expand All @@ -607,13 +639,13 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) {
"%.02f MB/s, "
"%" PRIu64
" produce failures, %i in queue, "
"%s compression\n",
"%s compression%s\n",
cnt.msgs, cnt.bytes, cnt.msgs_dr_ok,
cnt.last_offset, cnt.msgs_dr_err, t_total / 1000,
((cnt.msgs_dr_ok * 1000000) / t_total),
(float)((cnt.bytes_dr_ok) / (float)t_total),
cnt.tx_err, rk ? rd_kafka_outq_len(rk) : 0,
compression);
compression, extra);
}

} else {
Expand Down Expand Up @@ -894,7 +926,7 @@ int main(int argc, char **argv) {

while ((opt = getopt(argc, argv,
"PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:E:")) != -1) {
switch (opt) {
case 'G':
if (rd_kafka_conf_set(conf, "group.id", optarg, errstr,
Expand Down Expand Up @@ -1118,12 +1150,32 @@ int main(int argc, char **argv) {
with_dr = 0;
break;

case 'E':
test_duration = atoi(optarg);
if (test_duration <= 0) {
fprintf(stderr,
"%% Invalid test duration: %s "
"(must be > 0)\n",
optarg);
exit(1);
}
break;
Comment on lines +1153 to +1162
Copy link

Copilot AI Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using atoi() for parsing user input doesn't detect invalid input. If the user provides non-numeric input, atoi() returns 0, which will pass the <= 0 check but gives a misleading error message. Use strtol() with error checking to properly validate numeric input and provide accurate error messages for malformed input.

Suggested change
case 'E':
test_duration = atoi(optarg);
if (test_duration <= 0) {
fprintf(stderr,
"%% Invalid test duration: %s "
"(must be > 0)\n",
optarg);
exit(1);
}
break;
case 'E': {
char *endptr = NULL;
long val;
errno = 0;
val = strtol(optarg, &endptr, 10);
if (errno == ERANGE || val > INT_MAX || val <= 0 || endptr == optarg || *endptr != '\0') {
fprintf(stderr,
"%% Invalid test duration: %s "
"(must be a positive integer)\n",
optarg);
exit(1);
}
test_duration = (int)val;
break;
}

Copilot uses AI. Check for mistakes.

default:
fprintf(stderr, "Unknown option: %c\n", opt);
goto usage;
}
}

/* Validate that both -c and -E are not specified together */
if (msgcnt != -1 && test_duration > 0) {
fprintf(stderr,
"%% Error: Cannot specify both -c (message count) "
"and -E (test duration)\n"
"%% Use either -c <count> OR -E <seconds>, not both\n");
exit(1);
}

if (topics->cnt == 0 || optind != argc) {
if (optind < argc)
fprintf(stderr, "Unknown argument: %s\n", argv[optind]);
Expand All @@ -1148,6 +1200,11 @@ int main(int argc, char **argv) {
" -H <name[=value]> Add header to message (producer)\n"
" -H parse Read message headers (consumer)\n"
" -c <cnt> Messages to transmit/receive\n"
" -E <seconds> Run test for specified duration in seconds "
"(producer)\n"
" Cannot be used with -c. When used, sends "
"unlimited messages\n"
" until time expires.\n"
" -x <cnt> Hard exit after transmitting <cnt> "
"messages (producer)\n"
" -D Copy/Duplicate data buffer (producer)\n"
Expand Down Expand Up @@ -1221,6 +1278,9 @@ int main(int argc, char **argv) {
exit(1);
}

/* Set logger callback to display TLS version info */
rd_kafka_conf_set_log_cb(conf, logger_cb);

/* Always enable stats (for RTT extraction), and if user supplied
* the -T <intvl> option we let her take part of the stats aswell. */
rd_kafka_conf_set_stats_cb(conf, stats_cb);
Expand Down Expand Up @@ -1368,12 +1428,20 @@ int main(int argc, char **argv) {
rof += (off_t)xlen;
}

if (msgcnt == -1)
/* Handle time-based testing */
if (test_duration > 0) {
printf(
"%% Sending messages of size %i bytes for %i "
"seconds\n",
msgsize, test_duration);
msgcnt = -1; /* Unlimited messages */
} else if (msgcnt == -1) {
printf("%% Sending messages of size %i bytes\n",
msgsize);
else
} else {
printf("%% Sending %i messages of size %i bytes\n",
msgcnt, msgsize);
}

if (with_dr)
rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);
Expand Down Expand Up @@ -1407,6 +1475,23 @@ int main(int argc, char **argv) {
msgs_wait_produce_cnt = msgcnt;

while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) {
/* Check if test duration has elapsed */
if (test_duration > 0) {
rd_ts_t elapsed_us = rd_clock() - cnt.t_start;
rd_ts_t duration_us =
(rd_ts_t)test_duration * 1000 *
1000; /* seconds to microseconds */
if (elapsed_us >= duration_us) {
if (verbosity >= 1)
printf(
"%% Test duration of %d "
"seconds reached, "
"stopping...\n",
test_duration);
break;
}
}

/* Send/Produce message. */

if (idle) {
Expand Down Expand Up @@ -1507,8 +1592,15 @@ int main(int argc, char **argv) {
msgs_wait_cnt);

/* Wait for messages to be delivered */
while (run && rd_kafka_poll(rk, 1000) != -1)
print_stats(rk, mode, otype, compression);
if (test_duration > 0) {
/* For time-based tests, flush remaining messages with
* timeout */
rd_kafka_flush(rk, 5000); /* 5 second flush timeout */
Copy link

Copilot AI Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The 5-second flush timeout is hardcoded with no explanation of why this specific duration was chosen. Consider documenting the rationale for this timeout value or making it configurable, especially since duration-based tests might involve different message volumes that could require different flush timeouts.

Copilot uses AI. Check for mistakes.
} else {
/* For count-based tests, wait for all deliveries */
while (run && rd_kafka_poll(rk, 1000) != -1)
print_stats(rk, mode, otype, compression);
}


outq = rd_kafka_outq_len(rk);
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,11 @@ static int rd_kafka_transport_ssl_verify(rd_kafka_transport_t *rktrans) {

rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY",
"Broker SSL certificate verified");
rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY",
"TLS version: %s, Cipher: %s",
SSL_get_version(rktrans->rktrans_ssl),
SSL_get_cipher(rktrans->rktrans_ssl));

return 0;
}

Expand Down