Conversation

@k-raina (Member) commented Nov 27, 2025

Summary

Added a __len__ method to the AIOProducer class to correctly count all pending messages, including those in both the async batch buffer and librdkafka's output queue.

Problem

Previously, len(producer._producer) counted only the messages in librdkafka's output queue, missing messages still sitting in the async batch buffer waiting to be handed to librdkafka.

Solution

Implemented a __len__ method that returns the sum of:

  • Messages in librdkafka queue: len(self._producer)
  • Messages in async batch buffer: self._batch_processor.get_buffer_size()
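
A minimal sketch of this logic, using hypothetical stand-in classes (the attribute names _producer and _batch_processor, the get_buffer_size() call, and the closed-state check are taken from this PR's description and review; the real AIOProducer wraps librdkafka rather than plain lists):

```python
# Hypothetical stand-ins illustrating the __len__ logic described above.
class FakeBatchProcessor:
    def __init__(self):
        self._buffer = []            # messages awaiting handoff to librdkafka

    def get_buffer_size(self):
        return len(self._buffer)


class FakeAIOProducer:
    def __init__(self):
        self._producer = []          # stands in for librdkafka's output queue
        self._batch_processor = FakeBatchProcessor()
        self._closed = False

    def __len__(self):
        # A closed producer reports zero pending messages.
        if self._closed:
            return 0
        # Pending = librdkafka output queue + async batch buffer.
        return len(self._producer) + self._batch_processor.get_buffer_size()


p = FakeAIOProducer()
p._batch_processor._buffer.extend([b'a', b'b', b'c'])  # 3 buffered
p._producer.extend([b'x', b'y'])                       # 2 queued in librdkafka
print(len(p))  # → 5
p._closed = True
print(len(p))  # → 0
```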

Tests

Unit tests

  • tests/test_AIOProducer.py - 4 new test cases added

Manual test

test_aio_producer_continuous.py

Starting continuous produce test:
  Topic: test-topic
  Rate: 10.0 msg/sec
  Duration: 10000000.0 seconds
  Batch size: 20
  Buffer timeout: 1.0 seconds

Producing messages continuously...

Time | Produced | len() | Buffer | librdkafka | Status
----------------------------------------------------------------------
  0.5s |        6 |     6 |      6 |          0 | ✅
  1.0s |       11 |    11 |     11 |          0 | ✅
  1.5s |       16 |    16 |     16 |          0 | ✅
  2.0s |       20 |    20 |      0 |         20 | ✅
  2.5s |       25 |    25 |      5 |         20 | ✅
  3.0s |       30 |    30 |     10 |         20 | ✅
  3.5s |       35 |    35 |     15 |         20 | ✅
  4.0s |       40 |    40 |      0 |         40 | ✅
  4.6s |       45 |    45 |      5 |         40 | ✅
  5.1s |       50 |    50 |     10 |         40 | ✅
  5.6s |       55 |    55 |     15 |         40 | ✅
  6.1s |       60 |    60 |      0 |         60 | ✅
  6.6s |       65 |    65 |      5 |         60 | ✅
  7.1s |       70 |    70 |     10 |         60 | ✅
  7.6s |       75 |    75 |     15 |         60 | ✅
  8.1s |       80 |    80 |      0 |         80 | ✅
  8.6s |       85 |    85 |      5 |         80 | ✅
  9.1s |       90 |    90 |     10 |         80 | ✅
  9.6s |       95 |    95 |     15 |         80 | ✅
 10.1s |      100 |   100 |      0 |        100 | ✅
 10.6s |      105 |   105 |      5 |        100 | ✅
 11.1s |      110 |   110 |     10 |        100 | ✅
 11.6s |      115 |   115 |     15 |        100 | ✅
 12.2s |      120 |    20 |      0 |         20 | ✅
 12.7s |      125 |    25 |      5 |         20 | ✅
 13.2s |      130 |    30 |     10 |         20 | ✅
 13.7s |      135 |    35 |     15 |         20 | ✅
 14.2s |      140 |    40 |      0 |         40 | ✅
 14.7s |      145 |    45 |      5 |         40 | ✅
 15.2s |      150 |    50 |     10 |         40 | ✅
 15.7s |      155 |    55 |     15 |         40 | ✅
 16.2s |      160 |    60 |      0 |         60 | ✅
 16.7s |      165 |    65 |      5 |         60 | ✅
 17.2s |      170 |    70 |     10 |         60 | ✅
 17.7s |      175 |    75 |     15 |         60 | ✅
 18.2s |      180 |    80 |      0 |         80 | ✅
 18.7s |      185 |    85 |      5 |         80 | ✅
 19.2s |      190 |    90 |     10 |         80 | ✅
 19.7s |      195 |    95 |     15 |         80 | ✅
 20.3s |      200 |   100 |      0 |        100 | ✅
 20.8s |      205 |   105 |      5 |        100 | ✅
 21.3s |      210 |   110 |     10 |        100 | ✅
 21.8s |      215 |   115 |     15 |        100 | ✅
 22.3s |      220 |    20 |      0 |         20 | ✅
 22.8s |      225 |    25 |      5 |         20 | ✅
 23.3s |      230 |    30 |     10 |         20 | ✅
 23.8s |      235 |    35 |     15 |         20 | ✅
 24.3s |      240 |    40 |      0 |         40 | ✅
 24.8s |      245 |    45 |      5 |         40 | ✅
 25.3s |      250 |    50 |     10 |         40 | ✅
 25.8s |      255 |    55 |     15 |         40 | ✅
 26.3s |      260 |    60 |      0 |         60 | ✅
 26.8s |      265 |    65 |      5 |         60 | ✅
 27.3s |      270 |    70 |     10 |         60 | ✅
 27.8s |      275 |    75 |     15 |         60 | ✅
 28.3s |      280 |    80 |      0 |         80 | ✅
 28.9s |      285 |    85 |      5 |         80 | ✅
 29.4s |      290 |    90 |     10 |         80 | ✅
 29.9s |      295 |    95 |     15 |         80 | ✅
 30.4s |      300 |   100 |      0 |        100 | ✅

Copilot AI review requested due to automatic review settings November 27, 2025 10:52
@k-raina k-raina requested review from a team and MSeal as code owners November 27, 2025 10:52
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.


Copilot AI left a comment


Pull request overview

This PR adds a __len__ method to the AIOProducer class to provide an accurate count of all pending messages across both the async batch buffer and librdkafka's output queue. Previously, only messages in librdkafka's queue were counted.

Key changes:

  • Implemented __len__ method that sums messages from both queues
  • Added comprehensive unit tests covering various producer states
  • Returns 0 when producer is closed

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File | Description
src/confluent_kafka/experimental/aio/producer/_AIOProducer.py | Added __len__ method implementation with docstring and closed-state handling
tests/test_AIOProducer.py | Added 4 new test cases covering buffered messages, post-flush state, closed producer, and mixed-state scenarios


@pytest.mark.asyncio
async def test_aio_producer_len_with_buffered_messages(self, mock_producer, mock_common, basic_config):
    """Test that __len__ counts messages in async batch buffer"""
    producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)

Copilot AI Nov 27, 2025


The test patches _flush_buffer to prevent auto-flush, but doesn't verify that the patch was actually called or not called. Consider adding an assertion like mock_flush.assert_not_called() after producing messages to ensure the buffer wasn't inadvertently flushed.
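
As a sketch of what that assertion could look like, using a hypothetical simplified producer (FakeProducer here is not the real AIOProducer; it only models buffer-size-triggered flushing):

```python
from unittest.mock import patch


class FakeProducer:
    """Hypothetical simplified producer: flushes only when batch_size is hit."""
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self._buffer = []

    def _flush_buffer(self):
        self._buffer.clear()

    def produce(self, topic, value):
        self._buffer.append(value)
        if len(self._buffer) == self.batch_size:
            self._flush_buffer()


producer = FakeProducer(batch_size=10)
with patch.object(producer, '_flush_buffer') as mock_flush:
    for i in range(7):
        producer.produce('test-topic', f'msg-{i}'.encode())
    # 7 < batch_size, so the buffer should never have been flushed:
    mock_flush.assert_not_called()
print(len(producer._buffer))  # → 7
```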

Comment on lines +603 to +605
with patch.object(producer, '_flush_buffer') as mock_flush:
    for i in range(7):
        await producer.produce('test-topic', value=f'msg-{i}'.encode())

Copilot AI Nov 27, 2025


The test patches _flush_buffer but doesn't verify the flushing behavior. With batch_size=5, the buffer should flush after the 5th message. Consider asserting mock_flush.call_count == 1 after the loop to confirm the expected flush occurred.
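
A sketch of the suggested call-count assertion, again against a hypothetical simplified producer (not the real AIOProducer) whose buffer flushes exactly when it reaches batch_size:

```python
from unittest.mock import patch


class FakeProducer:
    """Hypothetical simplified producer: flushes exactly when batch_size is hit."""
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self._buffer = []

    def _flush_buffer(self):
        self._buffer.clear()

    def produce(self, topic, value):
        self._buffer.append(value)
        if len(self._buffer) == self.batch_size:
            self._flush_buffer()


producer = FakeProducer(batch_size=5)
with patch.object(producer, '_flush_buffer') as mock_flush:
    for i in range(7):
        producer.produce('test-topic', f'msg-{i}'.encode())
    # The buffer reached batch_size exactly once, at the 5th message:
    assert mock_flush.call_count == 1
```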

@k-raina changed the title from "Add length to producer" to "Add __len__ function to AIOProducer" Nov 27, 2025
@fangnx (Member) left a comment


LGTM
