diff --git a/python/configs/agent_cards/investment_research_agent.json b/python/configs/agent_cards/investment_research_agent.json index cdcd0fc5e..d8c91ce12 100644 --- a/python/configs/agent_cards/investment_research_agent.json +++ b/python/configs/agent_cards/investment_research_agent.json @@ -52,6 +52,7 @@ "tags": [ "sec filings", "fundamental analysis" - ] + ], + "local_agent_class": "valuecell.agents.research_agent.core:ResearchAgent" } } \ No newline at end of file diff --git a/python/configs/agent_cards/news_agent.json b/python/configs/agent_cards/news_agent.json index 2f70c77d0..fa6c4a0ce 100644 --- a/python/configs/agent_cards/news_agent.json +++ b/python/configs/agent_cards/news_agent.json @@ -50,6 +50,7 @@ "current events", "financial markets", "real-time search" - ] + ], + "local_agent_class": "valuecell.agents.news_agent.core:NewsAgent" } } \ No newline at end of file diff --git a/python/configs/agent_cards/strategy_agent.json b/python/configs/agent_cards/strategy_agent.json index b3fe27e63..4256f8b29 100644 --- a/python/configs/agent_cards/strategy_agent.json +++ b/python/configs/agent_cards/strategy_agent.json @@ -25,6 +25,7 @@ "version": "0.1.0", "author": "ValueCell Team", "tags": ["strategy", "trading", "llm", "demo"], - "notes": "This card is a lightweight example; replace model api_key and tune parameters for production use." + "notes": "This card is a lightweight example; replace model api_key and tune parameters for production use.", + "local_agent_class": "valuecell.agents.strategy_agent.agent:StrategyAgent" } } diff --git a/python/scripts/launch.py b/python/scripts/launch.py index 80fc3d7d5..b3634404e 100644 --- a/python/scripts/launch.py +++ b/python/scripts/launch.py @@ -125,25 +125,10 @@ def main(): processes = [] logfiles = [] - for selected_agent in selected_agents: - logfile_path = f"{log_dir}/{selected_agent}.log" - print(f"Starting agent: {selected_agent} - output to {logfile_path}") - # Open logfile for writing - logfile = open(logfile_path, "w") - logfiles.append(logfile) - - # Launch command using Popen with output redirected to logfile - process = subprocess.Popen( - MAP_NAME_COMMAND[selected_agent], shell=True, stdout=logfile, stderr=logfile - ) - processes.append(process) - print("All agents launched. Waiting for tasks...") - - for selected_agent in selected_agents: - print( - f"You can monitor {selected_agent} logs at {log_dir}/{selected_agent}.log or chat on: {FRONTEND_URL}/agent/{selected_agent}" - ) + print( + "Agents are now managed in-process by RemoteConnections; external processes are no longer started." + ) # Launch backend logfile_path = f"{log_dir}/backend.log" diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index e60a4b231..ddc8ed0e8 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -1,19 +1,21 @@ import asyncio import json -import logging from dataclasses import dataclass +from importlib import import_module from pathlib import Path -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional, Type from a2a.types import AgentCard +from loguru import logger from valuecell.core.agent.card import parse_local_agent_card_dict from valuecell.core.agent.client import AgentClient +from valuecell.core.agent.decorator import create_wrapped_agent from valuecell.core.agent.listener import NotificationListener -from valuecell.core.types import NotificationCallbackType +from valuecell.core.types import BaseAgent, NotificationCallbackType from valuecell.utils import get_next_available_port -logger = logging.getLogger(__name__) +AGENT_METADATA_CLASS_KEY = "local_agent_class" @dataclass @@ -37,6 +39,90 @@ class AgentContext: desired_listener_host: Optional[str] = None desired_listener_port: Optional[int] = None notification_callback: Optional[NotificationCallbackType] = None + # Local in-process agent runtime + # - `agent_class_spec`: original "module:Class" spec loaded from JSON + # We keep the spec so class resolution can be deferred (and performed + # off the event loop) when the agent is actually started. + # - `agent_instance`: concrete wrapped agent instance (created lazily) + # - `agent_instance_class`: resolved Python class for the agent, if imported + # - `agent_task`: asyncio.Task running the agent's HTTP server (if launched) + agent_class_spec: Optional[str] = None + agent_instance: Optional[BaseAgent] = None + agent_instance_class: Optional[Type[BaseAgent]] = None + agent_task: Optional[asyncio.Task] = None + + +_LOCAL_AGENT_CLASS_CACHE: Dict[str, Type[Any]] = {} + + +def _resolve_local_agent_class(spec: str) -> Optional[Type[Any]]: + """Resolve a `module:Class` spec to a Python class. + + This function is synchronous and performs a normal import. Callers that + need to avoid blocking the event loop should invoke this via + `asyncio.to_thread(_resolve_local_agent_class, spec)`. + + Results are cached in `_LOCAL_AGENT_CLASS_CACHE` to avoid repeated + imports/attribute lookups. + """ + + if not spec: + return None + + cached = _LOCAL_AGENT_CLASS_CACHE.get(spec) + if cached is not None: + return cached + + try: + module_path, class_name = spec.split(":", 1) + module = import_module(module_path) + agent_cls = getattr(module, class_name) + except (ValueError, AttributeError, ImportError) as exc: + logger.error("Failed to import agent class '{}': {}", spec, exc) + return None + + _LOCAL_AGENT_CLASS_CACHE[spec] = agent_cls + return agent_cls + + +async def _build_local_agent(ctx: AgentContext): + """Asynchronously produce a wrapped local agent instance for the + given `AgentContext`. + + Behavior: + - If `agent_instance_class` is already present, use it. + - Otherwise, if `agent_class_spec` is provided, resolve it off the + event loop (`asyncio.to_thread`) so imports don't block the loop. + - If resolution fails, log a warning and return `None` (caller will + treat missing factory as "no local agent available"). + - The actual wrapping call (`create_wrapped_agent`) is performed on + the event loop; this preserves any asyncio-related initialization + semantics required by the wrapper (if it needs loop context). + """ + + agent_cls = ctx.agent_instance_class + if agent_cls is None and ctx.agent_class_spec: + # Resolve the import in a worker thread to avoid blocking the loop. + agent_cls = await asyncio.to_thread( + _resolve_local_agent_class, ctx.agent_class_spec + ) + ctx.agent_instance_class = agent_cls + if agent_cls is None: + logger.warning( + "Unable to resolve local agent class '{}' for '{}'", + ctx.agent_class_spec, + ctx.name, + ) + return None + + if agent_cls is None: + # No factory available for this context + return None + + # `create_wrapped_agent` can perform setup that expects to run in the + # main thread / event loop context (e.g. uvicorn/async setup). Keep it + # synchronous here so any asyncio primitives are created correctly. + return create_wrapped_agent(agent_cls) class RemoteConnections: @@ -93,12 +179,21 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: continue if not agent_card_dict.get("enabled", True): continue + metadata = ( + agent_card_dict.get("metadata") + if isinstance(agent_card_dict.get("metadata"), dict) + else {} + ) + class_spec = ( + metadata.get(AGENT_METADATA_CLASS_KEY) + if isinstance(metadata, dict) + else None + ) + agent_instance_class = None # Detect planner passthrough from raw JSON (top-level or metadata) passthrough = bool(agent_card_dict.get("planner_passthrough")) if not passthrough: - meta = agent_card_dict.get("metadata") or {} - if isinstance(meta, dict): - passthrough = bool(meta.get("planner_passthrough")) + passthrough = bool(metadata.get("planner_passthrough")) local_agent_card = parse_local_agent_card_dict(agent_card_dict) if not local_agent_card: continue @@ -107,6 +202,10 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: url=local_agent_card.url, local_agent_card=local_agent_card, planner_passthrough=passthrough, + agent_instance_class=agent_instance_class, + agent_class_spec=( + class_spec if isinstance(class_spec, str) else None + ), ) except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: logger.warning( @@ -147,6 +246,8 @@ async def start_agent( ctx.desired_listener_port = listener_port ctx.notification_callback = notification_callback + await self._ensure_agent_runtime(ctx) + # If already connected, return card if ctx.client and ctx.client.agent_card: return ctx.client.agent_card @@ -193,7 +294,7 @@ async def _ensure_client(self, ctx: AgentContext) -> None: # Initialize a temporary client; only assign to context on success tmp_client = AgentClient(url, push_notification_url=ctx.listener_url) try: - await tmp_client.ensure_initialized() + await self._initialize_client(tmp_client, ctx) # Ensure agent card was resolved by the resolver if not getattr(tmp_client, "agent_card", None): raise RuntimeError("Agent card resolution returned None") @@ -211,6 +312,65 @@ async def _ensure_client(self, ctx: AgentContext) -> None: logger.error(f"Failed to initialize client for '{ctx.name}' at {url}: {e}") raise + async def _ensure_agent_runtime(self, ctx: AgentContext) -> None: + """Launch the agent locally if a factory is available.""" + # Existing running task: keep as is + if ctx.agent_task and not ctx.agent_task.done(): + return + + # Clean up finished tasks and propagate failures + if ctx.agent_task and ctx.agent_task.done(): + try: + ctx.agent_task.result() + except Exception as exc: + raise RuntimeError(f"Agent '{ctx.name}' failed during startup") from exc + finally: + ctx.agent_task = None + ctx.agent_instance = None + + if ctx.agent_instance is None: + agent_instance = await _build_local_agent(ctx) + if agent_instance is None: + return + ctx.agent_instance = agent_instance + logger.info(f"Launching in-process agent '{ctx.name}'") + + if ctx.agent_task is None: + ctx.agent_task = asyncio.create_task(ctx.agent_instance.serve()) + # Give the event loop a chance to schedule startup work + await asyncio.sleep(0) + if ctx.agent_task.done(): + try: + ctx.agent_task.result() + except Exception as exc: + raise RuntimeError( + f"Agent '{ctx.name}' failed during startup" + ) from exc + finally: + ctx.agent_task = None + ctx.agent_instance = None + + async def _initialize_client(self, client: AgentClient, ctx: AgentContext) -> None: + """Initialize client with retry for local agents.""" + retries = 3 if ctx.agent_task else 1 + delay = 0.2 + for attempt in range(retries): + try: + await client.ensure_initialized() + return + except Exception as exc: + if attempt >= retries - 1: + raise + logger.debug( + "Retrying client initialization for '{}' ({}/{}): {}", + ctx.name, + attempt + 1, + retries, + exc, + ) + await asyncio.sleep(delay) + delay = min(delay * 2, 1.0) + async def _start_listener( self, host: str = "localhost", @@ -264,6 +424,28 @@ async def _cleanup_agent(self, agent_name: str): ctx = self._contexts.get(agent_name) if not ctx: return + agent_task = ctx.agent_task + if agent_task: + if ctx.agent_instance and hasattr(ctx.agent_instance, "shutdown"): + try: + await ctx.agent_instance.shutdown() + except Exception as exc: + logger.warning( + "Error shutting down agent '{}': {}", agent_name, exc + ) + try: + await asyncio.wait_for(agent_task, timeout=5) + except asyncio.TimeoutError: + agent_task.cancel() + try: + await agent_task + except asyncio.CancelledError: + pass + finally: + ctx.agent_task = None + ctx.agent_instance = None + elif ctx.agent_instance is not None: + ctx.agent_instance = None # Close client if ctx.client: await ctx.client.close() diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 037fdecbd..71c353ed2 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import Type @@ -58,6 +59,7 @@ def __init__(self, *args, **kwargs): agent_card.url, default_scheme="http" ) self._executor = None + self._server: uvicorn.Server | None = None async def serve(self): # Create AgentExecutor wrapper @@ -88,9 +90,19 @@ async def serve(self): port=self._port, log_level="info", ) - server = uvicorn.Server(config) + self._server = uvicorn.Server(config) logger.info(f"Starting {agent_name} server at {self.agent_card.url}") - await server.serve() + try: + await self._server.serve() + finally: + await client.aclose() + self._server = None + + async def shutdown(self) -> None: + if not self._server: + return + self._server.should_exit = True + await asyncio.sleep(0) # Preserve original class metadata DecoratedAgent.__name__ = cls.__name__ diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 9e165f905..f4b1852bb 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -374,3 +374,259 @@ async def test_get_all_agent_cards_returns_local_cards(tmp_path: Path): assert set(all_cards.keys()) == {"CardOne", "CardTwo"} assert all(isinstance(card, AgentCard) for card in all_cards.values()) + + +@pytest.mark.asyncio +async def test_resolve_local_agent_class_from_metadata( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + # Prepare a card with metadata pointing to a fake spec + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + card = { + "name": "MetaAgent", + "url": "http://127.0.0.1:9001", + "enabled": True, + "metadata": {connect_mod.AGENT_METADATA_CLASS_KEY: "fake:Spec"}, + "skills": [], + } + with open(dir_path / "MetaAgent.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + # Monkeypatch resolver to return DummyAgent class for that spec + class DummyAgent: + pass + + monkeypatch.setattr( + connect_mod, + "_resolve_local_agent_class", + lambda spec: DummyAgent if spec == "fake:Spec" else None, + ) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + ctx = rc._contexts.get("MetaAgent") + assert ctx is not None + assert ctx.agent_instance_class is None + assert ctx.agent_class_spec == "fake:Spec" + + sentinel = object() + monkeypatch.setattr(connect_mod, "create_wrapped_agent", lambda cls: sentinel) + + result = await connect_mod._build_local_agent(ctx) + + assert result is sentinel + assert ctx.agent_instance_class is DummyAgent + + +@pytest.mark.asyncio +async def test_initialize_client_retries(): + rc = RemoteConnections() + + # create a context with agent_task truthy to trigger retries + ctx = connect_mod.AgentContext(name="RetryAgent") + ctx.agent_task = True + + class FlakyClient: + def __init__(self): + self.attempts = 0 + self.agent_card = None + + async def ensure_initialized(self): + self.attempts += 1 + # fail twice then succeed + if self.attempts < 3: + raise RuntimeError("temporary failure") + self.agent_card = AgentCard.model_validate( + { + "name": "X", + "url": "http://x/", + "description": "x", + "capabilities": {"streaming": True, "push_notifications": False}, + "default_input_modes": [], + "default_output_modes": [], + "version": "", + "skills": [], + } + ) + + client = FlakyClient() + + # Call private initializer directly to exercise retry logic + await rc._initialize_client(client, ctx) + + assert client.attempts >= 3 + assert client.agent_card is not None + + +def test_resolve_local_agent_class_empty_spec_returns_none(): + assert connect_mod._resolve_local_agent_class("") is None + + +def test_resolve_local_agent_class_cache_hit(): + spec = "cached:Spec" + sentinel = object() + connect_mod._LOCAL_AGENT_CLASS_CACHE[spec] = sentinel + try: + assert connect_mod._resolve_local_agent_class(spec) is sentinel + finally: + connect_mod._LOCAL_AGENT_CLASS_CACHE.pop(spec, None) + + +def test_resolve_local_agent_class_invalid_spec(): + spec = "valuecell.nonexistent:Missing" + result = connect_mod._resolve_local_agent_class(spec) + assert result is None + + +@pytest.mark.asyncio +async def test_build_local_agent_returns_none_when_no_class(): + ctx = connect_mod.AgentContext(name="NoClass") + ctx.agent_instance_class = None + assert await connect_mod._build_local_agent(ctx) is None + + +@pytest.mark.asyncio +async def test_build_local_agent_invokes_factory(monkeypatch: pytest.MonkeyPatch): + ctx = connect_mod.AgentContext(name="WithClass") + sentinel = object() + + class DummyAgent: + pass + + ctx.agent_instance_class = DummyAgent + monkeypatch.setattr( + connect_mod, + "create_wrapped_agent", + lambda cls: sentinel if cls is DummyAgent else None, + ) + + assert await connect_mod._build_local_agent(ctx) is sentinel + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_returns_when_task_running(): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="RunningAgent") + + async def never(): + await asyncio.Event().wait() + + task = asyncio.create_task(never()) + ctx.agent_task = task + + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_task is task + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_finished_task_failure(): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="FailedAgent") + fut = asyncio.Future() + fut.set_exception(RuntimeError("boom")) + ctx.agent_task = fut + + with pytest.raises(RuntimeError, match="FailedAgent"): + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_task is None + assert ctx.agent_instance is None + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_no_factory(monkeypatch: pytest.MonkeyPatch): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="NoFactory") + + async def _noop(_): + return None + + monkeypatch.setattr(connect_mod, "_build_local_agent", _noop) + + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_instance is None + assert ctx.agent_task is None + + +@pytest.mark.asyncio +async def test_ensure_agent_runtime_new_task_failure(monkeypatch: pytest.MonkeyPatch): + rc = RemoteConnections() + ctx = connect_mod.AgentContext(name="FailingAgent") + + class FailingAgent: + async def serve(self): + raise RuntimeError("serve failed") + + async def _factory(_): + return FailingAgent() + + monkeypatch.setattr(connect_mod, "_build_local_agent", _factory) + + with pytest.raises(RuntimeError, match="FailingAgent"): + await rc._ensure_agent_runtime(ctx) + + assert ctx.agent_task is None + assert ctx.agent_instance is None + + +@pytest.mark.asyncio +async def test_cleanup_agent_handles_timeout(monkeypatch: pytest.MonkeyPatch): + rc = RemoteConnections() + agent_name = "TimeoutAgent" + ctx = connect_mod.AgentContext(name=agent_name) + + shutdown_called = False + + class DummyInstance: + async def shutdown(self): + nonlocal shutdown_called + shutdown_called = True + + async def never(): + await asyncio.Event().wait() + + task = asyncio.create_task(never()) + ctx.agent_task = task + ctx.agent_instance = DummyInstance() + + async def fake_wait_for(task_obj, timeout): + raise asyncio.TimeoutError + + monkeypatch.setattr(connect_mod.asyncio, "wait_for", fake_wait_for) + rc._contexts[agent_name] = ctx + + await rc._cleanup_agent(agent_name) + + assert shutdown_called + assert ctx.agent_task is None + assert ctx.agent_instance is None + assert task.cancelled() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_cleanup_agent_clears_idle_resources(): + rc = RemoteConnections() + agent_name = "IdleAgent" + ctx = connect_mod.AgentContext(name=agent_name) + ctx.agent_instance = object() + listener = asyncio.create_task(asyncio.sleep(0)) + ctx.listener_task = listener + ctx.listener_url = "http://localhost:9999" + rc._contexts[agent_name] = ctx + + await rc._cleanup_agent(agent_name) + + assert ctx.agent_instance is None + assert ctx.listener_task is None + assert ctx.listener_url is None + assert listener.cancelled() or listener.done() diff --git a/python/valuecell/core/agent/tests/test_decorator.py b/python/valuecell/core/agent/tests/test_decorator.py index 57544b90e..527fcddf6 100644 --- a/python/valuecell/core/agent/tests/test_decorator.py +++ b/python/valuecell/core/agent/tests/test_decorator.py @@ -390,7 +390,9 @@ def __init__(self): mock_app.return_value = mock_app_instance # Mock other dependencies - mock_client.return_value = MagicMock() + mock_httpx_client = MagicMock() + mock_httpx_client.aclose = AsyncMock() + mock_client.return_value = mock_httpx_client mock_config_store.return_value = MagicMock() mock_task_store.return_value = MagicMock() mock_sender.return_value = MagicMock() @@ -406,6 +408,45 @@ def __init__(self): with pytest.raises(asyncio.CancelledError): await instance.serve() + @pytest.mark.asyncio + async def test_shutdown_sets_should_exit(self): + """Test shutdown flips server should_exit flag.""" + mock_card = AgentCard( + name="TestAgent", + url="http://localhost:8000", + description="Test agent", + capabilities=AgentCapabilities(streaming=True, push_notifications=False), + default_input_modes=["text"], + default_output_modes=["text"], + version="1.0.0", + skills=[ + { + "id": "test_skill", + "name": "Test Skill", + "description": "A test skill", + "tags": ["test"], + } + ], + ) + + decorator = _serve(mock_card) + + @decorator + class TestAgent2: + def __init__(self): + self._host = "localhost" + self._port = 8000 + + instance = TestAgent2() + + # Simulate a running uvicorn.Server stored on the instance + server = MagicMock() + server.should_exit = False + instance._server = server + + await instance.shutdown() + assert instance._server.should_exit is True + class TestCreateWrappedAgent: """Test create_wrapped_agent function."""