diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 29feaeee3..4e34f5046 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -1,5 +1,6 @@ import asyncio import json +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from importlib import import_module from pathlib import Path @@ -70,29 +71,37 @@ def hidden(self) -> bool: _LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {} +# Global thread pool for offloading imports. Using a fixed executor allows +# better control and avoids unbounded thread creation when many imports are +# requested concurrently. +executor = ThreadPoolExecutor(max_workers=4) -def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: - """Resolve a `module:Class` spec to a Python class. - This function is synchronous and performs a normal import. Callers that - need to avoid blocking the event loop should invoke this via - `asyncio.to_thread(_resolve_local_agent_class, spec)`. +def _resolve_local_agent_class_sync(spec: str) -> Optional[Type[Any]]: + """Synchronous resolver used for fallback and direct calls. - Results are cached in `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated - imports/attribute lookups. + Keeps the original import behavior but is extracted so it can be invoked + from a thread pool via `run_in_executor`. """ - if not spec: return None cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) if cached is not None: + logger.debug("_resolve_local_agent_class_sync: cache hit for '{}'", spec) return cached try: module_path, class_name = spec.split(":", 1) + logger.info( + "_resolve_local_agent_class_sync: importing module '{}' for class '{}'", + module_path, + class_name, + ) module = import_module(module_path) + logger.info("_resolve_local_agent_class_sync: module imported, getting class") agent_cls = getattr(module, class_name) + logger.info("_resolve_local_agent_class_sync: class '{}' resolved", class_name) except (ValueError, AttributeError, ImportError) as exc: logger.error("Failed to import agent class '{}': {}", spec, exc) return None @@ -101,6 +110,37 @@ def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: return agent_cls +async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: + """Asynchronously resolve a `module:Class` spec to a Python class. + + The actual import is executed in a thread pool via `loop.run_in_executor` + to avoid blocking the event loop. Results are cached in + `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated imports. + """ + if not spec: + return None + + # Fast path: cache hit + cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) + if cached is not None: + logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec) + return cached + + loop = asyncio.get_running_loop() + # Delegate the synchronous import to the thread pool + try: + agent_cls = await loop.run_in_executor( + executor, _resolve_local_agent_class_sync, spec + ) + except Exception as exc: + logger.error( + "_resolve_local_agent_class: threaded import failed for '{}': {}", spec, exc + ) + return None + + return agent_cls + + async def _build_local_agent(ctx: AgentContext): """Asynchronously produce a wrapped local agent instance for the given `AgentContext`. @@ -109,8 +149,9 @@ async def _build_local_agent(ctx: AgentContext): - If `agent_instance_class` is already present, use it. - Otherwise, if `agent_class_spec` is provided, resolve it off the event loop (`asyncio.to_thread`) so imports don't block the loop. - - If resolution fails, log a warning and return `None` (caller will - treat missing factory as "no local agent available"). + A timeout is applied to prevent hangs on Windows where import lock + contention between threads and the event loop can cause deadlocks. + - If resolution fails or times out, fall back to synchronous import. - The actual wrapping call (`create_wrapped_agent`) is performed on the event loop; this preserves any asyncio-related initialization semantics required by the wrapper (if it needs loop context). @@ -118,10 +159,24 @@ async def _build_local_agent(ctx: AgentContext): agent_cls = ctx.agent_instance_class if agent_cls is None and ctx.agent_class_spec: - # Resolve the import in a worker thread to avoid blocking the loop. - agent_cls = await asyncio.to_thread( - _resolve_local_agent_class, ctx.agent_class_spec - ) + # Try resolving the import in a worker thread with a timeout. + # Use the async resolver which delegates to a thread pool. If the + # operation times out, attempt a direct import in the executor as a + # final fallback. + try: + agent_cls = await asyncio.wait_for( + _resolve_local_agent_class(ctx.agent_class_spec), timeout=5.0 + ) + except asyncio.TimeoutError: + logger.warning( + "Threaded import timed out for '{}', falling back to executor sync import", + ctx.agent_class_spec, + ) + loop = asyncio.get_running_loop() + agent_cls = await loop.run_in_executor( + executor, _resolve_local_agent_class_sync, ctx.agent_class_spec + ) + ctx.agent_instance_class = agent_cls if agent_cls is None: logger.warning( @@ -185,6 +240,7 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: ) return + logger.info(f"Loading agent cards from {agent_card_dir}") for json_file in agent_card_dir.glob("*.json"): try: # Read name minimally to resolve via helper @@ -217,12 +273,37 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: f"Failed to load agent card from {json_file}; skipping: {e}" ) continue + logger.info( + f"Loaded {len(self._contexts)} agent card(s) from {agent_card_dir}: {list(self._contexts.keys())}" + ) self._remote_contexts_loaded = True def _ensure_remote_contexts_loaded(self) -> None: if not self._remote_contexts_loaded: self._load_remote_contexts() + def preload_local_agent_classes(self) -> None: + """Preload all local agent classes synchronously at startup. + + This method should be called during application startup (before the + event loop processes requests) to avoid import deadlocks on Windows. + Importing Python modules in a worker thread while the main thread holds + the import lock can cause hangs. By importing everything upfront in + the main thread, we sidestep this issue entirely. + """ + self._ensure_remote_contexts_loaded() + for name, ctx in self._contexts.items(): + if ctx.agent_class_spec and ctx.agent_instance_class is None: + logger.info("Preloading agent class for '{}'", name) + cls = _resolve_local_agent_class_sync(ctx.agent_class_spec) + ctx.agent_instance_class = cls + if cls is None: + logger.warning( + "Failed to preload agent class '{}' for '{}'", + ctx.agent_class_spec, + name, + ) + # Public helper primarily for tests or tooling to load from a custom dir def load_from_dir(self, config_dir: str) -> None: """Load agent contexts from a specific directory of JSON card files.""" @@ -242,6 +323,9 @@ async def start_agent( """ # Use agent-specific lock to prevent concurrent starts of the same agent agent_lock = self._get_agent_lock(agent_name) + logger.info( + f"Request to start agent '{agent_name}' (with_listener={with_listener})" + ) async with agent_lock: ctx = await self._get_or_create_context(agent_name) @@ -297,6 +381,9 @@ async def _ensure_client(self, ctx: AgentContext) -> None: if not url: raise ValueError(f"Unable to determine URL for agent '{ctx.name}'") # Initialize a temporary client; only assign to context on success + logger.info( + f"Initializing client for '{ctx.name}' at {url} (listener_url={ctx.listener_url})" + ) tmp_client = AgentClient(url, push_notification_url=ctx.listener_url) try: await self._initialize_client(tmp_client, ctx) @@ -341,6 +428,7 @@ async def _ensure_agent_runtime(self, ctx: AgentContext) -> None: logger.info(f"Launching in-process agent '{ctx.name}'") if ctx.agent_task is None: + logger.info(f"Creating task to run in-process agent '{ctx.name}'") ctx.agent_task = asyncio.create_task(ctx.agent_instance.serve()) # Give the event loop a chance to schedule startup work await asyncio.sleep(0) @@ -359,9 +447,15 @@ async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> No """Initialize client with retry for local agents.""" retries = 3 if ctx.agent_task else 1 delay = 0.2 + logger.info( + f"_initialize_client: initializing client for '{ctx.name}' (retries={retries})" + ) for attempt in range(retries): try: await client.ensure_initialized() + logger.info( + f"Client initialized for '{ctx.name}' on attempt {attempt + 1}" + ) return except Exception as exc: if attempt >= retries - 1: diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 8fdd8879c..3d242f531 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import inspect import json from dataclasses import dataclass from pathlib import Path @@ -9,6 +10,7 @@ import pytest from a2a.client.client_factory import minimal_agent_card from a2a.types import AgentCard + from valuecell.core.agent import connect as connect_mod from valuecell.core.agent.connect import RemoteConnections @@ -130,6 +132,35 @@ async def test_load_from_dir_and_list(tmp_path: Path, monkeypatch: pytest.Monkey assert set(all_agents) == {"AgentAlpha", "AgentBeta"} +def test_preload_agent_classes(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Prepare an agent card with a local class spec + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = make_card_dict( + "LocalAgent", "http://127.0.0.1:8101", push_notifications=False + ) + # Add metadata with local_agent_class + card["metadata"] = { + "local_agent_class": "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + } + with open(dir_path / "LocalAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + # Before preload, agent_instance_class should be None + ctx = rc._contexts["LocalAgent"] + assert ctx.agent_instance_class is None + + # Call preload + rc.preload_local_agent_classes() + + # After preload, agent_instance_class should be set + assert ctx.agent_instance_class is not None + + @pytest.mark.asyncio async def test_start_agent_without_listener( tmp_path: Path, monkeypatch: pytest.MonkeyPatch @@ -439,11 +470,10 @@ async def test_resolve_local_agent_class_from_metadata( class DummyAgent: pass - monkeypatch.setattr( - connect_mod, - "_resolve_local_agent_class", - lambda spec: DummyAgent if spec == "fake:Spec" else None, - ) + async def _fake_resolver(spec): + return DummyAgent if spec == "fake:Spec" else None + + monkeypatch.setattr(connect_mod, "_resolve_local_agent_class", _fake_resolver) rc = RemoteConnections() rc.load_from_dir(str(dir_path)) @@ -503,7 +533,8 @@ async def ensure_initialized(self): def test_resolve_local_agent_class_empty_spec_returns_none(): - assert connect_mod._resolve_local_agent_class("") is None + # Use the synchronous resolver helper for direct, non-async assertions + assert connect_mod._resolve_local_agent_class_sync("") is None def test_resolve_local_agent_class_cache_hit(): @@ -511,17 +542,54 @@ def test_resolve_local_agent_class_cache_hit(): sentinel = object() connect_mod._LOCAL_AGENT_CLASS_CACHE[spec] = sentinel try: - assert connect_mod._resolve_local_agent_class(spec) is sentinel + # Call the synchronous resolver to verify cache hit + assert connect_mod._resolve_local_agent_class_sync(spec) is sentinel finally: connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) def test_resolve_local_agent_class_invalid_spec(): spec = "valuecell.nonexistent:Missing" - result = connect_mod._resolve_local_agent_class(spec) + # Use synchronous resolver helper for direct check + result = connect_mod._resolve_local_agent_class_sync(spec) assert result is None +@pytest.mark.asyncio +async def test_resolve_local_agent_class_async_cache_hit(): + spec = "cached:Spec" + sentinel = object() + connect_mod._LOCAL_AGENT_CLASS_CACHE[spec] = sentinel + try: + # Call the async resolver to verify cache hit + result = await connect_mod._resolve_local_agent_class(spec) + assert result is sentinel + finally: + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + +@pytest.mark.asyncio +async def test_resolve_local_agent_class_async_import_failure( + monkeypatch: pytest.MonkeyPatch, +): + spec = "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + # Clear cache to ensure we hit the import path + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + try: + # Mock the sync resolver to raise an exception + def failing_resolver(spec_arg): + raise ImportError("Simulated import failure") + + monkeypatch.setattr( + connect_mod, "_resolve_local_agent_class_sync", failing_resolver + ) + + result = await connect_mod._resolve_local_agent_class(spec) + assert result is None + finally: + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + @pytest.mark.asyncio async def test_build_local_agent_returns_none_when_no_class(): ctx = connect_mod.AgentContext(name="NoClass") @@ -547,6 +615,79 @@ class DummyAgent: assert await connect_mod._build_local_agent(ctx) is sentinel +@pytest.mark.asyncio +async def test_build_local_agent_threaded_success(monkeypatch: pytest.MonkeyPatch): + """When asyncio.to_thread returns quickly, the threaded path should be used.""" + spec = "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + + # Ensure cache is clear for deterministic behavior + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + async def fake_to_thread(func, arg): + # emulate correct threaded resolution + return connect_mod._resolve_local_agent_class(arg) + + monkeypatch.setattr(asyncio, "to_thread", fake_to_thread) + + ctx = connect_mod.AgentContext( + name="PromptBasedStrategyAgent", agent_class_spec=spec + ) + + inst = await connect_mod._build_local_agent(ctx) + + assert inst is not None + # returned object should be an instance (callable class instance) + assert not inspect.isclass(inst) + + +@pytest.mark.asyncio +async def test_build_local_agent_threaded_timeout_fallback( + monkeypatch: pytest.MonkeyPatch, +): + """If the threaded import times out, code should fall back to sync import.""" + spec = "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + # Make asyncio.wait_for raise TimeoutError to exercise the except branch + original_wait_for = asyncio.wait_for + + def fake_wait_for(awaitable, timeout=None): + # If a coroutine/future was passed, ensure it's closed/cancelled so + # the test does not leave an un-awaited coroutine (which would emit + # a RuntimeWarning). Real asyncio.wait_for would cancel the inner + # awaitable on timeout; emulate that behavior here. + try: + if hasattr(awaitable, "cancel"): + try: + awaitable.cancel() + except Exception: + pass + elif hasattr(awaitable, "close"): + try: + awaitable.close() + except Exception: + pass + finally: + raise asyncio.TimeoutError() + + monkeypatch.setattr(asyncio, "wait_for", fake_wait_for) + + # Sync resolver should still succeed + ctx = connect_mod.AgentContext( + name="PromptBasedStrategyAgent", agent_class_spec=spec + ) + + try: + inst = await connect_mod._build_local_agent(ctx) + finally: + # restore + monkeypatch.setattr(asyncio, "wait_for", original_wait_for) + + assert inst is not None + assert not inspect.isclass(inst) + + @pytest.mark.asyncio async def test_ensure_agent_runtime_returns_when_task_running(): rc = RemoteConnections() diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 931e29433..7f8fc1e9c 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -121,6 +121,10 @@ async def emit(item: Optional[BaseResponse]): # Never fail producer due to queue issues; just drop pass + logger.info( + "process_user_input: starting background session for conversation {}", + user_input.meta.conversation_id, + ) # Start background producer asyncio.create_task(self._run_session(user_input, emit)) @@ -193,11 +197,19 @@ async def _generate_responses( yield await self.event_service.emit(started) if conversation.status == ConversationStatus.REQUIRE_USER_INPUT: + logger.info( + "_generate_responses: resuming conversation {} in REQUIRE_USER_INPUT", + conversation_id, + ) async for response in self._handle_conversation_continuation( user_input ): yield response else: + logger.info( + "_generate_responses: handling new request for conversation {}", + conversation_id, + ) async for response in self._handle_new_request(user_input): yield response @@ -369,10 +381,19 @@ async def _handle_new_request( # 2) Planner phase (existing logic) # Create planning task with user input callback + logger.info( + "_handle_new_request: starting planner for conversation {}, thread {}", + conversation_id, + thread_id, + ) context_aware_callback = self._create_context_aware_callback(conversation_id) planning_task = self.plan_service.start_planning_task( user_input, thread_id, context_aware_callback ) + logger.info( + "_handle_new_request: planner task started for conversation {}", + conversation_id, + ) # Monitor planning progress async for response in self._monitor_planning_task( @@ -428,6 +449,10 @@ async def _monitor_planning_task( ) # Wait for planning completion or user input request + logger.info( + "_monitor_planning_task: monitoring planning task for conversation {}", + conversation_id, + ) while not planning_task.done(): if self.plan_service.has_pending_request(conversation_id): # Save planning context @@ -454,6 +479,10 @@ async def _monitor_planning_task( await asyncio.sleep(ASYNC_SLEEP_INTERVAL) + logger.info( + "_monitor_planning_task: planning completed for conversation {}; executing plan", + conversation_id, + ) # Planning completed, execute plan plan: "ExecutionPlan" = await planning_task diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index ccb7cbab7..252d5687f 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -537,3 +537,32 @@ async def _run(user_input): if getattr(resp, "data", None) and getattr(resp.data, "payload", None) ] assert any("Concise reply" in content for content in payload_contents) + + +@pytest.mark.asyncio +async def test_handle_user_input_for_require_user_input_conversation( + orchestrator: AgentOrchestrator, + sample_user_input: UserInput, + mock_conversation_manager: Mock, +): + # Mock a conversation in REQUIRE_USER_INPUT status + mock_conversation = Mock() + mock_conversation.status = ConversationStatus.REQUIRE_USER_INPUT + mock_conversation.id = "test-conv-id" + mock_conversation_manager.get_conversation.return_value = mock_conversation + + # Mock _handle_conversation_continuation to yield a response + mock_response = Mock() + + async def mock_continuation(user_input): + yield mock_response + + orchestrator._handle_conversation_continuation = mock_continuation + + # Execute + responses = [] + async for resp in orchestrator.process_user_input(sample_user_input): + responses.append(resp) + + # Verify the response was yielded + assert mock_response in responses diff --git a/python/valuecell/core/task/executor.py b/python/valuecell/core/task/executor.py index c2e1beda4..81536a69f 100644 --- a/python/valuecell/core/task/executor.py +++ b/python/valuecell/core/task/executor.py @@ -343,7 +343,17 @@ async def _execute_single_task_run( ) ) + logger.info( + "_execute_single_task_run: acquiring client for agent {} (task={})", + agent_name, + task.task_id, + ) client = await self._agent_connections.get_client(agent_name) + logger.info( + "_execute_single_task_run: acquired client for agent {} (task={})", + agent_name, + task.task_id, + ) if not client: # Emit a TOOL_CALL_COMPLETED with a failure message (no client) yield await self._event_service.emit( @@ -372,6 +382,11 @@ async def _execute_single_task_run( ) ) + logger.info( + "_execute_single_task_run: sending message to agent {} (task={})", + agent_name, + task.task_id, + ) remote_response = await client.send_message( task.query, conversation_id=task.conversation_id, @@ -415,7 +430,7 @@ async def _execute_single_task_run( if isinstance(event, TaskArtifactUpdateEvent): logger.info( - "Received unexpected artifact update for task %s: %s", + "Received unexpected artifact update for task {}: {}", task.task_id, event, ) diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index a35523d4a..d04074f93 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -138,6 +138,17 @@ async def lifespan(app: FastAPI): except Exception as e: logger.info(f"Error configuring adapters: {e}") + # Preload local agent classes to avoid import deadlocks on Windows + try: + from valuecell.core.agent.connect import RemoteConnections + + logger.info("Preloading local agent classes...") + rc = RemoteConnections() + rc.preload_local_agent_classes() + logger.info("✓ Local agent classes preloaded") + except Exception as e: + logger.warning(f"✗ Failed to preload local agent classes: {e}") + yield # Shutdown logger.info("ValueCell Server shutting down...")