Skip to content

Commit 6ccd07d

Browse files
committed
Added functionality behind Topic offset argument.
1 parent 4e13e73 commit 6ccd07d

File tree

1 file changed

+22
-1
lines changed

1 file changed

+22
-1
lines changed

slipstream/core.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,22 @@ async def admin(self) -> AIOKafkaClient:
167167
if k in params
168168
})
169169

170-
async def get_consumer(self):
170+
async def seek(self, offset: int, consumer: Optional[AIOKafkaConsumer]):
171+
"""Seek to offset."""
172+
if not (c := consumer or self.consumer):
173+
raise RuntimeError('No consumer provided.')
174+
partitions = c.assignment()
175+
if offset < READ_FROM_START:
176+
raise ValueError(f'Offset must be bigger than: {READ_FROM_START}.')
177+
if offset == READ_FROM_START:
178+
await c.seek_to_beginning(*partitions)
179+
elif offset == READ_FROM_END:
180+
await c.seek_to_end(*partitions)
181+
else:
182+
for p in partitions:
183+
c.seek(p, offset)
184+
185+
async def get_consumer(self) -> AIOKafkaConsumer:
171186
"""Get started instance of Kafka consumer."""
172187
params = get_params_names(AIOKafkaConsumer)
173188
if self.codec:
@@ -178,6 +193,12 @@ async def get_consumer(self):
178193
if k in params
179194
})
180195
await consumer.start()
196+
if self.starting_offset:
197+
try:
198+
await self.seek(self.starting_offset, consumer)
199+
except Exception:
200+
await consumer.stop()
201+
raise
181202
return consumer
182203

183204
async def get_producer(self):

0 commit comments

Comments
 (0)