Skip to content
Merged
122 changes: 108 additions & 14 deletions python/valuecell/core/agent/connect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from importlib import import_module
from pathlib import Path
Expand Down Expand Up @@ -70,29 +71,37 @@ def hidden(self) -> bool:

_LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {}

# Global thread pool for offloading imports. Using a fixed executor allows
# better control and avoids unbounded thread creation when many imports are
# requested concurrently.
executor = ThreadPoolExecutor(max_workers=4)

def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]:
"""Resolve a `module:Class` spec to a Python class.

This function is synchronous and performs a normal import. Callers that
need to avoid blocking the event loop should invoke this via
`asyncio.to_thread(_resolve_local_agent_class, spec)`.
def _resolve_local_agent_class_sync(spec: str) -> Optional[Type[Any]]:
"""Synchronous resolver used for fallback and direct calls.

Results are cached in `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated
imports/attribute lookups.
Keeps the original import behavior but is extracted so it can be invoked
from a thread pool via `run_in_executor`.
"""

if not spec:
return None

cached = _LOCAL_AGENT_CLASS_CACHE.get(spec)
if cached is not None:
logger.debug("_resolve_local_agent_class_sync: cache hit for '{}'", spec)
return cached

try:
module_path, class_name = spec.split(":", 1)
logger.info(
"_resolve_local_agent_class_sync: importing module '{}' for class '{}'",
module_path,
class_name,
)
module = import_module(module_path)
logger.info("_resolve_local_agent_class_sync: module imported, getting class")
agent_cls = getattr(module, class_name)
logger.info("_resolve_local_agent_class_sync: class '{}' resolved", class_name)
except (ValueError, AttributeError, ImportError) as exc:
logger.error("Failed to import agent class '{}': {}", spec, exc)
return None
Expand All @@ -101,6 +110,37 @@ def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]:
return agent_cls


async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]:
"""Asynchronously resolve a `module:Class` spec to a Python class.

The actual import is executed in a thread pool via `loop.run_in_executor`
to avoid blocking the event loop. Results are cached in
`_LOCAL_AGENT_CLASS_CACHE` to avoid repeated imports.
"""
if not spec:
return None

# Fast path: cache hit
cached = _LOCAL_AGENT_CLASS_CACHE.get(spec)
if cached is not None:
logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec)
return cached

loop = asyncio.get_running_loop()
# Delegate the synchronous import to the thread pool
try:
agent_cls = await loop.run_in_executor(
executor, _resolve_local_agent_class_sync, spec
)
except Exception as exc:
logger.error(
"_resolve_local_agent_class: threaded import failed for '{}': {}", spec, exc
)
return None

return agent_cls


async def _build_local_agent(ctx: AgentContext):
"""Asynchronously produce a wrapped local agent instance for the
given `AgentContext`.
Expand All @@ -109,19 +149,34 @@ async def _build_local_agent(ctx: AgentContext):
- If `agent_instance_class` is already present, use it.
- Otherwise, if `agent_class_spec` is provided, resolve it off the
event loop (`asyncio.to_thread`) so imports don't block the loop.
- If resolution fails, log a warning and return `None` (caller will
treat missing factory as "no local agent available").
A timeout is applied to prevent hangs on Windows where import lock
contention between threads and the event loop can cause deadlocks.
- If resolution fails or times out, fall back to synchronous import.
- The actual wrapping call (`create_wrapped_agent`) is performed on
the event loop; this preserves any asyncio-related initialization
semantics required by the wrapper (if it needs loop context).
"""

agent_cls = ctx.agent_instance_class
if agent_cls is None and ctx.agent_class_spec:
# Resolve the import in a worker thread to avoid blocking the loop.
agent_cls = await asyncio.to_thread(
_resolve_local_agent_class, ctx.agent_class_spec
)
# Try resolving the import in a worker thread with a timeout.
# Use the async resolver which delegates to a thread pool. If the
# operation times out, attempt a direct import in the executor as a
# final fallback.
try:
agent_cls = await asyncio.wait_for(
_resolve_local_agent_class(ctx.agent_class_spec), timeout=5.0
)
except asyncio.TimeoutError:
logger.warning(
"Threaded import timed out for '{}', falling back to executor sync import",
ctx.agent_class_spec,
)
loop = asyncio.get_running_loop()
agent_cls = await loop.run_in_executor(
executor, _resolve_local_agent_class_sync, ctx.agent_class_spec
)

ctx.agent_instance_class = agent_cls
if agent_cls is None:
logger.warning(
Expand Down Expand Up @@ -185,6 +240,7 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None:
)
return

logger.info(f"Loading agent cards from {agent_card_dir}")
for json_file in agent_card_dir.glob("*.json"):
try:
# Read name minimally to resolve via helper
Expand Down Expand Up @@ -217,12 +273,37 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None:
f"Failed to load agent card from {json_file}; skipping: {e}"
)
continue
logger.info(
f"Loaded {len(self._contexts)} agent card(s) from {agent_card_dir}: {list(self._contexts.keys())}"
)
self._remote_contexts_loaded = True

def _ensure_remote_contexts_loaded(self) -> None:
if not self._remote_contexts_loaded:
self._load_remote_contexts()

def preload_local_agent_classes(self) -> None:
"""Preload all local agent classes synchronously at startup.

This method should be called during application startup (before the
event loop processes requests) to avoid import deadlocks on Windows.
Importing Python modules in a worker thread while the main thread holds
the import lock can cause hangs. By importing everything upfront in
the main thread, we sidestep this issue entirely.
"""
self._ensure_remote_contexts_loaded()
for name, ctx in self._contexts.items():
if ctx.agent_class_spec and ctx.agent_instance_class is None:
logger.info("Preloading agent class for '{}'", name)
cls = _resolve_local_agent_class_sync(ctx.agent_class_spec)
ctx.agent_instance_class = cls
if cls is None:
logger.warning(
"Failed to preload agent class '{}' for '{}'",
ctx.agent_class_spec,
name,
)

# Public helper primarily for tests or tooling to load from a custom dir
def load_from_dir(self, config_dir: str) -> None:
"""Load agent contexts from a specific directory of JSON card files."""
Expand All @@ -242,6 +323,9 @@ async def start_agent(
"""
# Use agent-specific lock to prevent concurrent starts of the same agent
agent_lock = self._get_agent_lock(agent_name)
logger.info(
f"Request to start agent '{agent_name}' (with_listener={with_listener})"
)
async with agent_lock:
ctx = await self._get_or_create_context(agent_name)

Expand Down Expand Up @@ -297,6 +381,9 @@ async def _ensure_client(self, ctx: AgentContext) -> None:
if not url:
raise ValueError(f"Unable to determine URL for agent '{ctx.name}'")
# Initialize a temporary client; only assign to context on success
logger.info(
f"Initializing client for '{ctx.name}' at {url} (listener_url={ctx.listener_url})"
)
tmp_client = AgentClient(url, push_notification_url=ctx.listener_url)
try:
await self._initialize_client(tmp_client, ctx)
Expand Down Expand Up @@ -341,6 +428,7 @@ async def _ensure_agent_runtime(self, ctx: AgentContext) -> None:
logger.info(f"Launching in-process agent '{ctx.name}'")

if ctx.agent_task is None:
logger.info(f"Creating task to run in-process agent '{ctx.name}'")
ctx.agent_task = asyncio.create_task(ctx.agent_instance.serve())
# Give the event loop a chance to schedule startup work
await asyncio.sleep(0)
Expand All @@ -359,9 +447,15 @@ async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> No
"""Initialize client with retry for local agents."""
retries = 3 if ctx.agent_task else 1
delay = 0.2
logger.info(
f"_initialize_client: initializing client for '{ctx.name}' (retries={retries})"
)
for attempt in range(retries):
try:
await client.ensure_initialized()
logger.info(
f"Client initialized for '{ctx.name}' on attempt {attempt + 1}"
)
return
except Exception as exc:
if attempt >= retries - 1:
Expand Down
Loading