Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions lib/crewai/src/crewai/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,21 +453,51 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> A
"""
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
executor = concurrent.futures.ThreadPoolExecutor()
try:
future = executor.submit(
self._execute_without_timeout, task_prompt=task_prompt, task=task
)

try:
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError as e:
future.cancel()
# Attempt to cancel, but if the task is already running, cancel() returns False
# In that case, we must wait for it to complete to avoid orphaned threads
cancelled = future.cancel()
if not cancelled:
# Task is already running - wait for it with a grace period to avoid
# indefinite blocking, but ensure thread cleanup
try:
# Wait a short time for natural completion, then force shutdown
# Catch all exceptions to prevent task exceptions from leaking through
future.result(timeout=1.0)
except Exception:
# Task either timed out again, was cancelled, or raised an exception
# In all cases, we ignore it here and raise TimeoutError below
# This ensures the API contract is maintained
pass
raise TimeoutError(
f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task."
) from e
except Exception as e:
future.cancel()
# Attempt to cancel on other exceptions too
cancelled = future.cancel()
if not cancelled:
# Wait briefly for completion to avoid orphaned threads
# Catch all exceptions to prevent task exceptions from leaking through
try:
future.result(timeout=0.1)
except Exception:
# Task either timed out, was cancelled, or raised a different exception
# In all cases, we ignore it here and raise RuntimeError below
# This ensures the API contract is maintained
pass
raise RuntimeError(f"Task execution failed: {e!s}") from e
finally:
# Always shutdown executor and wait for threads to complete
# This ensures no orphaned threads even if cancel() failed
executor.shutdown(wait=True)

def _execute_without_timeout(self, task_prompt: str, task: Task) -> Any:
"""Execute a task without a timeout.
Expand Down