Consumer lag monitoring
Consumer lag is the difference between the last produced message and the last consumed message of a partition. It is expressed as a number of messages, not as a unit of time.
librdkafka automatically monitors consumer lag for `RD_KAFKA_CONSUMER` handles, and the information is available in the statistics JSON object as emitted by `stats_cb`.
A statistics callback is set up with `rd_kafka_conf_set_stats_cb()`, and the callback is called at the configured interval (`statistics.interval.ms`) from an `rd_kafka_poll()` call.
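The callback receives the whole statistics document as a JSON string. The sketch below pulls the largest `consumer_lag` value out of that string. `max_consumer_lag()` is a hypothetical helper, and it is a naive substring scan rather than a real JSON parser, used only to keep the example dependency-free; a real application would parse the JSON properly. The librdkafka wiring is shown only in a comment so the block compiles without librdkafka. Keep in mind that `statistics.interval.ms` defaults to 0, which disables statistics, so it must be set to a non-zero value for the callback to fire (the 5000 ms below is an arbitrary choice).

```c
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: return the largest "consumer_lag" value found in a
 * librdkafka statistics JSON string, or -1 if the key is not present.
 *
 * Assumption: this is a naive substring scan, not a JSON parser. Matching
 * the quoted key including its closing quote avoids longer keys such as
 * "consumer_lag_stored". */
static long long max_consumer_lag(const char *json) {
    static const char key[] = "\"consumer_lag\"";
    long long max_lag = -1;
    const char *p = json;

    while ((p = strstr(p, key)) != NULL) {
        p += sizeof(key) - 1;                 /* skip past the key itself */
        while (*p == ':' || isspace((unsigned char)*p))
            p++;                              /* skip ':' and whitespace */
        char *end;
        long long lag = strtoll(p, &end, 10);
        if (end != p && lag > max_lag)        /* only accept parsed numbers */
            max_lag = lag;
        p = end;                              /* continue after the value */
    }
    return max_lag;
}

/* Wiring sketch (requires librdkafka, not compiled here). Assumes the json
 * buffer is NUL-terminated; otherwise copy json_len bytes first.
 *
 *   static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len,
 *                       void *opaque) {
 *       printf("max consumer lag: %lld\n", max_consumer_lag(json));
 *       return 0;  // 0: librdkafka frees the json buffer
 *   }
 *
 *   rd_kafka_conf_set(conf, "statistics.interval.ms", "5000",
 *                     errstr, sizeof(errstr));
 *   rd_kafka_conf_set_stats_cb(conf, stats_cb);
 */
```

Taking the maximum over all partitions gives a single "worst lag" figure; an application that needs per-partition values would instead walk the `topics` → `partitions` objects with a JSON parser.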
The partition's high and low message watermarks are pulled from the broker at regular intervals (`statistics.interval.ms`), so the reported lag is not strictly exact. librdkafka automatically compensates for this and will never report a negative consumer lag.
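A minimal sketch of the "never negative" idea, assuming lag is measured from the consumer's fetch position to the partition's high watermark. Because the watermark is only refreshed periodically while the position advances with every fetch, the consumer can appear to be ahead of a stale watermark, and the raw difference would go negative. `clamped_consumer_lag()` is a hypothetical helper illustrating this, not librdkafka's internal code.

```c
#include <stdint.h>

/* Hypothetical helper: lag between a (possibly stale) high watermark and
 * the consumer's fetch position, clamped so it is never negative.
 *   hi_offset: partition high watermark, refreshed only periodically
 *   position:  offset of the next message the consumer will fetch */
static int64_t clamped_consumer_lag(int64_t hi_offset, int64_t position) {
    int64_t lag = hi_offset - position;
    /* A stale hi_offset may be behind the position; report 0 instead. */
    return lag < 0 ? 0 : lag;
}
```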
Example JSON excerpt for topic `test`, partition 0, with a current lag of 238952 messages:
```
....
"topics": {
  "test": {
    "topic": "test",
    "metadata_age": 991,
    "partitions": {
      "0": {
        "partition": 0,
        "leader": 1,
        .....
        "fetch_state": "active",
        "query_offset": -2,
        "next_offset": 342044334,     <==== NEXT MESSAGE TO BE FETCHED FROM BROKER
        "app_offset": 0,
        "stored_offset": 342044333,   <==== MOST RECENTLY STORED/HANDLED MESSAGE OFFSET
        .....
        "lo_offset": 24290101,        <==== BROKER'S OLDEST MESSAGE (OFFSET) FOR PARTITION
        "hi_offset": 342283286,       <==== BROKER'S MOST RECENT MESSAGE FOR PARTITION
        "consumer_lag": 238952,       <==== CONSUMER LAG
        .....
      },
....
```
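The numbers in this excerpt can be checked by hand: the reported `consumer_lag` equals `hi_offset` minus `next_offset` (342283286 - 342044334 = 238952). The sketch below only confirms that arithmetic for this sample; it does not claim which offset librdkafka uses internally in every version. The struct and function names are hypothetical.

```c
#include <stdint.h>

/* Hypothetical struct holding the fields used here, with values copied
 * from the example excerpt (topic "test", partition 0). */
struct partition_stats {
    int64_t next_offset;  /* next message to be fetched from the broker */
    int64_t hi_offset;    /* broker's high watermark for the partition */
    int64_t consumer_lag; /* lag value reported in the statistics */
};

static const struct partition_stats excerpt = {
    .next_offset = 342044334,
    .hi_offset = 342283286,
    .consumer_lag = 238952,
};

/* Distance from the fetch position to the high watermark. */
static int64_t lag_from_next_offset(const struct partition_stats *s) {
    return s->hi_offset - s->next_offset;
}
```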