Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/confluent_kafka/experimental/aio/producer/_AIOProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,27 @@ def __del__(self) -> None:
if hasattr(self, '_buffer_timeout_manager'):
self._buffer_timeout_manager.stop_timeout_monitoring()

def __len__(self) -> int:
"""Return the total number of pending messages.

This includes:
- Messages in librdkafka's output queue (waiting to be delivered to broker)
- Messages in the async batch buffer (waiting to be sent to librdkafka)

Returns:
int: Total number of pending messages across both queues
"""
if self._is_closed:
return 0

# Count messages in librdkafka queue
librdkafka_count = len(self._producer)

# Count messages in async batch buffer
buffer_count = self._batch_processor.get_buffer_size()

return librdkafka_count + buffer_count

# ========================================================================
# CORE PRODUCER OPERATIONS - Main public API
# ========================================================================
Expand Down
97 changes: 97 additions & 0 deletions tests/test_AIOProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,100 @@ async def test_edge_cases_batching(self, mock_producer, mock_common, basic_confi
assert mock_flush.call_count >= 1 # At least one flush

await producer.close()

@pytest.mark.asyncio
async def test_aio_producer_len_with_buffered_messages(self, mock_producer, mock_common, basic_config):
"""Test that __len__ counts messages in async batch buffer"""
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test patches _flush_buffer to prevent auto-flush, but doesn't verify that the patch was actually called or not called. Consider adding an assertion like mock_flush.assert_not_called() after producing messages to ensure the buffer wasn't inadvertently flushed.

Copilot uses AI. Check for mistakes.

# Produce 5 messages (less than batch_size, so they stay in buffer)
with patch.object(producer, '_flush_buffer') as mock_flush: # Prevent auto-flush
for i in range(5):
await producer.produce('test-topic', value=f'msg-{i}'.encode())

# Verify flush was not called (messages stayed in buffer)
mock_flush.assert_not_called()

# len() should count messages in buffer
assert len(producer) == 5
assert producer._batch_processor.get_buffer_size() == 5
assert len(producer._producer) == 0 # Nothing in librdkafka yet
# Verify len() equals the sum
assert len(producer) == producer._batch_processor.get_buffer_size() + len(producer._producer)

await producer.close()

@pytest.mark.asyncio
async def test_aio_producer_len_after_flush(self, mock_producer, mock_common, basic_config):
"""Test that __len__ counts messages after flush to librdkafka"""
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)

# Produce and flush
with patch.object(producer, '_flush_buffer'): # Prevent auto-flush
for i in range(5):
await producer.produce('test-topic', value=f'msg-{i}'.encode())

# Flush to move messages to librdkafka
await producer.flush()

# After flush, messages move to librdkafka queue
# Verify len() correctly equals the sum of buffer + librdkafka
buffer_count = producer._batch_processor.get_buffer_size()
librdkafka_count = len(producer._producer)
total_len = len(producer)

# Buffer should be empty after flush
assert buffer_count == 0
# Verify len() equals the sum (this validates the __len__ implementation)
assert total_len == buffer_count + librdkafka_count
# Messages should be in librdkafka queue after flush
assert total_len == librdkafka_count

await producer.close()

@pytest.mark.asyncio
async def test_aio_producer_len_closed_producer(self, mock_producer, mock_common, basic_config):
"""Test that __len__ returns 0 for closed producer"""
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)

# Produce some messages
with patch.object(producer, '_flush_buffer'): # Prevent auto-flush
for i in range(3):
await producer.produce('test-topic', value=f'msg-{i}'.encode())

# Verify messages are there
assert len(producer) == 3

# Close producer
await producer.close()

# len() should return 0 for closed producer
assert len(producer) == 0

@pytest.mark.asyncio
async def test_aio_producer_len_mixed_state(self, mock_producer, mock_common, basic_config):
"""Test __len__ when messages are in both buffer and librdkafka queue"""
producer = AIOProducer(basic_config, batch_size=5, buffer_timeout=0)

# Produce 7 messages - first 5 should flush (batch_size=5), last 2 stay in buffer
with patch.object(producer, '_flush_buffer') as mock_flush:
for i in range(7):
await producer.produce('test-topic', value=f'msg-{i}'.encode())
Comment on lines +613 to +615
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test patches _flush_buffer but doesn't verify the flushing behavior. With batch_size=5, the buffer should flush after the 5th message. Consider asserting mock_flush.call_count == 1 after the loop to confirm the expected flush occurred.

Copilot uses AI. Check for mistakes.

# With batch_size=5, flush should be called after 5th message
# Verify flush was called (at least once when batch_size reached)
assert mock_flush.call_count >= 1

# After batch_size messages, some may have flushed
# Total should be sum of buffer + librdkafka queue
buffer_count = producer._batch_processor.get_buffer_size()
librdkafka_count = len(producer._producer)
total_count = len(producer)

# Verify len() correctly equals the sum (this validates the __len__ implementation)
assert total_count == buffer_count + librdkafka_count
# At least the messages beyond batch_size should be in buffer
# (exact count depends on flush behavior)
assert total_count >= 2 # At least the last 2 should be pending

await producer.close()