This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Support for exactly once processing guarantee #875

@rubinatorz


Hi @emmett9001

I was wondering whether pykafka 2.8.0 supports the exactly-once processing guarantee that has been part of Kafka since the 0.11 release (I just read https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/).

The reason for asking is that I have a Kafka Streams application running with "processing.guarantee=exactly_once" enabled, which reads from one topic and branches into multiple topics based on some predicates. But when consuming from one of these branched topics (the ones filled by the streaming application), the SimpleConsumer fails with this error:

Traceback (most recent call last):
  File "consume.py", line 47, in <module>
    p.start()
  File "consume.py", line 38, in start
    msg = self._consumer.consume(block=True)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 483, in consume
    self._raise_worker_exceptions()
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 276, in _raise_worker_exceptions
    reraise(*self._worker_exception)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 440, in fetcher
    self.fetch()
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 804, in fetch
    min_bytes=self._fetch_min_bytes
  File "/usr/local/lib/python2.7/dist-packages/pykafka/broker.py", line 45, in wrapped
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/broker.py", line 327, in fetch_messages
    return future.get(response_class, broker_version=self._broker_version)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/handlers.py", line 76, in get
    return response_cls(self.response, **response_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/fetch.py", line 216, in __init__
    broker_version=broker_version)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/fetch.py", line 165, in __init__
    broker_version=broker_version),
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/fetch.py", line 172, in _unpack_message_set
    message_set = MessageSet.decode(buff, partition_id=partition_id)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/message.py", line 272, in decode
    partition_id=partition_id)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/message.py", line 109, in decode
    (key, val) = struct_helpers.unpack_from('YY', buff, offset)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/utils/struct_helpers.py", line 49, in unpack_from
    output = _unpack(fmt, buff, offset, 1)[0]
  File "/usr/local/lib/python2.7/dist-packages/pykafka/utils/struct_helpers.py", line 96, in _unpack
    items.extend(struct.unpack_from('!' + ch, buff, offset))
struct.error: unpack_from requires a buffer of at least 1088403925 bytes

I'm running a Kafka 1.1.0 cluster with 3 nodes and replication factor 3. I'm also providing the correct broker_version to the KafkaClient.

I've been debugging this for a while and found that the "processing.guarantee=exactly_once" setting in the streaming app is what triggers the struct.error. When I remove the setting, SimpleConsumer consumes the branched topics like a charm. I also added some debug lines to the pykafka code, and it seems there are extra bytes in the buffer that cannot be parsed into a message. Which brings me back to my initial question: does pykafka support the exactly-once feature?

Furthermore, a related issue is that I've seen gaps in the offsets of the branched topics, which pykafka cannot handle well: the enqueue function in SimpleConsumer handles them for non-compacted topics like this:

(not self._is_compacted_topic and message.offset != self.next_offset):

This skips items whenever there's a gap: message.offset jumps, for example, from 3 to 5, but next_offset expects 4. Since offset 4 never arrives, all subsequent messages are skipped, because message.offset will never equal 4. This only occurs on the topics branched by the streaming app. Just mentioning it because it may ring a bell.
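A minimal sketch of the failure mode (this is not pykafka's actual implementation, just an illustration of the check quoted above). Transactional producers legitimately leave offset gaps, since commit/abort markers occupy offsets that are never delivered to consumers, so a strict equality check stalls forever, while a simple "skip only stale offsets" check tolerates the gap:

```python
# Offset 4 is missing, e.g. consumed by a hypothetical transaction marker.
offsets = [3, 5, 6]

def strict(offsets, next_offset):
    """Mimics the quoted check: deliver only when offset == next_offset."""
    delivered = []
    for off in offsets:
        if off != next_offset:
            continue  # gap never resolves; everything after it is dropped
        delivered.append(off)
        next_offset = off + 1
    return delivered

def gap_tolerant(offsets, next_offset):
    """Alternative: skip only offsets older than expected (duplicates)."""
    delivered = []
    for off in offsets:
        if off < next_offset:
            continue  # stale/duplicate message from a re-fetch
        delivered.append(off)
        next_offset = off + 1
    return delivered

print(strict(offsets, 4))        # [] -- nothing is ever delivered
print(gap_tolerant(offsets, 4))  # [5, 6]
```

With next_offset at 4, the strict check delivers nothing at all, which matches the "skips all messages" behavior described above.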
