From dac6263cdc5c3a4c6ba45a0cf6c6477bbcc815b5 Mon Sep 17 00:00:00 2001 From: rakshit-gen Date: Sat, 20 Dec 2025 20:22:07 +0530 Subject: [PATCH 1/2] fix: prevent orphaned threads in agent timeout handling - Replace context manager with explicit try/finally block - Check cancel() return value and handle both success/failure cases - Always call executor.shutdown(wait=True) to ensure thread cleanup - Prevents resource leaks and thread accumulation under load Fixes issue where tasks timing out would create orphaned threads that hold resources indefinitely, leading to memory leaks and potential crashes in production environments. --- lib/crewai/src/crewai/agent/core.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index f59724343e..9896d8df70 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -453,7 +453,8 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> A """ import concurrent.futures - with concurrent.futures.ThreadPoolExecutor() as executor: + executor = concurrent.futures.ThreadPoolExecutor() + try: future = executor.submit( self._execute_without_timeout, task_prompt=task_prompt, task=task ) @@ -461,13 +462,35 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> A try: return future.result(timeout=timeout) except concurrent.futures.TimeoutError as e: - future.cancel() + # Attempt to cancel, but if the task is already running, cancel() returns False + # In that case, we must wait for it to complete to avoid orphaned threads + cancelled = future.cancel() + if not cancelled: + # Task is already running - wait for it with a grace period to avoid + # indefinite blocking, but ensure thread cleanup + try: + # Wait a short time for natural completion, then force shutdown + future.result(timeout=1.0) + except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError): + # Task still running after grace period - will be cleaned up by executor shutdown + pass raise TimeoutError( f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task." ) from e except Exception as e: - future.cancel() + # Attempt to cancel on other exceptions too + cancelled = future.cancel() + if not cancelled: + # Wait briefly for completion to avoid orphaned threads + try: + future.result(timeout=0.1) + except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError): + pass raise RuntimeError(f"Task execution failed: {e!s}") from e + finally: + # Always shutdown executor and wait for threads to complete + # This ensures no orphaned threads even if cancel() failed + executor.shutdown(wait=True) def _execute_without_timeout(self, task_prompt: str, task: Task) -> Any: """Execute a task without a timeout. From e18234dcc082ddbb4e3e3b8c33a00567acc96e47 Mon Sep 17 00:00:00 2001 From: rakshit-gen Date: Sat, 20 Dec 2025 20:33:04 +0530 Subject: [PATCH 2/2] fix: catch all exceptions in cleanup to maintain API contract When future.result() is called during cleanup, it can re-raise task exceptions. The previous code only caught TimeoutError and CancelledError, allowing other exceptions to leak through and break the API contract. Now catch all exceptions during cleanup to ensure: - TimeoutError is always raised for timeouts (not task exceptions) - RuntimeError is always raised for task failures (not original exceptions) This maintains the documented API contract while still ensuring proper thread cleanup. --- lib/crewai/src/crewai/agent/core.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 9896d8df70..e22a762526 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -470,9 +470,12 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> A # indefinite blocking, but ensure thread cleanup try: # Wait a short time for natural completion, then force shutdown + # Catch all exceptions to prevent task exceptions from leaking through future.result(timeout=1.0) - except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError): - # Task still running after grace period - will be cleaned up by executor shutdown + except Exception: + # Task either timed out again, was cancelled, or raised an exception + # In all cases, we ignore it here and raise TimeoutError below + # This ensures the API contract is maintained pass raise TimeoutError( f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task." @@ -482,9 +485,13 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> A cancelled = future.cancel() if not cancelled: # Wait briefly for completion to avoid orphaned threads + # Catch all exceptions to prevent task exceptions from leaking through try: future.result(timeout=0.1) - except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError): + except Exception: + # Task either timed out, was cancelled, or raised a different exception + # In all cases, we ignore it here and raise RuntimeError below + # This ensures the API contract is maintained pass raise RuntimeError(f"Task execution failed: {e!s}") from e finally: