Skip to content

Commit 0f76fed

Browse files
authored
Fix async producer transaction behavior + add transactional produce benchmark test (#2072)
* update * linter fix
1 parent 825905e commit 0f76fed

11 files changed

+867
-355
lines changed

src/confluent_kafka/aio/producer/_AIOProducer.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async def purge(self, *args, **kwargs):
206206
"""Purges messages from internal queues - may block during cleanup"""
207207
# Cancel all pending futures
208208
self._batch_processor.cancel_pending_futures()
209-
209+
210210
# Clear local message buffer and futures
211211
self._batch_processor.clear_buffer()
212212

@@ -238,12 +238,21 @@ async def send_offsets_to_transaction(self, *args, **kwargs):
238238
*args, **kwargs)
239239

240240
async def commit_transaction(self, *args, **kwargs):
241-
"""Network call to commit transaction"""
241+
"""Commit transaction after flushing all buffered messages"""
242+
243+
# First, flush all messages (both local buffer and librdkafka queue)
244+
await self.flush()
245+
246+
# Then commit transaction
242247
return await self._call(self._producer.commit_transaction,
243248
*args, **kwargs)
244249

245250
async def abort_transaction(self, *args, **kwargs):
246251
"""Network call to abort transaction"""
252+
253+
# Clear mesasges pending in the local buffer
254+
await self.purge()
255+
247256
return await self._call(self._producer.abort_transaction,
248257
*args, **kwargs)
249258

tests/ducktape/README.md

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
# Ducktape Producer Tests
1+
# Ducktape Tests
22

3-
Ducktape-based producer tests for the Confluent Kafka Python client with comprehensive performance metrics.
3+
Ducktape-based tests for the Confluent Kafka Python client with comprehensive performance metrics.
44

55
## Prerequisites
66

@@ -15,18 +15,12 @@ Ducktape-based producer tests for the Confluent Kafka Python client with compreh
1515
./tests/ducktape/run_ducktape_test.py
1616

1717
# Run all tests in a file
18-
./tests/ducktape/run_ducktape_test.py test_producer.py
18+
./tests/ducktape/run_ducktape_test.py producer
1919

2020
# Run a specific test with metrics
21-
./tests/ducktape/run_ducktape_test.py test_producer.py SimpleProducerTest.test_basic_produce
21+
./tests/ducktape/run_ducktape_test.py producer SimpleProducerTest.test_basic_produce
2222
```
2323

24-
## Test Cases
25-
26-
- **test_basic_produce**: Basic message production with integrated metrics tracking
27-
- **test_produce_multiple_batches**: Parameterized tests (2s, 5s, 10s durations) with metrics
28-
- **test_produce_with_compression**: Matrix tests (none, gzip, snappy) with compression-aware metrics
29-
3024
## Integrated Performance Metrics Features
3125

3226
Every test automatically includes:
@@ -41,7 +35,7 @@ Every test automatically includes:
4135

4236
## Configuration
4337

44-
Performance bounds are loaded from an environment-based JSON config file. By default, it loads `benchmark_bounds.json`, but you can override this with the `BENCHMARK_BOUNDS_CONFIG` environment variable.
38+
Performance bounds are loaded from an environment-based JSON config file. By default, it loads `producer_benchmark_bounds.json`, but you can override this with the `BENCHMARK_BOUNDS_CONFIG` environment variable.
4539

4640
### Environment-Based Configuration
4741

@@ -98,9 +92,9 @@ BENCHMARK_BOUNDS_CONFIG=custom_bounds.json ./run_ducktape_test.py
9892
```
9993

10094
```python
101-
from benchmark_metrics import MetricsBounds
95+
from producer_benchmark_metrics import MetricsBounds
10296

103-
# Loads from BENCHMARK_BOUNDS_CONFIG env var, or benchmark_bounds.json if not set
97+
# Loads from BENCHMARK_BOUNDS_CONFIG env var, or producer_benchmark_bounds.json if not set
10498
bounds = MetricsBounds()
10599

106100
# Or load from a specific config file

0 commit comments

Comments
 (0)