diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c index dab0b06b8f..00c6b7d050 100644 --- a/examples/rdkafka_performance.c +++ b/examples/rdkafka_performance.c @@ -76,6 +76,7 @@ static int partition_cnt = 0; static int eof_cnt = 0; static int with_dr = 1; static int read_hdrs = 0; +static int test_duration = 0; static void stop(int sig) { @@ -84,6 +85,21 @@ static void stop(int sig) { run = 0; } +/** + * @brief Logger callback to capture and display TLS version information + */ +static void +logger_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf) { + /* Display TLS connection info */ + if (strstr(fac, "SSLVERIFY") && strstr(buf, "TLS version:")) { + fprintf(stderr, "\n[INFO] TLS Connection Established: %s\n\n", + buf); + } + + /* Print warnings and errors to stderr */ + fprintf(stderr, "%%%d|%s|%s: %s\n", level, rd_kafka_name(rk), fac, buf); +} + static long int msgs_wait_cnt = 0; static long int msgs_wait_produce_cnt = 0; static rd_ts_t t_end; @@ -594,6 +610,22 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) { } if (otype & _OTYPE_SUMMARY) { + /* Add time remaining for duration-based tests */ + if (test_duration > 0 && cnt.t_start > 0) { + rd_ts_t elapsed_us = now - cnt.t_start; + rd_ts_t duration_us = + (rd_ts_t)test_duration * 1000 * 1000; + if (elapsed_us < duration_us) { + int remaining_secs = + (int)((duration_us - elapsed_us) / + 1000000); + extra_of += rd_snprintf( + extra + extra_of, + sizeof(extra) - extra_of, + ", %ds remaining", remaining_secs); + } + } + printf("%% %" PRIu64 " messages produced " "(%" PRIu64 @@ -607,13 +639,13 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) { "%.02f MB/s, " "%" PRIu64 " produce failures, %i in queue, " - "%s compression\n", + "%s compression%s\n", cnt.msgs, cnt.bytes, cnt.msgs_dr_ok, cnt.last_offset, cnt.msgs_dr_err, t_total / 1000, ((cnt.msgs_dr_ok * 1000000) / t_total), (float)((cnt.bytes_dr_ok) / (float)t_total), cnt.tx_err, rk ? rd_kafka_outq_len(rk) : 0, - compression); + compression, extra); } } else { @@ -894,7 +926,7 @@ int main(int argc, char **argv) { while ((opt = getopt(argc, argv, "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:" - "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) { + "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:E:")) != -1) { switch (opt) { case 'G': if (rd_kafka_conf_set(conf, "group.id", optarg, errstr, @@ -1118,12 +1150,32 @@ int main(int argc, char **argv) { with_dr = 0; break; + case 'E': + test_duration = atoi(optarg); + if (test_duration <= 0) { + fprintf(stderr, + "%% Invalid test duration: %s " + "(must be > 0)\n", + optarg); + exit(1); + } + break; + default: fprintf(stderr, "Unknown option: %c\n", opt); goto usage; } } + /* Validate that both -c and -E are not specified together */ + if (msgcnt != -1 && test_duration > 0) { + fprintf(stderr, + "%% Error: Cannot specify both -c (message count) " + "and -E (test duration)\n" + "%% Use either -c OR -E , not both\n"); + exit(1); + } + if (topics->cnt == 0 || optind != argc) { if (optind < argc) fprintf(stderr, "Unknown argument: %s\n", argv[optind]); @@ -1148,6 +1200,11 @@ int main(int argc, char **argv) { " -H Add header to message (producer)\n" " -H parse Read message headers (consumer)\n" " -c Messages to transmit/receive\n" + " -E Run test for specified duration in seconds " + "(producer)\n" + " Cannot be used with -c. When used, sends " + "unlimited messages\n" + " until time expires.\n" " -x Hard exit after transmitting " "messages (producer)\n" " -D Copy/Duplicate data buffer (producer)\n" @@ -1221,6 +1278,9 @@ int main(int argc, char **argv) { exit(1); } + /* Set logger callback to display TLS version info */ + rd_kafka_conf_set_log_cb(conf, logger_cb); + /* Always enable stats (for RTT extraction), and if user supplied * the -T option we let her take part of the stats aswell. */ rd_kafka_conf_set_stats_cb(conf, stats_cb); @@ -1368,12 +1428,20 @@ int main(int argc, char **argv) { rof += (off_t)xlen; } - if (msgcnt == -1) + /* Handle time-based testing */ + if (test_duration > 0) { + printf( + "%% Sending messages of size %i bytes for %i " + "seconds\n", + msgsize, test_duration); + msgcnt = -1; /* Unlimited messages */ + } else if (msgcnt == -1) { printf("%% Sending messages of size %i bytes\n", msgsize); - else + } else { printf("%% Sending %i messages of size %i bytes\n", msgcnt, msgsize); + } if (with_dr) rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered); @@ -1407,6 +1475,23 @@ int main(int argc, char **argv) { msgs_wait_produce_cnt = msgcnt; while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) { + /* Check if test duration has elapsed */ + if (test_duration > 0) { + rd_ts_t elapsed_us = rd_clock() - cnt.t_start; + rd_ts_t duration_us = + (rd_ts_t)test_duration * 1000 * + 1000; /* seconds to microseconds */ + if (elapsed_us >= duration_us) { + if (verbosity >= 1) + printf( + "%% Test duration of %d " + "seconds reached, " + "stopping...\n", + test_duration); + break; + } + } + /* Send/Produce message. */ if (idle) { @@ -1507,8 +1592,15 @@ int main(int argc, char **argv) { msgs_wait_cnt); /* Wait for messages to be delivered */ - while (run && rd_kafka_poll(rk, 1000) != -1) - print_stats(rk, mode, otype, compression); + if (test_duration > 0) { + /* For time-based tests, flush remaining messages with + * timeout */ + rd_kafka_flush(rk, 5000); /* 5 second flush timeout */ + } else { + /* For count-based tests, wait for all deliveries */ + while (run && rd_kafka_poll(rk, 1000) != -1) + print_stats(rk, mode, otype, compression); + } outq = rd_kafka_outq_len(rk); diff --git a/src/rdkafka_ssl.c b/src/rdkafka_ssl.c index 6747d346e6..9229141cc4 100644 --- a/src/rdkafka_ssl.c +++ b/src/rdkafka_ssl.c @@ -617,6 +617,11 @@ static int rd_kafka_transport_ssl_verify(rd_kafka_transport_t *rktrans) { rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY", "Broker SSL certificate verified"); + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY", + "TLS version: %s, Cipher: %s", + SSL_get_version(rktrans->rktrans_ssl), + SSL_get_cipher(rktrans->rktrans_ssl)); + return 0; }