From 08c77499a2e8bc79d6788d70ef96d77752ed6325 Mon Sep 17 00:00:00 2001
From: Heikki Nousiainen
Date: Tue, 14 Aug 2018 15:17:23 +0300
Subject: [PATCH] Support produce with Kafka record headers

---
 README.rst                           |  4 ++++
 kafka/producer/future.py             | 10 +++++-----
 kafka/producer/kafka.py              | 18 +++++++++++++-----
 kafka/producer/record_accumulator.py | 16 +++++++++-------
 test/test_producer.py                | 10 +++++++++-
 5 files changed, 40 insertions(+), 18 deletions(-)

diff --git a/README.rst b/README.rst
index 28cb7e77c..a82573bbf 100644
--- a/README.rst
+++ b/README.rst
@@ -117,6 +117,10 @@ for more details.
 >>> for i in range(1000):
 ...     producer.send('foobar', b'msg %d' % i)
 
+>>> # Include record headers. The format is a list of tuples with string key
+>>> # and bytes value.
+>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+
 >>> # Get producer performance metrics
 >>> metrics = producer.metrics()
 
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index aa216c4e5..1c5d6d7bf 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -29,11 +29,11 @@ def wait(self, timeout=None):
 
 
 class FutureRecordMetadata(Future):
-    def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
+    def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
         super(FutureRecordMetadata, self).__init__()
         self._produce_future = produce_future
         # packing args as a tuple is a minor speed optimization
-        self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
+        self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
         produce_future.add_callback(self._produce_success)
         produce_future.add_errback(self.failure)
 
@@ -42,7 +42,7 @@ def _produce_success(self, offset_and_timestamp):
 
         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
-         serialized_key_size, serialized_value_size) = self.args
+         serialized_key_size, serialized_value_size, serialized_header_size) = self.args
 
         # None is when Broker does not support the API (<0.10) and
         # -1 is when the broker is configured for CREATE_TIME timestamps
@@ -53,7 +53,7 @@ def _produce_success(self, offset_and_timestamp):
             tp = self._produce_future.topic_partition
             metadata = RecordMetadata(tp[0], tp[1], tp, offset,
                                       timestamp_ms, checksum, serialized_key_size,
-                                      serialized_value_size)
+                                      serialized_value_size, serialized_header_size)
         self.success(metadata)
 
     def get(self, timeout=None):
@@ -68,4 +68,4 @@ def get(self, timeout=None):
 
 RecordMetadata = collections.namedtuple(
     'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
-                       'checksum', 'serialized_key_size', 'serialized_value_size'])
+                       'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 24b58fe6d..4fc7bc687 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -513,7 +513,7 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
             return LegacyRecordBatchBuilder.estimate_size_in_bytes(
                 magic, self.config['compression_type'], key, value)
 
-    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+    def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.
 
         Arguments:
@@ -534,6 +534,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
                 partition (but if key is None, partition is chosen randomly).
                 Must be type bytes, or be serializable to bytes via configured
                 key_serializer.
+            headers (optional): a list of header key value pairs. List items
+                are tuples of str key and bytes value.
             timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
                 to use as the message timestamp. Defaults to current time.
 
@@ -563,13 +565,18 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         partition = self._partition(topic, partition, key, value,
                                     key_bytes, value_bytes)
 
-        message_size = self._estimate_size_in_bytes(key_bytes, value_bytes)
+        if headers is None:
+            headers = []
+        assert type(headers) == list
+        assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
+
+        message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
         self._ensure_valid_record_size(message_size)
 
         tp = TopicPartition(topic, partition)
-        log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
+        log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
         result = self._accumulator.append(tp, timestamp_ms,
-                                          key_bytes, value_bytes,
+                                          key_bytes, value_bytes, headers,
                                           self.config['max_block_ms'],
                                           estimated_size=message_size)
         future, batch_is_full, new_batch_created = result
@@ -588,7 +595,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
                 FutureProduceResult(TopicPartition(topic, partition)),
                 -1, None, None,
                 len(key_bytes) if key_bytes is not None else -1,
-                len(value_bytes) if value_bytes is not None else -1
+                len(value_bytes) if value_bytes is not None else -1,
+                sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
             ).failure(e)
 
     def flush(self, timeout=None):
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 1cd541356..84b01d1b5 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -55,8 +55,8 @@ def __init__(self, tp, records, buffer):
     def record_count(self):
         return self.records.next_offset()
 
-    def try_append(self, timestamp_ms, key, value):
-        metadata = self.records.append(timestamp_ms, key, value)
+    def try_append(self, timestamp_ms, key, value, headers):
+        metadata = self.records.append(timestamp_ms, key, value, headers)
         if metadata is None:
             return None
 
@@ -65,7 +65,8 @@ def try_append(self, timestamp_ms, key, value):
         future = FutureRecordMetadata(self.produce_future, metadata.offset,
                                       metadata.timestamp, metadata.crc,
                                       len(key) if key is not None else -1,
-                                      len(value) if value is not None else -1)
+                                      len(value) if value is not None else -1,
+                                      sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
     def done(self, base_offset=None, timestamp_ms=None, exception=None):
@@ -196,7 +197,7 @@ def __init__(self, **configs):
         self.muted = set()
         self._drain_index = 0
 
-    def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
+    def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                estimated_size=0):
         """Add a record to the accumulator, return the append result.
 
@@ -209,6 +210,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
             timestamp_ms (int): The timestamp of the record (epoch ms)
             key (bytes): The key for the record
             value (bytes): The value for the record
+            headers (List[Tuple[str, bytes]]): The header fields for the record
             max_time_to_block_ms (int): The maximum time in milliseconds
                 to block for buffer memory to be available
 
@@ -231,7 +233,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
             dq = self._batches[tp]
             if dq:
                 last = dq[-1]
-                future = last.try_append(timestamp_ms, key, value)
+                future = last.try_append(timestamp_ms, key, value, headers)
                 if future is not None:
                     batch_is_full = len(dq) > 1 or last.records.is_full()
                     return future, batch_is_full, False
@@ -246,7 +248,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
 
             if dq:
                 last = dq[-1]
-                future = last.try_append(timestamp_ms, key, value)
+                future = last.try_append(timestamp_ms, key, value, headers)
                 if future is not None:
                     # Somebody else found us a batch, return the one we
                     # waited for! Hopefully this doesn't happen often...
@@ -261,7 +263,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
             )
 
             batch = ProducerBatch(tp, records, buf)
-            future = batch.try_append(timestamp_ms, key, value)
+            future = batch.try_append(timestamp_ms, key, value, headers)
             if not future:
                 raise Exception()
 
diff --git a/test/test_producer.py b/test/test_producer.py
index 09d184f34..176b23988 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -91,10 +91,16 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()
 
+    # record headers are supported in 0.11.0
+    if version() < (0, 11, 0):
+        headers = None
+    else:
+        headers = [("Header Key", b"Header Value")]
+
     topic = random_string(5)
     future = producer.send(
         topic,
-        value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
+        value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
         partition=0)
     record = future.get(timeout=5)
     assert record is not None
@@ -116,6 +122,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
 
     assert record.serialized_key_size == 10
     assert record.serialized_value_size == 12
+    if headers:
+        assert record.serialized_header_size == 22
 
     # generated timestamp case is skipped for broker 0.9 and below
     if magic == 0:
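
A minimal end-to-end sketch of the new headers argument (not part of the commit). It assumes a broker reachable at localhost:9092 running Kafka 0.11 or newer, so the v2 record format that carries headers is in use; the topic name 'foobar' is only illustrative. Header keys are str and header values are bytes, matching the validation added to send() above.

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    # Headers are passed as a list of (str key, bytes value) tuples.
    headers = [('content-encoding', b'base64')]
    future = producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=headers)
    record = future.get(timeout=10)

    # serialized_header_size is the sum of the UTF-8 encoded key lengths plus
    # the value lengths: len('content-encoding') + len(b'base64') == 16 + 6 == 22
    print(record.serialized_header_size)

    producer.flush()
    producer.close()

As computed in the accumulator and error paths above, serialized_header_size in the returned RecordMetadata is the sum of the UTF-8 encoded header keys plus the raw header values, or -1 when no headers were sent.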