From dfe88410ec4020078bc52655e12fd5b73fa32c1b Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 13:55:34 +0800 Subject: [PATCH 01/11] fix(agent): add timeout handling for local agent class resolution to prevent deadlocks on Windows --- python/valuecell/core/agent/connect.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 29feaeee3..1ba6c8ec8 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -109,8 +109,9 @@ async def _build_local_agent(ctx: AgentContext): - If `agent_instance_class` is already present, use it. - Otherwise, if `agent_class_spec` is provided, resolve it off the event loop (`asyncio.to_thread`) so imports don't block the loop. - - If resolution fails, log a warning and return `None` (caller will - treat missing factory as "no local agent available"). + A timeout is applied to prevent hangs on Windows where import lock + contention between threads and the event loop can cause deadlocks. + - If resolution fails or times out, fall back to synchronous import. - The actual wrapping call (`create_wrapped_agent`) is performed on the event loop; this preserves any asyncio-related initialization semantics required by the wrapper (if it needs loop context). @@ -118,10 +119,21 @@ async def _build_local_agent(ctx: AgentContext): agent_cls = ctx.agent_instance_class if agent_cls is None and ctx.agent_class_spec: - # Resolve the import in a worker thread to avoid blocking the loop. - agent_cls = await asyncio.to_thread( - _resolve_local_agent_class, ctx.agent_class_spec - ) + # Try resolving the import in a worker thread with a timeout. + # On Windows, import lock contention can cause hangs, so we fall back + # to synchronous import if the threaded approach times out. + try: + agent_cls = await asyncio.wait_for( + asyncio.to_thread(_resolve_local_agent_class, ctx.agent_class_spec), + timeout=5.0, + ) + except asyncio.TimeoutError: + logger.warning( + "Threaded import timed out for '{}', falling back to sync import", + ctx.agent_class_spec, + ) + agent_cls = _resolve_local_agent_class(ctx.agent_class_spec) + ctx.agent_instance_class = agent_cls if agent_cls is None: logger.warning( From fe1d239dcdd9833aa301354249ce84d6e94089bb Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 14:08:36 +0800 Subject: [PATCH 02/11] fix(agent): add tests for threaded local agent resolution with timeout fallback --- .../core/agent/tests/test_connect.py | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 8fdd8879c..cc618b5e9 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import inspect import json from dataclasses import dataclass from pathlib import Path @@ -9,6 +10,7 @@ import pytest from a2a.client.client_factory import minimal_agent_card from a2a.types import AgentCard + from valuecell.core.agent import connect as connect_mod from valuecell.core.agent.connect import RemoteConnections @@ -547,6 +549,63 @@ class DummyAgent: assert await connect_mod._build_local_agent(ctx) is sentinel +@pytest.mark.asyncio +async def test_build_local_agent_threaded_success(monkeypatch: pytest.MonkeyPatch): + """When asyncio.to_thread returns quickly, the threaded path should be used.""" + spec = "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + + # Ensure cache is clear for deterministic behavior + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + async def fake_to_thread(func, arg): + # emulate correct threaded resolution + return connect_mod._resolve_local_agent_class(arg) + + monkeypatch.setattr(asyncio, "to_thread", fake_to_thread) + + ctx = connect_mod.AgentContext( + name="PromptBasedStrategyAgent", agent_class_spec=spec + ) + + inst = await connect_mod._build_local_agent(ctx) + + assert inst is not None + # returned object should be an instance (callable class instance) + assert not inspect.isclass(inst) + + +@pytest.mark.asyncio +async def test_build_local_agent_threaded_timeout_fallback( + monkeypatch: pytest.MonkeyPatch, +): + """If the threaded import times out, code should fall back to sync import.""" + spec = "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + # Make asyncio.wait_for raise TimeoutError to exercise the except branch + original_wait_for = asyncio.wait_for + + def fake_wait_for(awaitable, timeout=None): + raise asyncio.TimeoutError() + + monkeypatch.setattr(asyncio, "wait_for", fake_wait_for) + + # Sync resolver should still succeed + ctx = connect_mod.AgentContext( + name="PromptBasedStrategyAgent", agent_class_spec=spec + ) + + try: + inst = await connect_mod._build_local_agent(ctx) + finally: + # restore + monkeypatch.setattr(asyncio, "wait_for", original_wait_for) + + assert inst is not None + assert not inspect.isclass(inst) + + @pytest.mark.asyncio async def test_ensure_agent_runtime_returns_when_task_running(): rc = RemoteConnections() From 1d0d7542a35ce355dfd9fee40bfa953d723a63d1 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 14:24:55 +0800 Subject: [PATCH 03/11] fix(logging): enhance logging for agent and task execution processes --- python/valuecell/core/agent/connect.py | 17 +++++++++++ .../valuecell/core/coordinate/orchestrator.py | 29 +++++++++++++++++++ python/valuecell/core/task/executor.py | 17 ++++++++++- 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 1ba6c8ec8..847ed6b7e 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -197,6 +197,7 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: ) return + logger.info(f"Loading agent cards from {agent_card_dir}") for json_file in agent_card_dir.glob("*.json"): try: # Read name minimally to resolve via helper @@ -229,6 +230,9 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: f"Failed to load agent card from {json_file}; skipping: {e}" ) continue + logger.info( + f"Loaded {len(self._contexts)} agent card(s) from {agent_card_dir}: {list(self._contexts.keys())}" + ) self._remote_contexts_loaded = True def _ensure_remote_contexts_loaded(self) -> None: @@ -254,6 +258,9 @@ async def start_agent( """ # Use agent-specific lock to prevent concurrent starts of the same agent agent_lock = self._get_agent_lock(agent_name) + logger.info( + f"Request to start agent '{agent_name}' (with_listener={with_listener})" + ) async with agent_lock: ctx = await self._get_or_create_context(agent_name) @@ -309,6 +316,9 @@ async def _ensure_client(self, ctx: AgentContext) -> None: if not url: raise ValueError(f"Unable to determine URL for agent '{ctx.name}'") # Initialize a temporary client; only assign to context on success + logger.info( + f"Initializing client for '{ctx.name}' at {url} (listener_url={ctx.listener_url})" + ) tmp_client = AgentClient(url, push_notification_url=ctx.listener_url) try: await self._initialize_client(tmp_client, ctx) @@ -353,6 +363,7 @@ async def _ensure_agent_runtime(self, ctx: AgentContext) -> None: logger.info(f"Launching in-process agent '{ctx.name}'") if ctx.agent_task is None: + logger.info(f"Creating task to run in-process agent '{ctx.name}'") ctx.agent_task = asyncio.create_task(ctx.agent_instance.serve()) # Give the event loop a chance to schedule startup work await asyncio.sleep(0) @@ -371,9 +382,15 @@ async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> No """Initialize client with retry for local agents.""" retries = 3 if ctx.agent_task else 1 delay = 0.2 + logger.info( + f"_initialize_client: initializing client for '{ctx.name}' (retries={retries})" + ) for attempt in range(retries): try: await client.ensure_initialized() + logger.info( + f"Client initialized for '{ctx.name}' on attempt {attempt + 1}" + ) return except Exception as exc: if attempt >= retries - 1: diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 931e29433..7f8fc1e9c 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -121,6 +121,10 @@ async def emit(item: Optional[BaseResponse]): # Never fail producer due to queue issues; just drop pass + logger.info( + "process_user_input: starting background session for conversation {}", + user_input.meta.conversation_id, + ) # Start background producer asyncio.create_task(self._run_session(user_input, emit)) @@ -193,11 +197,19 @@ async def _generate_responses( yield await self.event_service.emit(started) if conversation.status == ConversationStatus.REQUIRE_USER_INPUT: + logger.info( + "_generate_responses: resuming conversation {} in REQUIRE_USER_INPUT", + conversation_id, + ) async for response in self._handle_conversation_continuation( user_input ): yield response else: + logger.info( + "_generate_responses: handling new request for conversation {}", + conversation_id, + ) async for response in self._handle_new_request(user_input): yield response @@ -369,10 +381,19 @@ async def _handle_new_request( # 2) Planner phase (existing logic) # Create planning task with user input callback + logger.info( + "_handle_new_request: starting planner for conversation {}, thread {}", + conversation_id, + thread_id, + ) context_aware_callback = self._create_context_aware_callback(conversation_id) planning_task = self.plan_service.start_planning_task( user_input, thread_id, context_aware_callback ) + logger.info( + "_handle_new_request: planner task started for conversation {}", + conversation_id, + ) # Monitor planning progress async for response in self._monitor_planning_task( @@ -428,6 +449,10 @@ async def _monitor_planning_task( ) # Wait for planning completion or user input request + logger.info( + "_monitor_planning_task: monitoring planning task for conversation {}", + conversation_id, + ) while not planning_task.done(): if self.plan_service.has_pending_request(conversation_id): # Save planning context @@ -454,6 +479,10 @@ async def _monitor_planning_task( await asyncio.sleep(ASYNC_SLEEP_INTERVAL) + logger.info( + "_monitor_planning_task: planning completed for conversation {}; executing plan", + conversation_id, + ) # Planning completed, execute plan plan: "ExecutionPlan" = await planning_task diff --git a/python/valuecell/core/task/executor.py b/python/valuecell/core/task/executor.py index c2e1beda4..81536a69f 100644 --- a/python/valuecell/core/task/executor.py +++ b/python/valuecell/core/task/executor.py @@ -343,7 +343,17 @@ async def _execute_single_task_run( ) ) + logger.info( + "_execute_single_task_run: acquiring client for agent {} (task={})", + agent_name, + task.task_id, + ) client = await self._agent_connections.get_client(agent_name) + logger.info( + "_execute_single_task_run: acquired client for agent {} (task={})", + agent_name, + task.task_id, + ) if not client: # Emit a TOOL_CALL_COMPLETED with a failure message (no client) yield await self._event_service.emit( @@ -372,6 +382,11 @@ async def _execute_single_task_run( ) ) + logger.info( + "_execute_single_task_run: sending message to agent {} (task={})", + agent_name, + task.task_id, + ) remote_response = await client.send_message( task.query, conversation_id=task.conversation_id, @@ -415,7 +430,7 @@ async def _execute_single_task_run( if isinstance(event, TaskArtifactUpdateEvent): logger.info( - "Received unexpected artifact update for task %s: %s", + "Received unexpected artifact update for task {}: {}", task.task_id, event, ) From 71e7d6a5e1546afb0ce4e658e0ddac29896ad678 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 15:27:06 +0800 Subject: [PATCH 04/11] fix(agent): implement local agent class preloading to prevent import deadlocks on Windows --- python/valuecell/core/agent/connect.py | 30 ++++++++++++++++++++++++++ python/valuecell/server/api/app.py | 11 ++++++++++ 2 files changed, 41 insertions(+) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 847ed6b7e..ed57fdeb7 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -87,12 +87,20 @@ def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) if cached is not None: + logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec) return cached try: module_path, class_name = spec.split(":", 1) + logger.info( + "_resolve_local_agent_class: importing module '{}' for class '{}'", + module_path, + class_name, + ) module = import_module(module_path) + logger.info("_resolve_local_agent_class: module imported, getting class") agent_cls = getattr(module, class_name) + logger.info("_resolve_local_agent_class: class '{}' resolved", class_name) except (ValueError, AttributeError, ImportError) as exc: logger.error("Failed to import agent class '{}': {}", spec, exc) return None @@ -239,6 +247,28 @@ def _ensure_remote_contexts_loaded(self) -> None: if not self._remote_contexts_loaded: self._load_remote_contexts() + def preload_local_agent_classes(self) -> None: + """Preload all local agent classes synchronously at startup. + + This method should be called during application startup (before the + event loop processes requests) to avoid import deadlocks on Windows. + Importing Python modules in a worker thread while the main thread holds + the import lock can cause hangs. By importing everything upfront in + the main thread, we sidestep this issue entirely. + """ + self._ensure_remote_contexts_loaded() + for name, ctx in self._contexts.items(): + if ctx.agent_class_spec and ctx.agent_instance_class is None: + logger.info("Preloading agent class for '{}'", name) + cls = _resolve_local_agent_class(ctx.agent_class_spec) + ctx.agent_instance_class = cls + if cls is None: + logger.warning( + "Failed to preload agent class '{}' for '{}'", + ctx.agent_class_spec, + name, + ) + # Public helper primarily for tests or tooling to load from a custom dir def load_from_dir(self, config_dir: str) -> None: """Load agent contexts from a specific directory of JSON card files.""" diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index a35523d4a..d04074f93 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -138,6 +138,17 @@ async def lifespan(app: FastAPI): except Exception as e: logger.info(f"Error configuring adapters: {e}") + # Preload local agent classes to avoid import deadlocks on Windows + try: + from valuecell.core.agent.connect import RemoteConnections + + logger.info("Preloading local agent classes...") + rc = RemoteConnections() + rc.preload_local_agent_classes() + logger.info("✓ Local agent classes preloaded") + except Exception as e: + logger.warning(f"✗ Failed to preload local agent classes: {e}") + yield # Shutdown logger.info("ValueCell Server shutting down...") From bcfe456b87aea5058ec45d440639d238879c4603 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 15:30:24 +0800 Subject: [PATCH 05/11] fix(main): add ssl import to resolve potential security issues --- python/valuecell/server/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/valuecell/server/main.py b/python/valuecell/server/main.py index 6e075df3a..5cd0cca8a 100644 --- a/python/valuecell/server/main.py +++ b/python/valuecell/server/main.py @@ -4,6 +4,7 @@ import io import os +import ssl as _ # noqa: F401 import sys import threading from typing import Callable, Optional, TextIO From dfb0dcbd3589f571fd61f2e8653f6e6e27091140 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:02:10 +0800 Subject: [PATCH 06/11] fix(agent): enhance local agent class resolution with async support and timeout handling --- python/valuecell/core/agent/connect.py | 67 +++++++++++++++++++------- 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index ed57fdeb7..87245e275 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -2,6 +2,7 @@ import json from dataclasses import dataclass from importlib import import_module +from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Any, Dict, List, Optional, Type @@ -70,37 +71,37 @@ def hidden(self) -> bool: _LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {} +# Global thread pool for offloading imports. Using a fixed executor allows +# better control and avoids unbounded thread creation when many imports are +# requested concurrently. +executor = ThreadPoolExecutor(max_workers=4) -def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: - """Resolve a `module:Class` spec to a Python class. - This function is synchronous and performs a normal import. Callers that - need to avoid blocking the event loop should invoke this via - `asyncio.to_thread(_resolve_local_agent_class, spec)`. +def _resolve_local_agent_class_sync(spec: str) -> Optional[Type[Any]]: + """Synchronous resolver used for fallback and direct calls. - Results are cached in `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated - imports/attribute lookups. + Keeps the original import behavior but is extracted so it can be invoked + from a thread pool via `run_in_executor`. """ - if not spec: return None cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) if cached is not None: - logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec) + logger.debug("_resolve_local_agent_class_sync: cache hit for '{}'", spec) return cached try: module_path, class_name = spec.split(":", 1) logger.info( - "_resolve_local_agent_class: importing module '{}' for class '{}'", + "_resolve_local_agent_class_sync: importing module '{}' for class '{}'", module_path, class_name, ) module = import_module(module_path) - logger.info("_resolve_local_agent_class: module imported, getting class") + logger.info("_resolve_local_agent_class_sync: module imported, getting class") agent_cls = getattr(module, class_name) - logger.info("_resolve_local_agent_class: class '{}' resolved", class_name) + logger.info("_resolve_local_agent_class_sync: class '{}' resolved", class_name) except (ValueError, AttributeError, ImportError) as exc: logger.error("Failed to import agent class '{}': {}", spec, exc) return None @@ -109,6 +110,33 @@ def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: return agent_cls +async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: + """Asynchronously resolve a `module:Class` spec to a Python class. + + The actual import is executed in a thread pool via `loop.run_in_executor` + to avoid blocking the event loop. Results are cached in + `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated imports. + """ + if not spec: + return None + + # Fast path: cache hit + cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) + if cached is not None: + logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec) + return cached + + loop = asyncio.get_running_loop() + # Delegate the synchronous import to the thread pool + try: + agent_cls = await loop.run_in_executor(executor, _resolve_local_agent_class_sync, spec) + except Exception as exc: + logger.error("_resolve_local_agent_class: threaded import failed for '{}': {}", spec, exc) + return None + + return agent_cls + + async def _build_local_agent(ctx: AgentContext): """Asynchronously produce a wrapped local agent instance for the given `AgentContext`. @@ -128,19 +156,22 @@ async def _build_local_agent(ctx: AgentContext): agent_cls = ctx.agent_instance_class if agent_cls is None and ctx.agent_class_spec: # Try resolving the import in a worker thread with a timeout. - # On Windows, import lock contention can cause hangs, so we fall back - # to synchronous import if the threaded approach times out. + # Use the async resolver which delegates to a thread pool. If the + # operation times out, attempt a direct import in the executor as a + # final fallback. try: agent_cls = await asyncio.wait_for( - asyncio.to_thread(_resolve_local_agent_class, ctx.agent_class_spec), - timeout=5.0, + _resolve_local_agent_class(ctx.agent_class_spec), timeout=5.0 ) except asyncio.TimeoutError: logger.warning( - "Threaded import timed out for '{}', falling back to sync import", + "Threaded import timed out for '{}', falling back to executor sync import", ctx.agent_class_spec, ) - agent_cls = _resolve_local_agent_class(ctx.agent_class_spec) + loop = asyncio.get_running_loop() + agent_cls = await loop.run_in_executor( + executor, _resolve_local_agent_class_sync, ctx.agent_class_spec + ) ctx.agent_instance_class = agent_cls if agent_cls is None: From b829ee26a733175ed57f52d60f43a66ba4e295b8 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:08:11 +0800 Subject: [PATCH 07/11] make format --- python/valuecell/core/agent/connect.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 87245e275..14f7b16df 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -1,8 +1,8 @@ import asyncio import json +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from importlib import import_module -from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Any, Dict, List, Optional, Type @@ -129,9 +129,13 @@ async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: loop = asyncio.get_running_loop() # Delegate the synchronous import to the thread pool try: - agent_cls = await loop.run_in_executor(executor, _resolve_local_agent_class_sync, spec) + agent_cls = await loop.run_in_executor( + executor, _resolve_local_agent_class_sync, spec + ) except Exception as exc: - logger.error("_resolve_local_agent_class: threaded import failed for '{}': {}", spec, exc) + logger.error( + "_resolve_local_agent_class: threaded import failed for '{}': {}", spec, exc + ) return None return agent_cls From dd540eba709f7e37e54359f3a9e618dc11f03383 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:23:48 +0800 Subject: [PATCH 08/11] Revert "fix(main): add ssl import to resolve potential security issues" This reverts commit bcfe456b87aea5058ec45d440639d238879c4603. --- python/valuecell/server/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/valuecell/server/main.py b/python/valuecell/server/main.py index 5cd0cca8a..6e075df3a 100644 --- a/python/valuecell/server/main.py +++ b/python/valuecell/server/main.py @@ -4,7 +4,6 @@ import io import os -import ssl as _ # noqa: F401 import sys import threading from typing import Callable, Optional, TextIO From 8a5a8f680cc5466d0bfc714209d648aa24bbf20e Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:26:01 +0800 Subject: [PATCH 09/11] fix(tests) --- .../core/agent/tests/test_connect.py | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index cc618b5e9..899b9f509 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -441,11 +441,10 @@ async def test_resolve_local_agent_class_from_metadata( class DummyAgent: pass - monkeypatch.setattr( - connect_mod, - "_resolve_local_agent_class", - lambda spec: DummyAgent if spec == "fake:Spec" else None, - ) + async def _fake_resolver(spec): + return DummyAgent if spec == "fake:Spec" else None + + monkeypatch.setattr(connect_mod, "_resolve_local_agent_class", _fake_resolver) rc = RemoteConnections() rc.load_from_dir(str(dir_path)) @@ -505,7 +504,8 @@ async def ensure_initialized(self): def test_resolve_local_agent_class_empty_spec_returns_none(): - assert connect_mod._resolve_local_agent_class("") is None + # Use the synchronous resolver helper for direct, non-async assertions + assert connect_mod._resolve_local_agent_class_sync("") is None def test_resolve_local_agent_class_cache_hit(): @@ -513,14 +513,16 @@ def test_resolve_local_agent_class_cache_hit(): sentinel = object() connect_mod._LOCAL_AGENT_CLASS_CACHE[spec] = sentinel try: - assert connect_mod._resolve_local_agent_class(spec) is sentinel + # Call the synchronous resolver to verify cache hit + assert connect_mod._resolve_local_agent_class_sync(spec) is sentinel finally: connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) def test_resolve_local_agent_class_invalid_spec(): spec = "valuecell.nonexistent:Missing" - result = connect_mod._resolve_local_agent_class(spec) + # Use synchronous resolver helper for direct check + result = connect_mod._resolve_local_agent_class_sync(spec) assert result is None @@ -587,7 +589,23 @@ async def test_build_local_agent_threaded_timeout_fallback( original_wait_for = asyncio.wait_for def fake_wait_for(awaitable, timeout=None): - raise asyncio.TimeoutError() + # If a coroutine/future was passed, ensure it's closed/cancelled so + # the test does not leave an un-awaited coroutine (which would emit + # a RuntimeWarning). Real asyncio.wait_for would cancel the inner + # awaitable on timeout; emulate that behavior here. + try: + if hasattr(awaitable, "cancel"): + try: + awaitable.cancel() + except Exception: + pass + elif hasattr(awaitable, "close"): + try: + awaitable.close() + except Exception: + pass + finally: + raise asyncio.TimeoutError() monkeypatch.setattr(asyncio, "wait_for", fake_wait_for) From 1f130a0c6731782e2909930db5c0640d77cf65a4 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:38:32 +0800 Subject: [PATCH 10/11] fix(tests) --- python/valuecell/core/agent/connect.py | 2 +- .../core/agent/tests/test_connect.py | 51 +++++++++++++++++++ .../coordinate/tests/test_orchestrator.py | 28 ++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 14f7b16df..4e34f5046 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -295,7 +295,7 @@ def preload_local_agent_classes(self) -> None: for name, ctx in self._contexts.items(): if ctx.agent_class_spec and ctx.agent_instance_class is None: logger.info("Preloading agent class for '{}'", name) - cls = _resolve_local_agent_class(ctx.agent_class_spec) + cls = _resolve_local_agent_class_sync(ctx.agent_class_spec) ctx.agent_instance_class = cls if cls is None: logger.warning( diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 899b9f509..8f4acded6 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -132,6 +132,31 @@ async def test_load_from_dir_and_list(tmp_path: Path, monkeypatch: pytest.Monkey assert set(all_agents) == {"AgentAlpha", "AgentBeta"} +def test_preload_agent_classes(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Prepare an agent card with a local class spec + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = make_card_dict("LocalAgent", "http://127.0.0.1:8101", push_notifications=False) + # Add metadata with local_agent_class + card["metadata"] = {"local_agent_class": "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent"} + with open(dir_path / "LocalAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + # Before preload, agent_instance_class should be None + ctx = rc._contexts["LocalAgent"] + assert ctx.agent_instance_class is None + + # Call preload + rc.preload_local_agent_classes() + + # After preload, agent_instance_class should be set + assert ctx.agent_instance_class is not None + + @pytest.mark.asyncio async def test_start_agent_without_listener( tmp_path: Path, monkeypatch: pytest.MonkeyPatch @@ -526,6 +551,32 @@ def test_resolve_local_agent_class_invalid_spec(): assert result is None +@pytest.mark.asyncio +async def test_resolve_local_agent_class_async_cache_hit(): + spec = "cached:Spec" + sentinel = object() + connect_mod._LOCAL_AGENT_CLASS_CACHE[spec] = sentinel + try: + # Call the async resolver to verify cache hit + result = await connect_mod._resolve_local_agent_class(spec) + assert result is sentinel + finally: + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + +@pytest.mark.asyncio +async def test_resolve_local_agent_class_async_import_failure(monkeypatch: pytest.MonkeyPatch): + spec = "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + # Mock the sync resolver to raise an exception + def failing_resolver(spec_arg): + raise ImportError("Simulated import failure") + + monkeypatch.setattr(connect_mod, "_resolve_local_agent_class_sync", failing_resolver) + + result = await connect_mod._resolve_local_agent_class(spec) + assert result is None + + @pytest.mark.asyncio async def test_build_local_agent_returns_none_when_no_class(): ctx = connect_mod.AgentContext(name="NoClass") diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index ccb7cbab7..160bf7e2d 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -537,3 +537,31 @@ async def _run(user_input): if getattr(resp, "data", None) and getattr(resp.data, "payload", None) ] assert any("Concise reply" in content for content in payload_contents) + + +@pytest.mark.asyncio +async def test_handle_user_input_for_require_user_input_conversation( + orchestrator: AgentOrchestrator, + sample_user_input: UserInput, + mock_conversation_manager: Mock, +): + # Mock a conversation in REQUIRE_USER_INPUT status + mock_conversation = Mock() + mock_conversation.status = ConversationStatus.REQUIRE_USER_INPUT + mock_conversation.id = "test-conv-id" + mock_conversation_manager.get_conversation.return_value = mock_conversation + + # Mock _handle_conversation_continuation to yield a response + mock_response = Mock() + async def mock_continuation(user_input): + yield mock_response + + orchestrator._handle_conversation_continuation = mock_continuation + + # Execute + responses = [] + async for resp in orchestrator.process_user_input(sample_user_input): + responses.append(resp) + + # Verify the response was yielded + assert mock_response in responses From 0da85aadbf33fe917786d5a3754574f72676e4b6 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:40:53 +0800 Subject: [PATCH 11/11] fix tests --- .../core/agent/tests/test_connect.py | 35 +++++++++++++------ .../coordinate/tests/test_orchestrator.py | 3 +- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 8f4acded6..3d242f531 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -137,9 +137,13 @@ def test_preload_agent_classes(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): dir_path = tmp_path / "agent_cards" dir_path.mkdir(parents=True) - card = make_card_dict("LocalAgent", "http://127.0.0.1:8101", push_notifications=False) + card = make_card_dict( + "LocalAgent", "http://127.0.0.1:8101", push_notifications=False + ) # Add metadata with local_agent_class - card["metadata"] = {"local_agent_class": "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent"} + card["metadata"] = { + "local_agent_class": "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + } with open(dir_path / "LocalAgent.json", "w", encoding="utf-8") as f: json.dump(card, f) @@ -565,16 +569,25 @@ async def test_resolve_local_agent_class_async_cache_hit(): @pytest.mark.asyncio -async def test_resolve_local_agent_class_async_import_failure(monkeypatch: pytest.MonkeyPatch): +async def test_resolve_local_agent_class_async_import_failure( + monkeypatch: pytest.MonkeyPatch, +): spec = "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" - # Mock the sync resolver to raise an exception - def failing_resolver(spec_arg): - raise ImportError("Simulated import failure") - - monkeypatch.setattr(connect_mod, "_resolve_local_agent_class_sync", failing_resolver) - - result = await connect_mod._resolve_local_agent_class(spec) - assert result is None + # Clear cache to ensure we hit the import path + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + try: + # Mock the sync resolver to raise an exception + def failing_resolver(spec_arg): + raise ImportError("Simulated import failure") + + monkeypatch.setattr( + connect_mod, "_resolve_local_agent_class_sync", failing_resolver + ) + + result = await connect_mod._resolve_local_agent_class(spec) + assert result is None + finally: + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) @pytest.mark.asyncio diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index 160bf7e2d..252d5687f 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -553,9 +553,10 @@ async def test_handle_user_input_for_require_user_input_conversation( # Mock _handle_conversation_continuation to yield a response mock_response = Mock() + async def mock_continuation(user_input): yield mock_response - + orchestrator._handle_conversation_continuation = mock_continuation # Execute