diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 4e34f5046..09d7ee5f3 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -123,9 +123,13 @@ async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: # Fast path: cache hit cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) if cached is not None: - logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec) + logger.info("_resolve_local_agent_class: cache hit for '{}'", spec) return cached + logger.info( + "_resolve_local_agent_class: cache miss for '{}', delegating to executor", spec + ) + loop = asyncio.get_running_loop() # Delegate the synchronous import to the thread pool try: @@ -292,17 +296,39 @@ def preload_local_agent_classes(self) -> None: the main thread, we sidestep this issue entirely. """ self._ensure_remote_contexts_loaded() + preloaded_count = 0 for name, ctx in self._contexts.items(): - if ctx.agent_class_spec and ctx.agent_instance_class is None: - logger.info("Preloading agent class for '{}'", name) - cls = _resolve_local_agent_class_sync(ctx.agent_class_spec) - ctx.agent_instance_class = cls - if cls is None: - logger.warning( - "Failed to preload agent class '{}' for '{}'", - ctx.agent_class_spec, - name, - ) + if not ctx.agent_class_spec: + logger.debug("Skipping preload for '{}': no agent_class_spec", name) + continue + if ctx.agent_instance_class is not None: + logger.debug("Skipping preload for '{}': class already loaded", name) + continue + logger.info( + "Preloading agent class for '{}' (spec='{}')", + name, + ctx.agent_class_spec, + ) + cls = _resolve_local_agent_class_sync(ctx.agent_class_spec) + ctx.agent_instance_class = cls + if cls is None: + logger.warning( + "Failed to preload agent class '{}' for '{}'", + ctx.agent_class_spec, + name, + ) + else: + preloaded_count += 1 + logger.info( + "Successfully preloaded class '{}' for '{}'", + cls.__name__, + name, + ) + logger.info( + "Preload complete: {}/{} agent classes loaded", + preloaded_count, + len(self._contexts), + ) # Public helper primarily for tests or tooling to load from a custom dir def load_from_dir(self, config_dir: str) -> None: diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 3d242f531..989846507 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -161,6 +161,85 @@ def test_preload_agent_classes(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): assert ctx.agent_instance_class is not None +def test_preload_skips_agents_without_class_spec(tmp_path: Path): + """Test that preload skips agents without agent_class_spec.""" + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + # Card without metadata (no local_agent_class) + card = make_card_dict( + "NoSpecAgent", "http://127.0.0.1:8102", push_notifications=False + ) + with open(dir_path / "NoSpecAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts["NoSpecAgent"] + assert ctx.agent_class_spec is None + + # Preload should skip this agent (no error, just skip) + rc.preload_local_agent_classes() + + # Still None since there was no spec + assert ctx.agent_instance_class is None + + +def test_preload_skips_already_loaded_class(tmp_path: Path): + """Test that preload skips agents with class already loaded.""" + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = make_card_dict( + "PreloadedAgent", "http://127.0.0.1:8103", push_notifications=False + ) + card["metadata"] = { + "local_agent_class": "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + } + with open(dir_path / "PreloadedAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts["PreloadedAgent"] + # Simulate class already loaded + sentinel_class = type("SentinelClass", (), {}) + ctx.agent_instance_class = sentinel_class + + # Preload should skip since class is already loaded + rc.preload_local_agent_classes() + + # Should still be sentinel, not replaced + assert ctx.agent_instance_class is sentinel_class + + +def test_preload_handles_failed_import(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Test that preload warns on failed import but continues.""" + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = make_card_dict( + "FailAgent", "http://127.0.0.1:8104", push_notifications=False + ) + card["metadata"] = {"local_agent_class": "nonexistent.module:FakeClass"} + with open(dir_path / "FailAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts["FailAgent"] + assert ctx.agent_class_spec == "nonexistent.module:FakeClass" + + # Preload should warn but not raise + rc.preload_local_agent_classes() + + # Class should remain None due to failed import + assert ctx.agent_instance_class is None + + @pytest.mark.asyncio async def test_start_agent_without_listener( tmp_path: Path, monkeypatch: pytest.MonkeyPatch diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index d04074f93..a35523d4a 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -138,17 +138,6 @@ async def lifespan(app: FastAPI): except Exception as e: logger.info(f"Error configuring adapters: {e}") - # Preload local agent classes to avoid import deadlocks on Windows - try: - from valuecell.core.agent.connect import RemoteConnections - - logger.info("Preloading local agent classes...") - rc = RemoteConnections() - rc.preload_local_agent_classes() - logger.info("✓ Local agent classes preloaded") - except Exception as e: - logger.warning(f"✗ Failed to preload local agent classes: {e}") - yield # Shutdown logger.info("ValueCell Server shutting down...") diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 59ce9787e..52155b32b 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -7,6 +7,7 @@ from loguru import logger +from valuecell.core.agent.connect import RemoteConnections from valuecell.core.coordinate.orchestrator import AgentOrchestrator from valuecell.core.task.executor import TaskExecutor from valuecell.core.task.locator import get_task_service @@ -15,6 +16,27 @@ from valuecell.utils.uuid import generate_conversation_id _TASK_AUTORESTART_STARTED = False +_AGENT_CLASSES_PRELOADED = False + + +def _preload_agent_classes_once() -> None: + """Preload local agent classes once to avoid Windows import lock deadlocks. + + This must run in the main thread before any async operations that might + trigger imports in worker threads. Safe to call multiple times. + """ + global _AGENT_CLASSES_PRELOADED + if _AGENT_CLASSES_PRELOADED: + return + _AGENT_CLASSES_PRELOADED = True + + try: + logger.info("Preloading local agent classes...") + rc = RemoteConnections() + rc.preload_local_agent_classes() + logger.info("✓ Local agent classes preloaded") + except Exception as e: + logger.warning(f"✗ Failed to preload local agent classes: {e}") class AgentStreamService: @@ -22,6 +44,10 @@ class AgentStreamService: def __init__(self): """Initialize the agent stream service.""" + # Preload agent classes before creating orchestrator to avoid + # Windows import lock deadlocks when using thread pools + _preload_agent_classes_once() + self.orchestrator = AgentOrchestrator() logger.info("Agent stream service initialized")