From 72f2085fce9baf5834ede92e5a71e7e54389b68d Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 17 Nov 2025 17:43:59 +0800 Subject: [PATCH 01/10] refactor: update agent configurations to include local agent class and streamline agent launching process --- .../investment_research_agent.json | 3 +- python/configs/agent_cards/news_agent.json | 3 +- .../configs/agent_cards/strategy_agent.json | 3 +- python/scripts/launch.py | 21 +-- python/valuecell/core/agent/connect.py | 147 +++++++++++++++++- python/valuecell/core/agent/decorator.py | 16 +- .../core/agent/tests/test_decorator.py | 4 +- 7 files changed, 168 insertions(+), 29 deletions(-) diff --git a/python/configs/agent_cards/investment_research_agent.json b/python/configs/agent_cards/investment_research_agent.json index cdcd0fc5e..d8c91ce12 100644 --- a/python/configs/agent_cards/investment_research_agent.json +++ b/python/configs/agent_cards/investment_research_agent.json @@ -52,6 +52,7 @@ "tags": [ "sec filings", "fundamental analysis" - ] + ], + "local_agent_class": "valuecell.agents.research_agent.core:ResearchAgent" } } \ No newline at end of file diff --git a/python/configs/agent_cards/news_agent.json b/python/configs/agent_cards/news_agent.json index 2f70c77d0..fa6c4a0ce 100644 --- a/python/configs/agent_cards/news_agent.json +++ b/python/configs/agent_cards/news_agent.json @@ -50,6 +50,7 @@ "current events", "financial markets", "real-time search" - ] + ], + "local_agent_class": "valuecell.agents.news_agent.core:NewsAgent" } } \ No newline at end of file diff --git a/python/configs/agent_cards/strategy_agent.json b/python/configs/agent_cards/strategy_agent.json index b3fe27e63..4256f8b29 100644 --- a/python/configs/agent_cards/strategy_agent.json +++ b/python/configs/agent_cards/strategy_agent.json @@ -25,6 +25,7 @@ "version": "0.1.0", "author": "ValueCell Team", "tags": ["strategy", "trading", "llm", "demo"], - "notes": "This card is a lightweight example; replace model api_key and tune parameters for production use." + "notes": "This card is a lightweight example; replace model api_key and tune parameters for production use.", + "local_agent_class": "valuecell.agents.strategy_agent.agent:StrategyAgent" } } diff --git a/python/scripts/launch.py b/python/scripts/launch.py index 80fc3d7d5..b3634404e 100644 --- a/python/scripts/launch.py +++ b/python/scripts/launch.py @@ -125,25 +125,10 @@ def main(): processes = [] logfiles = [] - for selected_agent in selected_agents: - logfile_path = f"{log_dir}/{selected_agent}.log" - print(f"Starting agent: {selected_agent} - output to {logfile_path}") - # Open logfile for writing - logfile = open(logfile_path, "w") - logfiles.append(logfile) - - # Launch command using Popen with output redirected to logfile - process = subprocess.Popen( - MAP_NAME_COMMAND[selected_agent], shell=True, stdout=logfile, stderr=logfile - ) - processes.append(process) - print("All agents launched. Waiting for tasks...") - - for selected_agent in selected_agents: - print( - f"You can monitor {selected_agent} logs at {log_dir}/{selected_agent}.log or chat on: {FRONTEND_URL}/agent/{selected_agent}" - ) + print( + "Agents are now managed in-process by RemoteConnections; external processes are no longer started." + ) # Launch backend logfile_path = f"{log_dir}/backend.log" diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index e60a4b231..3357914fe 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -2,13 +2,15 @@ import json import logging from dataclasses import dataclass +from importlib import import_module from pathlib import Path -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional, Type from a2a.types import AgentCard from valuecell.core.agent.card import parse_local_agent_card_dict from valuecell.core.agent.client import AgentClient +from valuecell.core.agent.decorator import create_wrapped_agent from valuecell.core.agent.listener import NotificationListener from valuecell.core.types import NotificationCallbackType from valuecell.utils import get_next_available_port @@ -16,6 +18,8 @@ logger = logging.getLogger(__name__) +AGENT_METADATA_CLASS_KEY = "local_agent_class" + @dataclass class AgentContext: """Unified context for remote agents. @@ -37,6 +41,40 @@ class AgentContext: desired_listener_host: Optional[str] = None desired_listener_port: Optional[int] = None notification_callback: Optional[NotificationCallbackType] = None + # Local in-process agent runtime + agent_instance: Optional[Any] = None + agent_task: Optional[asyncio.Task] = None + agent_instance_class: Optional[Type[Any]] = None + + +_LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {} + + +def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: + if not spec: + return None + + cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) + if cached is not None: + return cached + + try: + module_path, class_name = spec.split(":", 1) + module = import_module(module_path) + agent_cls = getattr(module, class_name) + except (ValueError, AttributeError, ImportError) as exc: + logger.error("Failed to import agent class '%s': %s", spec, exc) + return None + + _LOCAL_AGENT_CLASS_CACHE[spec] = agent_cls + return agent_cls + + +def _build_local_agent(ctx: AgentContext): + agent_cls = ctx.agent_instance_class + if agent_cls is None: + return None + return create_wrapped_agent(agent_cls) class RemoteConnections: @@ -93,12 +131,21 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: continue if not agent_card_dict.get("enabled", True): continue + metadata = ( + agent_card_dict.get("metadata") + if isinstance(agent_card_dict.get("metadata"), dict) + else {} + ) + class_spec = metadata.get(AGENT_METADATA_CLASS_KEY) + agent_instance_class = ( + _resolve_local_agent_class(class_spec) + if isinstance(class_spec, str) + else None + ) # Detect planner passthrough from raw JSON (top-level or metadata) passthrough = bool(agent_card_dict.get("planner_passthrough")) if not passthrough: - meta = agent_card_dict.get("metadata") or {} - if isinstance(meta, dict): - passthrough = bool(meta.get("planner_passthrough")) + passthrough = bool(metadata.get("planner_passthrough")) local_agent_card = parse_local_agent_card_dict(agent_card_dict) if not local_agent_card: continue @@ -107,7 +154,14 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: url=local_agent_card.url, local_agent_card=local_agent_card, planner_passthrough=passthrough, + agent_instance_class=agent_instance_class, ) + if class_spec and not agent_instance_class: + logger.warning( + "Unable to resolve local agent class '%s' for '%s'", + class_spec, + agent_name, + ) except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: logger.warning( f"Failed to load agent card from {json_file}; skipping: {e}" @@ -147,6 +201,8 @@ async def start_agent( ctx.desired_listener_port = listener_port ctx.notification_callback = notification_callback + await self._ensure_agent_runtime(ctx) + # If already connected, return card if ctx.client and ctx.client.agent_card: return ctx.client.agent_card @@ -193,7 +249,7 @@ async def _ensure_client(self, ctx: AgentContext) -> None: # Initialize a temporary client; only assign to context on success tmp_client = AgentClient(url, push_notification_url=ctx.listener_url) try: - await tmp_client.ensure_initialized() + await self._initialize_client(tmp_client, ctx) # Ensure agent card was resolved by the resolver if not getattr(tmp_client, "agent_card", None): raise RuntimeError("Agent card resolution returned None") @@ -211,6 +267,65 @@ async def _ensure_client(self, ctx: AgentContext) -> None: logger.error(f"Failed to initialize client for '{ctx.name}' at {url}: {e}") raise + async def _ensure_agent_runtime(self, ctx: AgentContext) -> None: + """Launch the agent locally if a factory is available.""" + # Existing running task: keep as is + if ctx.agent_task and not ctx.agent_task.done(): + return + + # Clean up finished tasks and propagate failures + if ctx.agent_task and ctx.agent_task.done(): + try: + ctx.agent_task.result() + except Exception as exc: + raise RuntimeError(f"Agent '{ctx.name}' failed during startup") from exc + finally: + ctx.agent_task = None + ctx.agent_instance = None + + if ctx.agent_instance is None: + agent_instance = _build_local_agent(ctx) + if agent_instance is None: + return + ctx.agent_instance = agent_instance + logger.info(f"Launching in-process agent '{ctx.name}'") + + if ctx.agent_task is None: + ctx.agent_task = asyncio.create_task(ctx.agent_instance.serve()) + # Give the event loop a chance to schedule startup work + await asyncio.sleep(0) + if ctx.agent_task.done(): + try: + ctx.agent_task.result() + except Exception as exc: + raise RuntimeError( + f"Agent '{ctx.name}' failed during startup" + ) from exc + finally: + ctx.agent_task = None + ctx.agent_instance = None + + async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> None: + """Initialize client with retry for local agents.""" + retries = 3 if ctx.agent_task else 1 + delay = 0.2 + for attempt in range(retries): + try: + await client.ensure_initialized() + return + except Exception as exc: + if attempt >= retries - 1: + raise + logger.debug( + "Retrying client initialization for '%s' (%s/%s): %s", + ctx.name, + attempt + 1, + retries, + exc, + ) + await asyncio.sleep(delay) + delay = min(delay * 2, 1.0) + async def _start_listener( self, host: str = "localhost", @@ -264,6 +379,28 @@ async def _cleanup_agent(self, agent_name: str): ctx = self._contexts.get(agent_name) if not ctx: return + agent_task = ctx.agent_task + if agent_task: + if ctx.agent_instance and hasattr(ctx.agent_instance, "shutdown"): + try: + await ctx.agent_instance.shutdown() + except Exception as exc: + logger.warning( + "Error shutting down agent '%s': %s", agent_name, exc + ) + try: + await asyncio.wait_for(agent_task, timeout=5) + except asyncio.TimeoutError: + agent_task.cancel() + try: + await agent_task + except asyncio.CancelledError: + pass + finally: + ctx.agent_task = None + ctx.agent_instance = None + elif ctx.agent_instance is not None: + ctx.agent_instance = None # Close client if ctx.client: await ctx.client.close() diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 037fdecbd..71c353ed2 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import Type @@ -58,6 +59,7 @@ def __init__(self, *args, **kwargs): agent_card.url, default_scheme="http" ) self._executor = None + self._server: uvicorn.Server | None = None async def serve(self): # Create AgentExecutor wrapper @@ -88,9 +90,19 @@ async def serve(self): port=self._port, log_level="info", ) - server = uvicorn.Server(config) + self._server = uvicorn.Server(config) logger.info(f"Starting {agent_name} server at {self.agent_card.url}") - await server.serve() + try: + await self._server.serve() + finally: + await client.aclose() + self._server = None + + async def shutdown(self) -> None: + if not self._server: + return + self._server.should_exit = True + await asyncio.sleep(0) # Preserve original class metadata DecoratedAgent.__name__ = cls.__name__ diff --git a/python/valuecell/core/agent/tests/test_decorator.py b/python/valuecell/core/agent/tests/test_decorator.py index 57544b90e..dead1ad01 100644 --- a/python/valuecell/core/agent/tests/test_decorator.py +++ b/python/valuecell/core/agent/tests/test_decorator.py @@ -390,7 +390,9 @@ def __init__(self): mock_app.return_value = mock_app_instance # Mock other dependencies - mock_client.return_value = MagicMock() + mock_httpx_client = MagicMock() + mock_httpx_client.aclose = AsyncMock() + mock_client.return_value = mock_httpx_client mock_config_store.return_value = MagicMock() mock_task_store.return_value = MagicMock() mock_sender.return_value = MagicMock() From 9be7329f37bd0486efe4160475ad80e101464839 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 17 Nov 2025 17:49:26 +0800 Subject: [PATCH 02/10] update tests --- .../core/agent/tests/test_connect.py | 71 +++++++++++++++++++ .../core/agent/tests/test_decorator.py | 39 ++++++++++ 2 files changed, 110 insertions(+) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 9e165f905..9da689c09 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -374,3 +374,74 @@ async def test_get_all_agent_cards_returns_local_cards(tmp_path: Path): assert set(all_cards.keys()) == {"CardOne", "CardTwo"} assert all(isinstance(card, AgentCard) for card in all_cards.values()) + + +def test_resolve_local_agent_class_from_metadata(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Prepare a card with metadata pointing to a fake spec + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = { + "name": "MetaAgent", + "url": "http://127.0.0.1:9001", + "enabled": True, + "metadata": {connect_mod.AGENT_METADATA_CLASS_KEY: "fake:Spec"}, + } + with open(dir_path / "MetaAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + # Monkeypatch resolver to return DummyAgent class for that spec + class DummyAgent: + pass + + monkeypatch.setattr( + connect_mod, + "_resolve_local_agent_class", + lambda spec: DummyAgent if spec == "fake:Spec" else None, + ) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts.get("MetaAgent") + assert ctx is not None + assert ctx.agent_instance_class is DummyAgent + + +@pytest.mark.asyncio +async def test_initialize_client_retries(): + rc = RemoteConnections() + + # create a context with agent_task truthy to trigger retries + ctx = connect_mod.AgentContext(name="RetryAgent") + ctx.agent_task = True + + class FlakyClient: + def __init__(self): + self.attempts = 0 + self.agent_card = None + + async def ensure_initialized(self): + self.attempts += 1 + # fail twice then succeed + if self.attempts < 3: + raise RuntimeError("temporary failure") + self.agent_card = AgentCard.model_validate( + { + "name": "X", + "url": "http://x/", + "description": "x", + "capabilities": {"streaming": True, "push_notifications": False}, + "default_input_modes": [], + "default_output_modes": [], + "version": "", + } + ) + + client = FlakyClient() + + # Call private initializer directly to exercise retry logic + await rc._initialize_client(client, ctx) + + assert client.attempts >= 3 + assert client.agent_card is not None diff --git a/python/valuecell/core/agent/tests/test_decorator.py b/python/valuecell/core/agent/tests/test_decorator.py index dead1ad01..527fcddf6 100644 --- a/python/valuecell/core/agent/tests/test_decorator.py +++ b/python/valuecell/core/agent/tests/test_decorator.py @@ -408,6 +408,45 @@ def __init__(self): with pytest.raises(asyncio.CancelledError): await instance.serve() + @pytest.mark.asyncio + async def test_shutdown_sets_should_exit(self): + """Test shutdown flips server should_exit flag.""" + mock_card = AgentCard( + name="TestAgent", + url="http://localhost:8000", + description="Test agent", + capabilities=AgentCapabilities(streaming=True, push_notifications=False), + default_input_modes=["text"], + default_output_modes=["text"], + version="1.0.0", + skills=[ + { + "id": "test_skill", + "name": "Test Skill", + "description": "A test skill", + "tags": ["test"], + } + ], + ) + + decorator = _serve(mock_card) + + @decorator + class TestAgent2: + def __init__(self): + self._host = "localhost" + self._port = 8000 + + instance = TestAgent2() + + # Simulate a running uvicorn.Server stored on the instance + server = MagicMock() + server.should_exit = False + instance._server = server + + await instance.shutdown() + assert instance._server.should_exit is True + class TestCreateWrappedAgent: """Test create_wrapped_agent function.""" From a11fffa0fdd24447750f35738ac54d5d28326a12 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 17 Nov 2025 17:49:38 +0800 Subject: [PATCH 03/10] make format --- python/valuecell/core/agent/connect.py | 1 + python/valuecell/core/agent/tests/test_connect.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 3357914fe..fc9e7b1b8 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -20,6 +20,7 @@ AGENT_METADATA_CLASS_KEY = "local_agent_class" + @dataclass class AgentContext: """Unified context for remote agents. diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 9da689c09..226e808ed 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -376,7 +376,9 @@ async def test_get_all_agent_cards_returns_local_cards(tmp_path: Path): assert all(isinstance(card, AgentCard) for card in all_cards.values()) -def test_resolve_local_agent_class_from_metadata(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): +def test_resolve_local_agent_class_from_metadata( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): # Prepare a card with metadata pointing to a fake spec dir_path = tmp_path / "agent_cards" dir_path.mkdir(parents=True) From 88cfc29a7871113bfc6b09d339a7063b44e20f24 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 17 Nov 2025 17:52:32 +0800 Subject: [PATCH 04/10] fix tests --- python/valuecell/core/agent/tests/test_connect.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 226e808ed..d5edf0acf 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -388,6 +388,7 @@ def test_resolve_local_agent_class_from_metadata( "url": "http://127.0.0.1:9001", "enabled": True, "metadata": {connect_mod.AGENT_METADATA_CLASS_KEY: "fake:Spec"}, + "skills": [], } with open(dir_path / "MetaAgent.json", "w", encoding="utf-8") as f: json.dump(card, f) @@ -437,6 +438,7 @@ async def ensure_initialized(self): "default_input_modes": [], "default_output_modes": [], "version": "", + "skills": [], } ) From d185e1274644153b1f3a2abcd1fd38312f99eed2 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 17 Nov 2025 17:59:04 +0800 Subject: [PATCH 05/10] improve coverage --- .../core/agent/tests/test_connect.py | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index d5edf0acf..14bbf8478 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -449,3 +449,167 @@ async def ensure_initialized(self): assert client.attempts >= 3 assert client.agent_card is not None + + +def test_resolve_local_agent_class_empty_spec_returns_none(): + assert connect_mod._resolve_local_agent_class("") is None + + +def test_resolve_local_agent_class_cache_hit(): + spec = "cached:Spec" + sentinel = object() + connect_mod._LOCAL_AGENT_CLASS_CACHE[spec] = sentinel + try: + assert connect_mod._resolve_local_agent_class(spec) is sentinel + finally: + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + +def test_resolve_local_agent_class_invalid_spec(caplog: pytest.LogCaptureFixture): + caplog.set_level("ERROR") + spec = "valuecell.nonexistent:Missing" + result = connect_mod._resolve_local_agent_class(spec) + assert result is None + assert any("Failed to import agent class" in r.message for r in caplog.records) + + +def test_build_local_agent_returns_none_when_no_class(): + ctx = connect_mod.AgentContext(name="NoClass") + ctx.agent_instance_class = None + assert connect_mod._build_local_agent(ctx) is None + + +def test_build_local_agent_invokes_factory(monkeypatch: pytest.MonkeyPatch): + ctx = connect_mod.AgentContext(name="WithClass") + sentinel = object() + + class DummyAgent: + pass + + ctx.agent_instance_class = DummyAgent + monkeypatch.setattr( + connect_mod, + "create_wrapped_agent", + lambda cls: sentinel if cls is DummyAgent else None, + ) + + assert connect_mod._build_local_agent(ctx) is sentinel + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_returns_when_task_running(): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="RunningAgent") + + async def never(): + await asyncio.Event().wait() + + task = asyncio.create_task(never()) + ctx.agent_task = task + + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_task is task + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_finished_task_failure(): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="FailedAgent") + fut = asyncio.Future() + fut.set_exception(RuntimeError("boom")) + ctx.agent_task = fut + + with pytest.raises(RuntimeError, match="FailedAgent"): + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_task is None + assert ctx.agent_instance is None + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_no_factory(monkeypatch: pytest.MonkeyPatch): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="NoFactory") + monkeypatch.setattr(connect_mod, "_build_local_agent", lambda _: None) + + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_instance is None + assert ctx.agent_task is None + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_new_task_failure(monkeypatch: pytest.MonkeyPatch): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="FailingAgent") + + class FailingAgent: + async def serve(self): + raise RuntimeError("serve failed") + + monkeypatch.setattr(connect_mod, "_build_local_agent", lambda _: FailingAgent()) + + with pytest.raises(RuntimeError, match="FailingAgent"): + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_task is None + assert ctx.agent_instance is None + + +@pytest.mark.asyncio +async def test_cleanup_agent_handles_timeout(monkeypatch: pytest.MonkeyPatch): + rc = RemoteConnections() + agent_name = "TimeoutAgent" + ctx = connect_mod.AgentContext(name=agent_name) + + shutdown_called = False + + class DummyInstance: + async def shutdown(self): + nonlocal shutdown_called + shutdown_called = True + + async def never(): + await asyncio.Event().wait() + + task = asyncio.create_task(never()) + ctx.agent_task = task + ctx.agent_instance = DummyInstance() + + async def fake_wait_for(task_obj, timeout): + raise asyncio.TimeoutError + + monkeypatch.setattr(connect_mod.asyncio, "wait_for", fake_wait_for) + rc._contexts[agent_name] = ctx + + await rc._cleanup_agent(agent_name) + + assert shutdown_called + assert ctx.agent_task is None + assert ctx.agent_instance is None + assert task.cancelled() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_cleanup_agent_clears_idle_resources(): + rc = RemoteConnections() + agent_name = "IdleAgent" + ctx = connect_mod.AgentContext(name=agent_name) + ctx.agent_instance = object() + listener = asyncio.create_task(asyncio.sleep(0)) + ctx.listener_task = listener + ctx.listener_url = "http://localhost:9999" + rc._contexts[agent_name] = ctx + + await rc._cleanup_agent(agent_name) + + assert ctx.agent_instance is None + assert ctx.listener_task is None + assert ctx.listener_url is None + assert listener.cancelled() or listener.done() From af157a024ab73dc139b8197a8bea58b62c74ae43 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 17 Nov 2025 18:53:22 +0800 Subject: [PATCH 06/10] refactor: enhance local agent class resolution with asyncio --- python/valuecell/core/agent/connect.py | 34 +++++++++++++++++--------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index fc9e7b1b8..3f9baa815 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -46,6 +46,7 @@ class AgentContext: agent_instance: Optional[Any] = None agent_task: Optional[asyncio.Task] = None agent_instance_class: Optional[Type[Any]] = None + agent_class_spec: Optional[str] = None _LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {} @@ -71,8 +72,20 @@ def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: return agent_cls -def _build_local_agent(ctx: AgentContext): +async def _build_local_agent(ctx: AgentContext): agent_cls = ctx.agent_instance_class + if agent_cls is None and ctx.agent_class_spec: + agent_cls = await asyncio.to_thread( + _resolve_local_agent_class, ctx.agent_class_spec + ) + ctx.agent_instance_class = agent_cls + if agent_cls is None: + logger.warning( + "Unable to resolve local agent class '%s' for '%s'", + ctx.agent_class_spec, + ctx.name, + ) + return None if agent_cls is None: return None return create_wrapped_agent(agent_cls) @@ -137,12 +150,12 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: if isinstance(agent_card_dict.get("metadata"), dict) else {} ) - class_spec = metadata.get(AGENT_METADATA_CLASS_KEY) - agent_instance_class = ( - _resolve_local_agent_class(class_spec) - if isinstance(class_spec, str) + class_spec = ( + metadata.get(AGENT_METADATA_CLASS_KEY) + if isinstance(metadata, dict) else None ) + agent_instance_class = None # Detect planner passthrough from raw JSON (top-level or metadata) passthrough = bool(agent_card_dict.get("planner_passthrough")) if not passthrough: @@ -156,13 +169,10 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: local_agent_card=local_agent_card, planner_passthrough=passthrough, agent_instance_class=agent_instance_class, + agent_class_spec=class_spec + if isinstance(class_spec, str) + else None, ) - if class_spec and not agent_instance_class: - logger.warning( - "Unable to resolve local agent class '%s' for '%s'", - class_spec, - agent_name, - ) except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: logger.warning( f"Failed to load agent card from {json_file}; skipping: {e}" @@ -285,7 +295,7 @@ async def _ensure_agent_runtime(self, ctx: AgentContext) -> None: ctx.agent_instance = None if ctx.agent_instance is None: - agent_instance = _build_local_agent(ctx) + agent_instance = await _build_local_agent(ctx) if agent_instance is None: return ctx.agent_instance = agent_instance From 2cd6b68bfe1d0cea430d9cea63c96c6c2bf663f1 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 17 Nov 2025 18:57:24 +0800 Subject: [PATCH 07/10] fix tests --- .../core/agent/tests/test_connect.py | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 14bbf8478..2283ae67c 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -376,7 +376,8 @@ async def test_get_all_agent_cards_returns_local_cards(tmp_path: Path): assert all(isinstance(card, AgentCard) for card in all_cards.values()) -def test_resolve_local_agent_class_from_metadata( +@pytest.mark.asyncio +async def test_resolve_local_agent_class_from_metadata( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ): # Prepare a card with metadata pointing to a fake spec @@ -408,6 +409,15 @@ class DummyAgent: ctx = rc._contexts.get("MetaAgent") assert ctx is not None + assert ctx.agent_instance_class is None + assert ctx.agent_class_spec == "fake:Spec" + + sentinel = object() + monkeypatch.setattr(connect_mod, "create_wrapped_agent", lambda cls: sentinel) + + result = await connect_mod._build_local_agent(ctx) + + assert result is sentinel assert ctx.agent_instance_class is DummyAgent @@ -473,13 +483,15 @@ def test_resolve_local_agent_class_invalid_spec(caplog: pytest.LogCaptureFixture assert any("Failed to import agent class" in r.message for r in caplog.records) -def test_build_local_agent_returns_none_when_no_class(): +@pytest.mark.asyncio +async def test_build_local_agent_returns_none_when_no_class(): ctx = connect_mod.AgentContext(name="NoClass") ctx.agent_instance_class = None - assert connect_mod._build_local_agent(ctx) is None + assert await connect_mod._build_local_agent(ctx) is None -def test_build_local_agent_invokes_factory(monkeypatch: pytest.MonkeyPatch): +@pytest.mark.asyncio +async def test_build_local_agent_invokes_factory(monkeypatch: pytest.MonkeyPatch): ctx = connect_mod.AgentContext(name="WithClass") sentinel = object() @@ -493,7 +505,7 @@ class DummyAgent: lambda cls: sentinel if cls is DummyAgent else None, ) - assert connect_mod._build_local_agent(ctx) is sentinel + assert await connect_mod._build_local_agent(ctx) is sentinel @pytest.mark.asyncio @@ -534,7 +546,11 @@ async def test_ensure_agent_runtime_finished_task_failure(): async def test_ensure_agent_runtime_no_factory(monkeypatch: pytest.MonkeyPatch): rc = RemoteConnections() ctx = connect_mod.AgentContext(name="NoFactory") - monkeypatch.setattr(connect_mod, "_build_local_agent", lambda _: None) + + async def _noop(_): + return None + + monkeypatch.setattr(connect_mod, "_build_local_agent", _noop) await rc._ensure_agent_runtime(ctx) @@ -551,7 +567,10 @@ class FailingAgent: async def serve(self): raise RuntimeError("serve failed") - monkeypatch.setattr(connect_mod, "_build_local_agent", lambda _: FailingAgent()) + async def _factory(_): + return FailingAgent() + + monkeypatch.setattr(connect_mod, "_build_local_agent", _factory) with pytest.raises(RuntimeError, match="FailingAgent"): await rc._ensure_agent_runtime(ctx) From 54b93c7d3194883a020deeba9df10fae47e8dbe5 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 18 Nov 2025 09:47:16 +0800 Subject: [PATCH 08/10] refactor: replace logging string formatting --- python/valuecell/core/agent/connect.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 3f9baa815..b0611e355 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -1,12 +1,12 @@ import asyncio import json -import logging from dataclasses import dataclass from importlib import import_module from pathlib import Path from typing import Any, Dict, List, Optional, Type from a2a.types import AgentCard +from loguru import logger from valuecell.core.agent.card import parse_local_agent_card_dict from valuecell.core.agent.client import AgentClient @@ -15,9 +15,6 @@ from valuecell.core.types import NotificationCallbackType from valuecell.utils import get_next_available_port -logger = logging.getLogger(__name__) - - AGENT_METADATA_CLASS_KEY = "local_agent_class" @@ -65,7 +62,7 @@ def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: module = import_module(module_path) agent_cls = getattr(module, class_name) except (ValueError, AttributeError, ImportError) as exc: - logger.error("Failed to import agent class '%s': %s", spec, exc) + logger.error("Failed to import agent class '{}': {}", spec, exc) return None _LOCAL_AGENT_CLASS_CACHE[spec] = agent_cls @@ -81,7 +78,7 @@ async def _build_local_agent(ctx: AgentContext): ctx.agent_instance_class = agent_cls if agent_cls is None: logger.warning( - "Unable to resolve local agent class '%s' for '%s'", + "Unable to resolve local agent class '{}' for '{}'", ctx.agent_class_spec, ctx.name, ) @@ -169,9 +166,9 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: local_agent_card=local_agent_card, planner_passthrough=passthrough, agent_instance_class=agent_instance_class, - agent_class_spec=class_spec - if isinstance(class_spec, str) - else None, + agent_class_spec=( + class_spec if isinstance(class_spec, str) else None + ), ) except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: logger.warning( @@ -328,7 +325,7 @@ async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> No if attempt >= retries - 1: raise logger.debug( - "Retrying client initialization for '%s' (%s/%s): %s", + "Retrying client initialization for '{}' ({}/{}}): {}", ctx.name, attempt + 1, retries, @@ -397,7 +394,7 @@ async def _cleanup_agent(self, agent_name: str): await ctx.agent_instance.shutdown() except Exception as exc: logger.warning( - "Error shutting down agent '%s': %s", agent_name, exc + "Error shutting down agent '{}': {}", agent_name, exc ) try: await asyncio.wait_for(agent_task, timeout=5) From b7693a17c6810c9cf9a8c5f8bff1f67f056373ac Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 18 Nov 2025 10:13:31 +0800 Subject: [PATCH 09/10] add docs --- python/valuecell/core/agent/connect.py | 45 +++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index b0611e355..11676840a 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -12,7 +12,7 @@ from valuecell.core.agent.client import AgentClient from valuecell.core.agent.decorator import create_wrapped_agent from valuecell.core.agent.listener import NotificationListener -from valuecell.core.types import NotificationCallbackType +from valuecell.core.types import NotificationCallbackType, BaseAgent from valuecell.utils import get_next_available_port AGENT_METADATA_CLASS_KEY = "local_agent_class" @@ -40,16 +40,32 @@ class AgentContext: desired_listener_port: Optional[int] = None notification_callback: Optional[NotificationCallbackType] = None # Local in-process agent runtime - agent_instance: Optional[Any] = None - agent_task: Optional[asyncio.Task] = None - agent_instance_class: Optional[Type[Any]] = None + # - `agent_class_spec`: original "module:Class" spec loaded from JSON + # We keep the spec so class resolution can be deferred (and performed + # off the event loop) when the agent is actually started. + # - `agent_instance`: concrete wrapped agent instance (created lazily) + # - `agent_instance_class`: resolved Python class for the agent, if imported + # - `agent_task`: asyncio.Task running the agent's HTTP server (if launched) agent_class_spec: Optional[str] = None + agent_instance: Optional[BaseAgent] = None + agent_instance_class: Optional[Type[BaseAgent]] = None + agent_task: Optional[asyncio.Task] = None _LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {} def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: + """Resolve a `module:Class` spec to a Python class. + + This function is synchronous and performs a normal import. Callers that + need to avoid blocking the event loop should invoke this via + `asyncio.to_thread(_resolve_local_agent_class, spec)`. + + Results are cached in `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated + imports/attribute lookups. + """ + if not spec: return None @@ -70,8 +86,23 @@ def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: async def _build_local_agent(ctx: AgentContext): + """Asynchronously produce a wrapped local agent instance for the + given `AgentContext`. + + Behavior: + - If `agent_instance_class` is already present, use it. + - Otherwise, if `agent_class_spec` is provided, resolve it off the + event loop (`asyncio.to_thread`) so imports don't block the loop. + - If resolution fails, log a warning and return `None` (caller will + treat missing factory as "no local agent available"). + - The actual wrapping call (`create_wrapped_agent`) is performed on + the event loop; this preserves any asyncio-related initialization + semantics required by the wrapper (if it needs loop context). + """ + agent_cls = ctx.agent_instance_class if agent_cls is None and ctx.agent_class_spec: + # Resolve the import in a worker thread to avoid blocking the loop. agent_cls = await asyncio.to_thread( _resolve_local_agent_class, ctx.agent_class_spec ) @@ -83,8 +114,14 @@ async def _build_local_agent(ctx: AgentContext): ctx.name, ) return None + if agent_cls is None: + # No factory available for this context return None + + # `create_wrapped_agent` can perform setup that expects to run in the + # main thread / event loop context (e.g. uvicorn/async setup). Keep it + # synchronous here so any asyncio primitives are created correctly. return create_wrapped_agent(agent_cls) From e9c4b0c9ddafa3f5c18591c01697ac913112837f Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 18 Nov 2025 10:20:42 +0800 Subject: [PATCH 10/10] fix tests --- python/valuecell/core/agent/connect.py | 4 ++-- python/valuecell/core/agent/tests/test_connect.py | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 11676840a..ddc8ed0e8 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -12,7 +12,7 @@ from valuecell.core.agent.client import AgentClient from valuecell.core.agent.decorator import create_wrapped_agent from valuecell.core.agent.listener import NotificationListener -from valuecell.core.types import NotificationCallbackType, BaseAgent +from valuecell.core.types import BaseAgent, NotificationCallbackType from valuecell.utils import get_next_available_port AGENT_METADATA_CLASS_KEY = "local_agent_class" @@ -362,7 +362,7 @@ async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> No if attempt >= retries - 1: raise logger.debug( - "Retrying client initialization for '{}' ({}/{}}): {}", + "Retrying client initialization for '{}' ({}/{}): {}", ctx.name, attempt + 1, retries, diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 2283ae67c..f4b1852bb 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -475,12 +475,10 @@ def test_resolve_local_agent_class_cache_hit(): connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) -def test_resolve_local_agent_class_invalid_spec(caplog: pytest.LogCaptureFixture): - caplog.set_level("ERROR") +def test_resolve_local_agent_class_invalid_spec(): spec = "valuecell.nonexistent:Missing" result = connect_mod._resolve_local_agent_class(spec) assert result is None - assert any("Failed to import agent class" in r.message for r in caplog.records) @pytest.mark.asyncio