From 5fcbc31fa8ddc657c640302a64e94a03402011ab Mon Sep 17 00:00:00 2001 From: rakshit-gen Date: Sat, 20 Dec 2025 20:31:11 +0530 Subject: [PATCH] fix: prevent task output loss when mixing sync and async tasks Fixes #4137 When processing async tasks, `_process_async_tasks` returns a new list containing only async task outputs. The caller was replacing the existing `task_outputs` list instead of extending it, causing all previous sync task outputs to be silently lost. Changed assignment (`=`) to `extend()` at all affected locations: - `_execute_tasks()` lines 1155, 1169 - `_aexecute_tasks()` lines 960, 976 - `_handle_conditional_task()` line 1182 - `_ahandle_conditional_task()` line 990 This ensures sync task outputs accumulated before async tasks are preserved when async tasks are collected. --- lib/crewai/src/crewai/crew.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 2c7f583b98..eaa11ca936 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -957,9 +957,9 @@ async def _aexecute_tasks( pending_tasks.append((task, async_task, task_index)) else: if pending_tasks: - task_outputs = await self._aprocess_async_tasks( + task_outputs.extend(await self._aprocess_async_tasks( pending_tasks, was_replayed - ) + )) pending_tasks.clear() context = self._get_context(task, task_outputs) @@ -973,7 +973,7 @@ async def _aexecute_tasks( self._store_execution_log(task, task_output, task_index, was_replayed) if pending_tasks: - task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed) + task_outputs.extend(await self._aprocess_async_tasks(pending_tasks, was_replayed)) return self._create_crew_output(task_outputs) @@ -987,7 +987,7 @@ async def _ahandle_conditional_task( ) -> TaskOutput | None: """Handle conditional task evaluation using native async.""" if pending_tasks: - task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed) + task_outputs.extend(await self._aprocess_async_tasks(pending_tasks, was_replayed)) pending_tasks.clear() return check_conditional_skip( @@ -1152,7 +1152,7 @@ def _execute_tasks( futures.append((task, future, task_index)) else: if futures: - task_outputs = self._process_async_tasks(futures, was_replayed) + task_outputs.extend(self._process_async_tasks(futures, was_replayed)) futures.clear() context = self._get_context(task, task_outputs) @@ -1166,7 +1166,7 @@ def _execute_tasks( self._store_execution_log(task, task_output, task_index, was_replayed) if futures: - task_outputs = self._process_async_tasks(futures, was_replayed) + task_outputs.extend(self._process_async_tasks(futures, was_replayed)) return self._create_crew_output(task_outputs) @@ -1179,7 +1179,7 @@ def _handle_conditional_task( was_replayed: bool, ) -> TaskOutput | None: if futures: - task_outputs = self._process_async_tasks(futures, was_replayed) + task_outputs.extend(self._process_async_tasks(futures, was_replayed)) futures.clear() return check_conditional_skip(