Print out correct offset
usmanm committed Mar 29, 2016
1 parent a6c610d commit f9d6e5d
Showing 1 changed file with 17 additions and 6 deletions.
pipeline_kafka.c (17 additions, 6 deletions)
@@ -106,7 +106,7 @@ typedef struct KafkaConsumerProc
 {
 	Oid id;
 	Oid consumer_id;
-	int64 offset;
+	int64 start_offset;
 	int partition_group;
 	NameData dbname;
 	BackgroundWorkerHandle worker;
@@ -278,7 +278,7 @@ get_all_brokers(void)
  * Load all offsets for all of this consumer's partitions
  */
 static void
-load_consumer_offsets(KafkaConsumer *consumer, struct rd_kafka_metadata_topic *meta, int64_t offset)
+load_consumer_offsets(KafkaConsumer *consumer, struct rd_kafka_metadata_topic *meta, int64_t start_offset)
 {
 	MemoryContext old;
 	ScanKeyData skey[1];
@@ -297,7 +297,7 @@ load_consumer_offsets(KafkaConsumer *consumer, struct rd_kafka_metadata_topic *m
 
 	/* by default, begin consuming from the end of a stream */
 	for (i = 0; i < meta->partition_cnt; i++)
-		consumer->offsets[i] = offset;
+		consumer->offsets[i] = start_offset;
 
 	consumer->num_partitions = meta->partition_cnt;
 
@@ -306,6 +306,8 @@ load_consumer_offsets(KafkaConsumer *consumer, struct rd_kafka_metadata_topic *m
 		Datum d;
 		bool isnull;
 		int partition;
+		int64_t offset;
+
 		ExecStoreTuple(tup, slot, InvalidBuffer, false);
 
 		d = slot_getattr(slot, OFFSETS_ATTR_PARTITION, &isnull);
@@ -314,18 +316,27 @@ load_consumer_offsets(KafkaConsumer *consumer, struct rd_kafka_metadata_topic *m
 		if(partition > consumer->num_partitions)
 			elog(ERROR, "invalid partition id: %d", partition);
 
-		if (offset == RD_KAFKA_OFFSET_NULL)
+		if (start_offset == RD_KAFKA_OFFSET_NULL)
 		{
 			d = slot_getattr(slot, OFFSETS_ATTR_OFFSET, &isnull);
 			if (isnull)
 				offset = RD_KAFKA_OFFSET_END;
 			else
 				offset = DatumGetInt64(d);
 		}
+		else
+			offset = start_offset;
 
 		consumer->offsets[partition] = DatumGetInt64(offset);
 	}
 
+	/* If no offset was saved and we passed it a NULL start_offset, set it to END */
+	for (i = 0; i < meta->partition_cnt; i++)
+	{
+		if (consumer->offsets[i] == RD_KAFKA_OFFSET_NULL)
+			consumer->offsets[i] = RD_KAFKA_OFFSET_END;
+	}
+
 	ExecDropSingleTupleTableSlot(slot);
 	heap_endscan(scan);
 	heap_close(offsets, RowExclusiveLock);
@@ -678,7 +689,7 @@ kafka_consume_main(Datum arg)
 
 	Assert(meta->topic_cnt == 1);
 	topic_meta = meta->topics[0];
-	load_consumer_offsets(&consumer, &topic_meta, proc->offset);
+	load_consumer_offsets(&consumer, &topic_meta, proc->start_offset);
 	CommitTransactionCommand();
 
 	/*
@@ -966,7 +977,7 @@ launch_consumer_group(Relation consumers, KafkaConsumer *consumer, int64 offset)
 
 	proc->consumer_id = consumer->id;
 	proc->partition_group = i;
-	proc->offset = offset;
+	proc->start_offset = offset;
 	namestrcpy(&proc->dbname, get_database_name(MyDatabaseId));
 
 	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
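Taken together, the hunks above amount to a per-partition rule for where a consumer starts reading: an explicit start_offset always wins, otherwise the offset saved in the offsets catalog is used, and if neither exists, consumption begins at the end of the partition. The following is a minimal standalone sketch of that rule, not code from the patch: resolve_start_offset(), the saved_valid flag, and the INT64_MIN value chosen for RD_KAFKA_OFFSET_NULL are illustrative assumptions (RD_KAFKA_OFFSET_NULL is pipeline_kafka's own sentinel, while RD_KAFKA_OFFSET_END is librdkafka's "start at end of log" constant).

/*
 * Sketch of the start-offset resolution rule implemented by the rewritten
 * load_consumer_offsets(). Assumptions: resolve_start_offset() and
 * saved_valid are hypothetical names, and RD_KAFKA_OFFSET_NULL's value
 * is assumed for illustration.
 */
#include <stdint.h>
#include <stdio.h>

#define RD_KAFKA_OFFSET_END   (-1)       /* librdkafka: consume from end of log */
#define RD_KAFKA_OFFSET_NULL  INT64_MIN  /* assumed: "no explicit start requested" */

static int64_t
resolve_start_offset(int64_t start_offset, int saved_valid, int64_t saved_offset)
{
	/* An explicit start_offset wins over any saved offset. */
	if (start_offset != RD_KAFKA_OFFSET_NULL)
		return start_offset;

	/* No explicit start: fall back to the offset saved in the catalog... */
	if (saved_valid)
		return saved_offset;

	/* ...and if nothing was saved either, begin at the end of the partition. */
	return RD_KAFKA_OFFSET_END;
}

int
main(void)
{
	/* explicit start requested: prints 42 */
	printf("%lld\n", (long long) resolve_start_offset(42, 1, 100));
	/* no explicit start, saved offset exists: prints 100 */
	printf("%lld\n", (long long) resolve_start_offset(RD_KAFKA_OFFSET_NULL, 1, 100));
	/* neither: prints -1 (RD_KAFKA_OFFSET_END) */
	printf("%lld\n", (long long) resolve_start_offset(RD_KAFKA_OFFSET_NULL, 0, 0));
	return 0;
}

The second cleanup loop in the patch exists because the first loop seeds every partition with start_offset: partitions that have no row in the offsets catalog would otherwise be left holding the RD_KAFKA_OFFSET_NULL sentinel, so they are explicitly reset to RD_KAFKA_OFFSET_END.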
