fix: add chain_future to send method #143

Open · wants to merge 2 commits into base: master
6 changes: 4 additions & 2 deletions kafka/producer/kafka.py
@@ -538,7 +538,7 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)

def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None, chain_future=None):
"""Publish a message to a topic.

Arguments:
@@ -563,6 +563,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
are tuples of str key and bytes value.
timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.
chain_future (Future, optional): a Future to chain to the returned FutureRecordMetadata; its success or failure callbacks fire when the send resolves

Returns:
FutureRecordMetadata: resolves to RecordMetadata
@@ -603,7 +604,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes, headers,
self.config['max_block_ms'],
estimated_size=message_size)
estimated_size=message_size,
chain_future=chain_future)
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
log.debug("Waking up the sender since %s is either full or"
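For context, a minimal usage sketch of the new parameter as this patch defines it. The bootstrap address, topic name, and callback bodies below are illustrative assumptions, not part of the change; only send(..., chain_future=...) and kafka.future.Future come from the patched code.

from kafka import KafkaProducer
from kafka.future import Future

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# A caller-owned future; its callbacks should fire when the send resolves.
notify = Future()
notify.add_callback(lambda metadata: print('delivered:', metadata))
notify.add_errback(lambda exc: print('failed:', exc))

# With this patch, the accumulator chains notify to the returned
# FutureRecordMetadata, so notify is resolved with the same result.
future = producer.send('my-topic', value=b'payload', chain_future=notify)
producer.flush()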
13 changes: 11 additions & 2 deletions kafka/producer/record_accumulator.py
@@ -7,6 +7,7 @@
import time

import kafka.errors as Errors
from kafka.future import Future
from kafka.producer.buffer import SimpleBufferPool
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.record.memory_records import MemoryRecordsBuilder
@@ -198,7 +199,7 @@ def __init__(self, **configs):
self._drain_index = 0

def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
estimated_size=0):
estimated_size=0, chain_future=None):
"""Add a record to the accumulator, return the append result.

The append result will contain the future metadata, and flag for
@@ -213,12 +214,14 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
headers (List[Tuple[str, bytes]]): The header fields for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
chain_future (Future, optional): Future to chain to the append result's future, propagating its success or failure

Returns:
tuple: (future, batch_is_full, new_batch_created)
"""
assert isinstance(tp, TopicPartition), 'not TopicPartition'
assert not self._closed, 'RecordAccumulator is closed'
if chain_future is not None:
assert isinstance(chain_future, Future), 'not Future'
# We keep track of the number of appending thread to make sure we do
# not miss batches in abortIncompleteBatches().
self._appends_in_progress.increment()
@@ -235,6 +238,8 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
last = dq[-1]
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
if chain_future:
future.chain(chain_future)
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False

@@ -253,6 +258,8 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
self._free.deallocate(buf)
if chain_future:
future.chain(chain_future)
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False

@@ -269,6 +276,8 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,

dq.append(batch)
self._incomplete.add(batch)
if chain_future:
future.chain(chain_future)
batch_is_full = len(dq) > 1 or batch.records.is_full()
return future, batch_is_full, True
finally:
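For reference on what the chaining does: kafka-python's Future.chain forwards one future's outcome to another by registering callbacks. A simplified sketch of that behaviour is below; it is illustrative and not the library's exact code (see kafka/future.py for the real implementation).

class Future(object):
    # ... existing state: value, exception, callback/errback lists ...

    def chain(self, other):
        # When this future succeeds, resolve the chained future with the
        # same value; when it fails, propagate the same exception.
        self.add_callback(other.success)
        self.add_errback(other.failure)
        return self

So each of the three append() return paths above chains the caller's chain_future to the FutureRecordMetadata that send() returns, and the caller's callbacks fire exactly once, whichever batch the record lands in.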