Manually setting the consumer start offset

The high-level Kafka consumer (KafkaConsumer in C++) will start consuming at the last committed offset by default. If there is no previously committed offset for the topic+partition and group, it falls back on the topic configuration property auto.offset.reset, which defaults to latest, meaning consumption starts at the end of the partition (only new messages will be consumed).
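
For example, to start from the beginning of the partition when no committed offset exists, set auto.offset.reset to earliest. A minimal sketch in C (the group name is hypothetical; recent librdkafka versions accept topic-level properties such as auto.offset.reset on the global configuration object, older versions require a default topic conf instead):

char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();

/* Consumers need a group.id to use committed offsets. */
rd_kafka_conf_set(conf, "group.id", "mygroup", errstr, sizeof(errstr));

/* When no committed offset is found, start from the earliest available
 * message instead of the default "latest". */
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));

rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));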

To manually set the starting offset for a partition, the assign() API allows you to specify the start offset for each partition by setting the .offset field in the topic_partition_t element to either an absolute offset (>= 0) or one of the logical offsets (BEGINNING, END, STORED, TAIL(..)).
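
For instance, a minimal C sketch using the corresponding RD_KAFKA_OFFSET_* constants from rdkafka.h (the partition numbers are arbitrary and an existing consumer handle rk is assumed):

rd_kafka_topic_partition_list_t *parts = rd_kafka_topic_partition_list_new(3);

/* Start at the very beginning of partition 0. */
rd_kafka_topic_partition_list_add(parts, "mytopic", 0)->offset = RD_KAFKA_OFFSET_BEGINNING;

/* Start 100 messages before the current end of partition 1. */
rd_kafka_topic_partition_list_add(parts, "mytopic", 1)->offset = RD_KAFKA_OFFSET_TAIL(100);

/* Use the committed offset for partition 2, falling back to auto.offset.reset. */
rd_kafka_topic_partition_list_add(parts, "mytopic", 2)->offset = RD_KAFKA_OFFSET_STORED;

rd_kafka_assign(rk, parts);
rd_kafka_topic_partition_list_destroy(parts);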

See the following examples, which show how to start consuming topic "mytopic" partition 3 at offset 1234.

C example

From a rebalance_cb:

void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *partitions, void *opaque) {
    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        rd_kafka_topic_partition_t *part;

        /* Override the start offset for "mytopic" partition 3 before assigning. */
        if ((part = rd_kafka_topic_partition_list_find(partitions, "mytopic", 3)))
            part->offset = 1234;

        rd_kafka_assign(rk, partitions);
    } else {
        rd_kafka_assign(rk, NULL);
    }
}

Direct assign() with no subscription:

rd_kafka_topic_partition_list_t *partitions;
partitions = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;
rd_kafka_assign(rk, partitions);
/* assign() copies the list, so it can be destroyed immediately. */
rd_kafka_topic_partition_list_destroy(partitions);

C++ example

From a rebalance_cb:

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      /* Find "mytopic" partition 3 in the assigned partition list
       * (through std::find_if() or a plain loop) and override its
       * start offset before assigning. */
      RdKafka::TopicPartition *part = NULL;
      for (std::vector<RdKafka::TopicPartition*>::iterator it = partitions.begin();
           it != partitions.end(); ++it) {
        if ((*it)->topic() == "mytopic" && (*it)->partition() == 3)
          part = *it;
      }

      if (part)
        part->set_offset(1234);

      consumer->assign(partitions);
    } else {
      consumer->unassign();
    }
  }
};

Direct assign() with no subscription:

std::vector<RdKafka::TopicPartition*> partitions;
partitions.push_back(RdKafka::TopicPartition::create("mytopic", 3, 1234));
consumer->assign(partitions);
/* assign() copies the partitions, so the objects can be destroyed right away. */
RdKafka::TopicPartition::destroy(partitions);

Python example

From https://github.com/confluentinc/confluent-kafka-python/issues/373#issuecomment-389095624:

def my_on_assign(consumer, partitions):
    for p in partitions:
        # Set some absolute starting offset, or use OFFSET_BEGINNING, et al.
        # The default offset is STORED, which means use the committed offset, and if
        # no committed offset is available use the auto.offset.reset config (default: latest).
        p.offset = 1234
    # Call assign() to start fetching the given partitions.
    consumer.assign(partitions)

consumer.subscribe(mytopics, on_assign=my_on_assign)