diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index f59724343e..e22a762526 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -453,7 +453,8 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> A """ import concurrent.futures - with concurrent.futures.ThreadPoolExecutor() as executor: + executor = concurrent.futures.ThreadPoolExecutor() + try: future = executor.submit( self._execute_without_timeout, task_prompt=task_prompt, task=task ) @@ -461,13 +462,42 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> A try: return future.result(timeout=timeout) except concurrent.futures.TimeoutError as e: - future.cancel() + # Attempt to cancel, but if the task is already running, cancel() returns False + # In that case, we must wait for it to complete to avoid orphaned threads + cancelled = future.cancel() + if not cancelled: + # Task is already running - wait for it with a grace period to avoid + # indefinite blocking, but ensure thread cleanup + try: + # Wait a short time for natural completion, then force shutdown + # Catch all exceptions to prevent task exceptions from leaking through + future.result(timeout=1.0) + except Exception: + # Task either timed out again, was cancelled, or raised an exception + # In all cases, we ignore it here and raise TimeoutError below + # This ensures the API contract is maintained + pass raise TimeoutError( f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task." ) from e except Exception as e: - future.cancel() + # Attempt to cancel on other exceptions too + cancelled = future.cancel() + if not cancelled: + # Wait briefly for completion to avoid orphaned threads + # Catch all exceptions to prevent task exceptions from leaking through + try: + future.result(timeout=0.1) + except Exception: + # Task either timed out, was cancelled, or raised a different exception + # In all cases, we ignore it here and raise RuntimeError below + # This ensures the API contract is maintained + pass raise RuntimeError(f"Task execution failed: {e!s}") from e + finally: + # Always shutdown executor and wait for threads to complete + # This ensures no orphaned threads even if cancel() failed + executor.shutdown(wait=True) def _execute_without_timeout(self, task_prompt: str, task: Task) -> Any: """Execute a task without a timeout.