Consumer offset management
librdkafka currently supports two consumer offset management methods:
- broker based offsets (default, requires broker 0.8.2 or later)
- local file based offsets
Offset management is configured through topic configuration properties and enabled by passing RD_KAFKA_OFFSET_STORED as the start_offset argument to rd_kafka_consume_start().
The various rdkafka tools, such as rdkafka_example and kafkacat, accept the -o stored command-line argument for the same purpose.
- Commit - Offset committed to permanent storage (broker, file). When the consumer restarts, this is where it starts consuming from. The committed offset should be last_message_offset+1.
- Store - Offsets to be committed are stored in memory until the next call to commit() (without offsets specified) or the next auto commit.
Note, however, that the permanent offset storage backends (broker, file) are referred to as offset stores, and RD_KAFKA_OFFSET_STORED actually refers to the committed offset, not the stored one.
- enable.auto.commit - If true (default), periodically commit the offset of the last message handed to the application. The committed offset is used when the process restarts to pick up where it left off.
- auto.commit.interval.ms - The frequency, in milliseconds, at which consumer offsets are committed (written) to offset storage.
- enable.auto.offset.store - If true (default), the client automatically stores offset+1 of each message just before passing the message to the application. The offset is held in memory and used by the next call to commit() (without explicit offsets specified) or the next auto commit. If false and enable.auto.commit=true, the application must call rd_kafka_offset_store() manually to store the offset to auto commit. (optional)
Broker based offsets
The OffsetCommit API was added in Apache Kafka 0.8.2 and thus requires a broker of that version or later. This is the preferred method.
With this method, offsets are written to the Kafka cluster through the Kafka protocol. This is not to be confused with the Zookeeper based offsets used by the official 0.8 Scala Kafka clients; the new 0.9 Java client, by contrast, uses broker based offset storage exclusively.
The consumer group id must be configured using the group.id configuration property.
No additional configuration is required for this method.
Local file based offsets
Offsets are written to a local file, defaulting to {offset.store.path}/topicname-partition*.offset.
- offset.store.method - Offset commit store method: 'file' for the local file store (offset.store.path, et al.) or 'broker' for the broker commit store (requires Apache Kafka 0.8.2 or later on the broker).
- offset.store.path - Path to the local file for storing offsets. If the path is a directory, a filename is automatically generated in that directory based on the topic and partition. Defaults to the current directory.
- offset.store.sync.interval.ms - fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing and 0 for an immediate sync after each write.