diff --git a/quixstreams/app.py b/quixstreams/app.py index 30d71ab4e..7bdb47664 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -756,7 +756,7 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic self._source_manager.register( source, topic, - self._get_internal_producer(transactional=False), + self._get_internal_producer(), self._get_internal_consumer( extra_config_overrides=consumer_extra_config_overrides ), diff --git a/tests/test_quixstreams/test_app.py b/tests/test_quixstreams/test_app.py index fa39be29d..3c015914d 100644 --- a/tests/test_quixstreams/test_app.py +++ b/tests/test_quixstreams/test_app.py @@ -2354,6 +2354,37 @@ def _run_app(source, done): ) _run_app(source, finished) + @pytest.mark.parametrize("processing_guarantee", ["at-least-once", "exactly-once"]) + def test_source_producer_transactional_setting( + self, app_factory, processing_guarantee + ): + """ + Test that source producers respect the processing_guarantee setting. + + When processing_guarantee="exactly-once", the source producer should be transactional. + When processing_guarantee="at-least-once", the source producer should not be transactional. + """ + app = app_factory(processing_guarantee=processing_guarantee) + + # Create a dummy source + source = DummySource(values=[{"test": "data"}]) + + # Add the source to the app - this should create a producer with the correct transactional setting + app.add_source(source) + + # Get the source process from the source manager + source_process = app._source_manager.processes[0] + + # Verify the producer's transactional setting matches the processing guarantee + expected_transactional = processing_guarantee == "exactly-once" + assert ( + source_process._producer._producer.transactional == expected_transactional + ), ( + f"Expected producer.transactional={expected_transactional} for " + f"processing_guarantee='{processing_guarantee}', but got " + f"producer.transactional={source_process._producer._producer.transactional}" + ) + class TestApplicationMultipleSdf: def test_multiple_sdfs(