diff --git a/kafkacat.c b/kafkacat.c index 9412208b..5c5ca7c9 100644 --- a/kafkacat.c +++ b/kafkacat.c @@ -52,6 +52,7 @@ static struct conf { char mode; int flags; #define CONF_F_KEY_DELIM 0x2 +#define CONF_F_OFFSET 0x4 /* Print offsets */ int delim; int key_delim; int msg_size; @@ -76,6 +77,7 @@ static struct conf { .partition = RD_KAFKA_PARTITION_UA, .msg_size = 1024*1024, .delim = '\n', + .key_delim = '\t', }; @@ -195,7 +197,8 @@ static ssize_t produce_file (const char *path) { return -1; } - INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n", path, (intmax_t)st.st_size); + INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n", + path, (intmax_t)st.st_size); produce(ptr, st.st_size, NULL, 0, RD_KAFKA_MSG_F_COPY); munmap(ptr, st.st_size); @@ -363,6 +366,10 @@ static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) { rd_kafka_message_errstr(rkmessage)); } + /* Print offset (using key delim), if desired */ + if (conf.flags & CONF_F_OFFSET) + fprintf(fp, "%"PRId64"%c", rkmessage->offset, conf.key_delim); + /* Print key, if desired */ if (conf.flags & CONF_F_KEY_DELIM) fprintf(fp, "%.*s%c", @@ -669,6 +676,7 @@ static void __attribute__((noreturn)) usage (const char *argv0, int exitcode, " -D Delimiter to separate messages on output\n" " -K Print message keys prefixing the message\n" " with specified delimiter.\n" + " -O Print message offset using -K delimiter\n" " -c Exit after consuming this number " "of messages\n" " -u Unbuffered output\n" @@ -727,7 +735,7 @@ static void argparse (int argc, char **argv) { int opt; while ((opt = getopt(argc, argv, - "PCLt:p:b:z:o:eD:K:d:qvX:c:u")) != -1) { + "PCLt:p:b:z:o:eD:K:Od:qvX:c:u")) != -1) { switch (opt) { case 'P': case 'C': @@ -773,6 +781,9 @@ static void argparse (int argc, char **argv) { conf.key_delim = parse_delim(optarg); conf.flags |= CONF_F_KEY_DELIM; break; + case 'O': + conf.flags |= CONF_F_OFFSET; + break; case 'c': conf.msg_cnt = strtoll(optarg, NULL, 10); break;