From 4474b2cebc8ea92dfcc8a8181ad8e5828b5b033c Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 18:23:29 +0800 Subject: [PATCH 1/4] feat(agent): preload local agent classes to avoid import deadlocks on Windows --- python/valuecell/core/agent/connect.py | 50 ++++++++++++++++++++------ python/valuecell/server/api/app.py | 33 +++++++++++------ 2 files changed, 61 insertions(+), 22 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 4e34f5046..beabef6cb 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -123,9 +123,11 @@ async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: # Fast path: cache hit cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) if cached is not None: - logger.debug("_resolve_local_agent_class: cache hit for '{}'", spec) + logger.info("_resolve_local_agent_class: cache hit for '{}'", spec) return cached + logger.info("_resolve_local_agent_class: cache miss for '{}', delegating to executor", spec) + loop = asyncio.get_running_loop() # Delegate the synchronous import to the thread pool try: @@ -292,17 +294,43 @@ def preload_local_agent_classes(self) -> None: the main thread, we sidestep this issue entirely. """ self._ensure_remote_contexts_loaded() + preloaded_count = 0 for name, ctx in self._contexts.items(): - if ctx.agent_class_spec and ctx.agent_instance_class is None: - logger.info("Preloading agent class for '{}'", name) - cls = _resolve_local_agent_class_sync(ctx.agent_class_spec) - ctx.agent_instance_class = cls - if cls is None: - logger.warning( - "Failed to preload agent class '{}' for '{}'", - ctx.agent_class_spec, - name, - ) + if not ctx.agent_class_spec: + logger.debug( + "Skipping preload for '{}': no agent_class_spec", name + ) + continue + if ctx.agent_instance_class is not None: + logger.debug( + "Skipping preload for '{}': class already loaded", name + ) + continue + logger.info( + "Preloading agent class for '{}' (spec='{}')", + name, + ctx.agent_class_spec, + ) + cls = _resolve_local_agent_class_sync(ctx.agent_class_spec) + ctx.agent_instance_class = cls + if cls is None: + logger.warning( + "Failed to preload agent class '{}' for '{}'", + ctx.agent_class_spec, + name, + ) + else: + preloaded_count += 1 + logger.info( + "Successfully preloaded class '{}' for '{}'", + cls.__name__, + name, + ) + logger.info( + "Preload complete: {}/{} agent classes loaded", + preloaded_count, + len(self._contexts), + ) # Public helper primarily for tests or tooling to load from a custom dir def load_from_dir(self, config_dir: str) -> None: diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index d04074f93..093c845a8 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -138,17 +138,6 @@ async def lifespan(app: FastAPI): except Exception as e: logger.info(f"Error configuring adapters: {e}") - # Preload local agent classes to avoid import deadlocks on Windows - try: - from valuecell.core.agent.connect import RemoteConnections - - logger.info("Preloading local agent classes...") - rc = RemoteConnections() - rc.preload_local_agent_classes() - logger.info("✓ Local agent classes preloaded") - except Exception as e: - logger.warning(f"✗ Failed to preload local agent classes: {e}") - yield # Shutdown logger.info("ValueCell Server shutting down...") @@ -171,9 +160,31 @@ async def lifespan(app: FastAPI): # Add routes _add_routes(app, settings) + # Preload local agent classes synchronously at module load time + # to avoid import deadlocks on Windows when using thread pools later. + # This runs once when the app is created, before any requests. + _preload_agent_classes() + return app +def _preload_agent_classes() -> None: + """Preload local agent classes to avoid Windows import lock deadlocks. + + This must run in the main thread before any async operations that might + trigger imports in worker threads. + """ + try: + from valuecell.core.agent.connect import RemoteConnections + + logger.info("Preloading local agent classes...") + rc = RemoteConnections() + rc.preload_local_agent_classes() + logger.info("✓ Local agent classes preloaded") + except Exception as e: + logger.warning(f"✗ Failed to preload local agent classes: {e}") + + def _add_middleware(app: FastAPI, settings) -> None: """Add middleware to the application.""" # CORS middleware From 7f174a50014a0768c38dcb2b4a51402f8f0b187e Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 18:39:33 +0800 Subject: [PATCH 2/4] feat(agent): implement agent class preloading to prevent Windows import deadlocks --- python/valuecell/server/api/app.py | 22 ---------------- .../server/services/agent_stream_service.py | 26 +++++++++++++++++++ 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index 093c845a8..a35523d4a 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -160,31 +160,9 @@ async def lifespan(app: FastAPI): # Add routes _add_routes(app, settings) - # Preload local agent classes synchronously at module load time - # to avoid import deadlocks on Windows when using thread pools later. - # This runs once when the app is created, before any requests. - _preload_agent_classes() - return app -def _preload_agent_classes() -> None: - """Preload local agent classes to avoid Windows import lock deadlocks. - - This must run in the main thread before any async operations that might - trigger imports in worker threads. - """ - try: - from valuecell.core.agent.connect import RemoteConnections - - logger.info("Preloading local agent classes...") - rc = RemoteConnections() - rc.preload_local_agent_classes() - logger.info("✓ Local agent classes preloaded") - except Exception as e: - logger.warning(f"✗ Failed to preload local agent classes: {e}") - - def _add_middleware(app: FastAPI, settings) -> None: """Add middleware to the application.""" # CORS middleware diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 59ce9787e..c52e6eea0 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -7,6 +7,7 @@ from loguru import logger +from valuecell.core.agent.connect import RemoteConnections from valuecell.core.coordinate.orchestrator import AgentOrchestrator from valuecell.core.task.executor import TaskExecutor from valuecell.core.task.locator import get_task_service @@ -15,6 +16,27 @@ from valuecell.utils.uuid import generate_conversation_id _TASK_AUTORESTART_STARTED = False +_AGENT_CLASSES_PRELOADED = False + + +def _preload_agent_classes_once() -> None: + """Preload local agent classes once to avoid Windows import lock deadlocks. + + This must run in the main thread before any async operations that might + trigger imports in worker threads. Safe to call multiple times. + """ + global _AGENT_CLASSES_PRELOADED + if _AGENT_CLASSES_PRELOADED: + return + _AGENT_CLASSES_PRELOADED = True + + try: + logger.info("Preloading local agent classes...") + rc = RemoteConnections() + rc.preload_local_agent_classes() + logger.info("✓ Local agent classes preloaded") + except Exception as e: + logger.warning(f"✗ Failed to preload local agent classes: {e}") class AgentStreamService: @@ -22,6 +44,10 @@ class AgentStreamService: def __init__(self): """Initialize the agent stream service.""" + # Preload agent classes before creating orchestrator to avoid + # Windows import lock deadlocks when using thread pools + _preload_agent_classes_once() + self.orchestrator = AgentOrchestrator() logger.info("Agent stream service initialized") From d0f8b36a74c0eba5c54df6082171283eaa24b181 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 18:46:54 +0800 Subject: [PATCH 3/4] update tests --- .../core/agent/tests/test_connect.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 3d242f531..989846507 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -161,6 +161,85 @@ def test_preload_agent_classes(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): assert ctx.agent_instance_class is not None +def test_preload_skips_agents_without_class_spec(tmp_path: Path): + """Test that preload skips agents without agent_class_spec.""" + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + # Card without metadata (no local_agent_class) + card = make_card_dict( + "NoSpecAgent", "http://127.0.0.1:8102", push_notifications=False + ) + with open(dir_path / "NoSpecAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts["NoSpecAgent"] + assert ctx.agent_class_spec is None + + # Preload should skip this agent (no error, just skip) + rc.preload_local_agent_classes() + + # Still None since there was no spec + assert ctx.agent_instance_class is None + + +def test_preload_skips_already_loaded_class(tmp_path: Path): + """Test that preload skips agents with class already loaded.""" + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = make_card_dict( + "PreloadedAgent", "http://127.0.0.1:8103", push_notifications=False + ) + card["metadata"] = { + "local_agent_class": "valuecell.agents.prompt_strategy_agent.core:PromptBasedStrategyAgent" + } + with open(dir_path / "PreloadedAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts["PreloadedAgent"] + # Simulate class already loaded + sentinel_class = type("SentinelClass", (), {}) + ctx.agent_instance_class = sentinel_class + + # Preload should skip since class is already loaded + rc.preload_local_agent_classes() + + # Should still be sentinel, not replaced + assert ctx.agent_instance_class is sentinel_class + + +def test_preload_handles_failed_import(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Test that preload warns on failed import but continues.""" + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = make_card_dict( + "FailAgent", "http://127.0.0.1:8104", push_notifications=False + ) + card["metadata"] = {"local_agent_class": "nonexistent.module:FakeClass"} + with open(dir_path / "FailAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts["FailAgent"] + assert ctx.agent_class_spec == "nonexistent.module:FakeClass" + + # Preload should warn but not raise + rc.preload_local_agent_classes() + + # Class should remain None due to failed import + assert ctx.agent_instance_class is None + + @pytest.mark.asyncio async def test_start_agent_without_listener( tmp_path: Path, monkeypatch: pytest.MonkeyPatch From 7177ca27e686c630993359bf7dc7d1de24c276f7 Mon Sep 17 00:00:00 2001 From: Felix <24791380+vcfgv@users.noreply.github.com> Date: Fri, 5 Dec 2025 18:47:33 +0800 Subject: [PATCH 4/4] make format --- python/valuecell/core/agent/connect.py | 12 +++++------- .../server/services/agent_stream_service.py | 6 +++--- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index beabef6cb..09d7ee5f3 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -126,7 +126,9 @@ async def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: logger.info("_resolve_local_agent_class: cache hit for '{}'", spec) return cached - logger.info("_resolve_local_agent_class: cache miss for '{}', delegating to executor", spec) + logger.info( + "_resolve_local_agent_class: cache miss for '{}', delegating to executor", spec + ) loop = asyncio.get_running_loop() # Delegate the synchronous import to the thread pool @@ -297,14 +299,10 @@ def preload_local_agent_classes(self) -> None: preloaded_count = 0 for name, ctx in self._contexts.items(): if not ctx.agent_class_spec: - logger.debug( - "Skipping preload for '{}': no agent_class_spec", name - ) + logger.debug("Skipping preload for '{}': no agent_class_spec", name) continue if ctx.agent_instance_class is not None: - logger.debug( - "Skipping preload for '{}': class already loaded", name - ) + logger.debug("Skipping preload for '{}': class already loaded", name) continue logger.info( "Preloading agent class for '{}' (spec='{}')", diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index c52e6eea0..52155b32b 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -21,7 +21,7 @@ def _preload_agent_classes_once() -> None: """Preload local agent classes once to avoid Windows import lock deadlocks. - + This must run in the main thread before any async operations that might trigger imports in worker threads. Safe to call multiple times. """ @@ -29,7 +29,7 @@ def _preload_agent_classes_once() -> None: if _AGENT_CLASSES_PRELOADED: return _AGENT_CLASSES_PRELOADED = True - + try: logger.info("Preloading local agent classes...") rc = RemoteConnections() @@ -47,7 +47,7 @@ def __init__(self): # Preload agent classes before creating orchestrator to avoid # Windows import lock deadlocks when using thread pools _preload_agent_classes_once() - + self.orchestrator = AgentOrchestrator() logger.info("Agent stream service initialized")