From 2d8024c9d3f3e4ae658443290ce9a39ecbe5c2e2 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:18:34 +0800 Subject: [PATCH 1/6] refactor: remove unused functions --- python/valuecell/core/agent/connect.py | 73 ++------------------------ 1 file changed, 3 insertions(+), 70 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index af3517705..2add1c66e 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -105,9 +105,9 @@ async def load_remote_agents(self, config_dir: str = None) -> None: resolver = A2ACardResolver( httpx_client=httpx_client, base_url=card_data["url"] ) - self._remote_agent_cards[ - agent_name - ] = await resolver.get_agent_card() + self._remote_agent_cards[agent_name] = ( + await resolver.get_agent_card() + ) loaded_count += 1 logger.info( f"Loaded remote agent card: {agent_name} from {json_file.name}" @@ -485,70 +485,3 @@ def get_remote_agent_card(self, agent_name: str) -> dict: def get_default_remote_connections() -> RemoteConnections: """Get the default RemoteConnections instance""" return _default_remote_connections - - -async def load_remote_agents(config_dir: str = None) -> None: - """Load remote agents via the default RemoteConnections instance""" - return await _default_remote_connections.load_remote_agents(config_dir) - - -async def connect_remote_agent(agent_name: str) -> str: - """Connect to a remote agent using the default instance""" - return await _default_remote_connections.connect_remote_agent(agent_name) - - -async def start_agent( - agent_name: str, - with_listener: bool = True, - listener_port: int = None, - listener_host: str = "localhost", - notification_callback: callable = None, -) -> str: - """Start an agent using the default RemoteConnections instance""" - return await _default_remote_connections.start_agent( - agent_name, - with_listener=with_listener, - listener_port=listener_port, - listener_host=listener_host, - notification_callback=notification_callback, - ) - - -async def get_client(agent_name: str) -> AgentClient: - """Get an AgentClient from the default RemoteConnections instance""" - return await _default_remote_connections.get_client(agent_name) - - -async def stop_agent(agent_name: str): - """Stop an agent using the default RemoteConnections instance""" - return await _default_remote_connections.stop_agent(agent_name) - - -def list_running_agents() -> List[str]: - """List running agents from the default RemoteConnections instance""" - return _default_remote_connections.list_running_agents() - - -def list_available_agents() -> List[str]: - """List available agents from the default RemoteConnections instance""" - return _default_remote_connections.list_available_agents() - - -async def stop_all(): - """Stop all agents via the default RemoteConnections instance""" - return await _default_remote_connections.stop_all() - - -def get_agent_info(agent_name: str) -> dict: - """Get agent info from the default RemoteConnections instance""" - return _default_remote_connections.get_agent_info(agent_name) - - -def list_remote_agents() -> List[str]: - """List remote agents from the default RemoteConnections instance""" - return _default_remote_connections.list_remote_agents() - - -def get_remote_agent_card(agent_name: str) -> dict: - """Get remote agent card data from the default RemoteConnections instance""" - return _default_remote_connections.get_remote_agent_card(agent_name) From 29bce7469deaed1b1bb02475879c15bb24151344 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:19:00 +0800 Subject: [PATCH 2/6] refactor RemoteConnections and its tests for improved clarity --- python/valuecell/core/agent/README.md | 26 +- python/valuecell/core/agent/connect.py | 604 ++++++++--------- .../core/agent/tests/test_connect.py | 607 +++++------------- python/valuecell/core/coordinate/planner.py | 4 +- python/valuecell/examples/core_e2e_demo.py | 175 ----- .../examples/core_remote_agent_demo.py | 69 -- .../valuecell/examples/trading_agents_demo.py | 4 +- 7 files changed, 440 insertions(+), 1049 deletions(-) delete mode 100644 python/valuecell/examples/core_e2e_demo.py delete mode 100644 python/valuecell/examples/core_remote_agent_demo.py diff --git a/python/valuecell/core/agent/README.md b/python/valuecell/core/agent/README.md index b676d1306..c9d1d17cd 100644 --- a/python/valuecell/core/agent/README.md +++ b/python/valuecell/core/agent/README.md @@ -122,17 +122,17 @@ await connections.stop_agent("AgentName") await connections.stop_all() ``` -#### Remote Agent Support +#### Remote and Local Are Unified ```python -# List remote Agents -remote_agents = connections.list_remote_agents() - -# Get remote Agent configuration -card_data = connections.get_remote_agent_card("RemoteAgentName") +# All available agents (local + configured URL-only) +available_agents = connections.list_available_agents() -# Get Agent information +# Get Agent information (if implemented) agent_info = connections.get_agent_info("AgentName") + +# Get Agent card (returns None if unavailable; set fetch_if_missing=True to fetch remotely) +card = connections.get_agent_card("AgentName", fetch_if_missing=False) ``` ### 3. AgentClient Class @@ -250,13 +250,9 @@ async def remote_demo(): available_agents = connections.list_available_agents() logger.info(f"Available Agents: {available_agents}") - # List remote Agents - remote_agents = connections.list_remote_agents() - logger.info(f"Remote Agents: {remote_agents}") - - # Connect to remote Agent (if any) - if remote_agents: - agent_name = remote_agents[0] + # Connect to any available Agent (including URL-only) + if available_agents: + agent_name = available_agents[0] try: agent_url = await connections.start_agent(agent_name) logger.info(f"Successfully connected to remote Agent {agent_name}: {agent_url}") @@ -272,7 +268,7 @@ async def remote_demo(): logger.info(f"Remote response: {response}") except Exception as e: - logger.error(f"Failed to connect to remote Agent {agent_name}: {e}") + logger.error(f"Failed to connect to Agent {agent_name}: {e}") if __name__ == "__main__": asyncio.run(remote_demo()) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 2add1c66e..6bd1430e2 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -1,8 +1,9 @@ import asyncio import json import logging +from dataclasses import dataclass from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional import httpx from a2a.client import A2ACardResolver @@ -11,24 +12,38 @@ from valuecell.core.agent.client import AgentClient from valuecell.core.agent.listener import NotificationListener from valuecell.core.types import NotificationCallbackType -from valuecell.utils import get_agent_card_path, get_next_available_port +from valuecell.utils import get_next_available_port logger = logging.getLogger(__name__) +@dataclass +class AgentContext: + """Unified context for both local and remote agents.""" + + name: str + # Connection/runtime state + url: Optional[str] = None + agent_card: Optional[AgentCard] = None + instance: Optional[object] = None # when present, treated as local service + server_task: Optional[asyncio.Task] = None + listener_task: Optional[asyncio.Task] = None + listener_url: Optional[str] = None + client: Optional[AgentClient] = None + # Listener preferences + desired_listener_host: Optional[str] = None + desired_listener_port: Optional[int] = None + notification_callback: Optional[NotificationCallbackType] = None + + class RemoteConnections: """Manager for remote Agent connections""" def __init__(self): - self._connections: Dict[str, AgentClient] = {} - self._running_agents: Dict[str, asyncio.Task] = {} - self._agent_instances: Dict[str, object] = {} - self._listeners: Dict[str, asyncio.Task] = {} - self._listener_urls: Dict[str, str] = {} - # Remote agent cards loaded from config files - self._remote_agent_cards: Dict[str, AgentCard] = {} - # Remote agent configs (JSON data from config files) - self._remote_agent_configs: Dict[str, dict] = {} + # Unified per-agent contexts + self._contexts: Dict[str, AgentContext] = {} + # Whether remote contexts (from configs) have been loaded + self._remote_contexts_loaded: bool = False # Per-agent locks for concurrent start_agent calls self._agent_locks: Dict[str, asyncio.Lock] = {} @@ -38,8 +53,8 @@ def _get_agent_lock(self, agent_name: str) -> asyncio.Lock: self._agent_locks[agent_name] = asyncio.Lock() return self._agent_locks[agent_name] - def _load_remote_agent_configs(self, config_dir: str = None) -> None: - """Load remote agent configs from JSON files (sync operation).""" + def _load_remote_contexts(self, config_dir: str = None) -> None: + """Load remote agent contexts from JSON config files into _contexts.""" if config_dir is None: # Default to python/configs/agent_cards relative to current file current_file = Path(__file__) @@ -50,6 +65,7 @@ def _load_remote_agent_configs(self, config_dir: str = None) -> None: config_dir = Path(config_dir) if not config_dir.exists(): + self._remote_contexts_loaded = True return for json_file in config_dir.glob("*.json"): @@ -66,77 +82,21 @@ def _load_remote_agent_configs(self, config_dir: str = None) -> None: if not all(field in config_data for field in required_fields): continue - self._remote_agent_configs[agent_name] = config_data + # Don't overwrite existing context with a constructed one that has more info + existing = self._contexts.get(agent_name) + if existing and existing.instance: + continue + + url = config_data.get("url") + self._contexts[agent_name] = AgentContext(name=agent_name, url=url) except (json.JSONDecodeError, FileNotFoundError, KeyError): continue + self._remote_contexts_loaded = True - async def load_remote_agents(self, config_dir: str = None) -> None: - """Load remote agent cards from configuration directory.""" - if config_dir is None: - config_dir = get_agent_card_path() - else: - config_dir = Path(config_dir) - - if not config_dir.exists(): - logger.warning(f"Remote agent config directory not found: {config_dir}") - return - - async with httpx.AsyncClient() as httpx_client: - loaded_count = 0 - for json_file in config_dir.glob("*.json"): - try: - with open(json_file, "r", encoding="utf-8") as f: - card_data = json.load(f) - - agent_name = card_data.get("name") - if not agent_name: - logger.warning(f"No 'name' field in {json_file}, skipping") - continue - - # Validate required fields - required_fields = ["name", "url"] - if not all(field in card_data for field in required_fields): - logger.warning( - f"Missing required fields in {json_file}, skipping" - ) - continue - - resolver = A2ACardResolver( - httpx_client=httpx_client, base_url=card_data["url"] - ) - self._remote_agent_cards[agent_name] = ( - await resolver.get_agent_card() - ) - loaded_count += 1 - logger.info( - f"Loaded remote agent card: {agent_name} from {json_file.name}" - ) - - except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: - logger.error( - f"Failed to load remote agent card from {json_file}: {e}" - ) - - logger.info(f"Loaded {loaded_count} remote agent cards from {config_dir}") - - async def connect_remote_agent(self, agent_name: str) -> str: - """Connect to a remote agent (no lifecycle management).""" - if agent_name not in self._remote_agent_configs: - # Auto-load configs if not found - self._load_remote_agent_configs() - - if agent_name not in self._remote_agent_configs: - raise ValueError(f"Remote agent '{agent_name}' not found in loaded cards") - - config_data = self._remote_agent_configs[agent_name] - agent_url = config_data["url"] - - # Create client connection for remote agent - self._connections[agent_name] = AgentClient(agent_url) - - logger.info(f"Connected to remote agent '{agent_name}' at {agent_url}") - return agent_url + def _ensure_remote_contexts_loaded(self) -> None: + if not self._remote_contexts_loaded: + self._load_remote_contexts() async def start_agent( self, @@ -150,258 +110,194 @@ async def start_agent( # Use agent-specific lock to prevent concurrent starts of the same agent agent_lock = self._get_agent_lock(agent_name) async with agent_lock: - # Check if agent is already running - if agent_name in self._running_agents or agent_name in self._connections: - logger.info( - f"Agent '{agent_name}' is already running, returning existing instance" - ) - # Return existing agent card - if agent_name in self._agent_instances: - return self._agent_instances[agent_name].agent_card - elif agent_name in self._remote_agent_cards: - return self._remote_agent_cards[agent_name] - else: - # Fallback: reload agent card - logger.warning( - f"Agent '{agent_name}' running but no cached card, reloading..." - ) + ctx = await self._get_or_create_context(agent_name) - # Check if it's a remote agent first - if agent_name in self._remote_agent_configs: - return await self._handle_remote_agent( - agent_name, - with_listener=with_listener, - listener_host=listener_host, - listener_port=listener_port, - notification_callback=notification_callback, - ) - - # Handle local agent - agent_class = registry.get_agent_class_by_name(agent_name) - if not agent_class: - raise ValueError(f"Agent '{agent_name}' not found in registry") - - # Create Agent instance only if not already exists - if agent_name not in self._agent_instances: - agent_instance = agent_class() - self._agent_instances[agent_name] = agent_instance - logger.info(f"Created new instance for agent '{agent_name}'") - else: - agent_instance = self._agent_instances[agent_name] - logger.info(f"Reusing existing instance for agent '{agent_name}'") + # Record listener preferences on the context + if with_listener: + ctx.desired_listener_host = listener_host + ctx.desired_listener_port = listener_port + ctx.notification_callback = notification_callback - agent_card = agent_instance.agent_card + # If already set up, return card + if ctx.agent_card and (ctx.client or ctx.server_task): + return ctx.agent_card - # Setup listener if needed - try: - listener_url = await self._setup_listener_if_needed( - agent_name, - agent_card, - with_listener, - listener_host, - listener_port, - notification_callback, - ) - except Exception: - await self._cleanup_agent(agent_name) - raise + # Ensure AgentCard + await self._ensure_agent_card(ctx) - # Start agent service only if not already running - try: - if agent_name not in self._running_agents: - await self._start_agent_service(agent_name, agent_instance) - logger.info(f"Started service for agent '{agent_name}'") - else: - logger.info(f"Service for agent '{agent_name}' already running") - except Exception as e: - logger.error(f"Failed to start agent '{agent_name}': {e}") - await self._cleanup_agent(agent_name) - raise RuntimeError(f"Failed to start agent '{agent_name}'") from e - - # Create client connection with listener URL only if not exists - if agent_name not in self._connections: - self._create_client_for_agent(agent_name, agent_card.url, listener_url) - logger.info(f"Created client connection for agent '{agent_name}'") - else: - logger.info( - f"Client connection for agent '{agent_name}' already exists" - ) + # Ensure listener if requested and supported + if with_listener: + await self._ensure_listener(ctx) - return agent_card + # Start local agent service if needed + if ctx.instance: + await self._ensure_local_service(ctx) - async def _setup_listener_if_needed( - self, - agent_name: str, - agent_card: AgentCard, - with_listener: bool, - listener_host: str, - listener_port: int, - notification_callback: NotificationCallbackType, - ) -> str: - """Setup listener for agent if needed and supported. Returns listener URL or None.""" - if ( - not with_listener - or not agent_card - or not agent_card.capabilities.push_notifications - ): - return None + # Ensure client connection + await self._ensure_client(ctx) - try: - return await self._start_listener_for_agent( - agent_name, - listener_host=listener_host, - listener_port=listener_port, - notification_callback=notification_callback, - ) - except Exception as e: - logger.error(f"Failed to start listener for '{agent_name}': {e}") - raise RuntimeError(f"Failed to start listener for '{agent_name}'") from e + return ctx.agent_card - async def _handle_remote_agent( - self, - agent_name: str, - with_listener: bool = True, - listener_port: int = None, - listener_host: str = "localhost", - notification_callback: NotificationCallbackType = None, - ) -> AgentCard: - """Handle remote agent connection and card loading.""" - # Check if remote agent is already connected - if agent_name in self._connections: - logger.info(f"Remote agent '{agent_name}' already connected") - return self._remote_agent_cards.get(agent_name) - - config_data = self._remote_agent_configs[agent_name] - agent_url = config_data["url"] - - # Load actual agent card using A2ACardResolver only if not cached - agent_card = self._remote_agent_cards.get(agent_name) - if not agent_card: + async def _ensure_agent_card(self, ctx: AgentContext) -> None: + """Ensure ctx.agent_card is populated.""" + if ctx.agent_card: + return + if ctx.url and not ctx.instance: + if not ctx.url: + raise ValueError(f"Remote agent '{ctx.name}' missing URL") async with httpx.AsyncClient() as httpx_client: try: resolver = A2ACardResolver( - httpx_client=httpx_client, base_url=agent_url + httpx_client=httpx_client, base_url=ctx.url ) - agent_card = await resolver.get_agent_card() - self._remote_agent_cards[agent_name] = agent_card - logger.info(f"Loaded agent card for remote agent: {agent_name}") + ctx.agent_card = await resolver.get_agent_card() + logger.info(f"Loaded agent card: {ctx.name}") except Exception as e: - logger.error(f"Failed to get agent card for {agent_name}: {e}") + logger.error(f"Failed to get agent card for {ctx.name}: {e}") + # Proceed without card if remote doesn't expose it else: - logger.info(f"Using cached agent card for remote agent: {agent_name}") - - # Setup listener if needed - listener_url = await self._setup_listener_if_needed( - agent_name, - agent_card, - with_listener, - listener_host, - listener_port, - notification_callback, - ) - - # Create client connection with listener URL only if not exists - if agent_name not in self._connections: - self._connections[agent_name] = AgentClient( - agent_url, push_notification_url=listener_url + if not ctx.instance: + # Create instance lazily here to source the card + agent_class = registry.get_agent_class_by_name(ctx.name) + if not agent_class: + raise ValueError(f"Agent '{ctx.name}' not found in registry") + ctx.instance = agent_class() + logger.info(f"Created new instance for agent '{ctx.name}'") + ctx.agent_card = ctx.instance.agent_card + + async def _ensure_listener(self, ctx: AgentContext) -> None: + """Ensure listener is running if supported by agent card.""" + if ctx.listener_task or not ctx.agent_card: + return + if not getattr(ctx.agent_card.capabilities, "push_notifications", False): + return + try: + listener_task, listener_url = await self._start_listener( + host=ctx.desired_listener_host or "localhost", + port=ctx.desired_listener_port, + notification_callback=ctx.notification_callback, ) - logger.info(f"Connected to remote agent '{agent_name}' at {agent_url}") - if listener_url: - logger.info(f" └─ with listener at {listener_url}") - else: - logger.info(f"Already connected to remote agent '{agent_name}'") + ctx.listener_task = listener_task + ctx.listener_url = listener_url + except Exception as e: + logger.error(f"Failed to start listener for '{ctx.name}': {e}") + raise RuntimeError(f"Failed to start listener for '{ctx.name}'") from e - return agent_card + async def _ensure_client(self, ctx: AgentContext) -> None: + """Ensure AgentClient is created and connected.""" + if ctx.client: + return + url = ctx.url or (ctx.agent_card.url if ctx.agent_card else None) + if not url: + raise ValueError(f"Unable to determine URL for agent '{ctx.name}'") + ctx.client = AgentClient(url, push_notification_url=ctx.listener_url) + # Log based on whether it's a local service or a remote URL-only connection + if ctx.instance: + logger.info(f"Started agent '{ctx.name}' at {url}") + else: + logger.info(f"Connected to agent '{ctx.name}' at {url}") + if ctx.listener_url: + logger.info(f" └─ with listener at {ctx.listener_url}") - async def _start_listener_for_agent( + async def _start_listener( self, - agent_name: str, - listener_host: str, - listener_port: int = None, + host: str = "localhost", + port: Optional[int] = None, notification_callback: callable = None, - ) -> str: - """Start a NotificationListener for the agent and return its URL.""" - # Auto-assign port if not specified - if listener_port is None: - listener_port = get_next_available_port(5000) - - # Create and start listener + ) -> tuple[asyncio.Task, str]: + """Start a NotificationListener and return (task, url).""" + if port is None: + port = get_next_available_port(5000) listener = NotificationListener( - host=listener_host, - port=listener_port, + host=host, + port=port, notification_callback=notification_callback, ) - listener_task = asyncio.create_task(listener.start_async()) - self._listeners[agent_name] = listener_task - - listener_url = f"http://{listener_host}:{listener_port}/notify" - self._listener_urls[agent_name] = listener_url - - # Wait a moment for listener to start + listener_url = f"http://{host}:{port}/notify" await asyncio.sleep(0.3) - logger.info(f"Started listener for '{agent_name}' at {listener_url}") - - return listener_url + logger.info(f"Started listener at {listener_url}") + return listener_task, listener_url - async def _start_agent_service(self, agent_name: str, agent_instance: object): - """Start the agent service (serve) and track the running task.""" - server_task = asyncio.create_task(agent_instance.serve()) - self._running_agents[agent_name] = server_task - - # Wait for agent to start + async def _ensure_local_service(self, ctx: AgentContext): + """Start the local agent service if not already running.""" + if ctx.server_task: + return + if not ctx.instance: + agent_class = registry.get_agent_class_by_name(ctx.name) + if not agent_class: + raise ValueError(f"Agent '{ctx.name}' not found in registry") + ctx.instance = agent_class() + logger.info(f"Created new instance for agent '{ctx.name}'") + server_task = asyncio.create_task(ctx.instance.serve()) + ctx.server_task = server_task await asyncio.sleep(0.5) - def _create_client_for_agent( - self, agent_name: str, agent_url: str, listener_url: str = None - ): - """Create an AgentClient for the agent and record the connection.""" - self._connections[agent_name] = AgentClient( - agent_url, push_notification_url=listener_url - ) + async def _get_or_create_context( + self, + agent_name: str, + ) -> AgentContext: + """Get or initialize an AgentContext for local or remote agents.""" + # Load remote contexts lazily + self._ensure_remote_contexts_loaded() + + ctx = self._contexts.get(agent_name) + if ctx: + return ctx + + # Try local agent from registry + agent_class = registry.get_agent_class_by_name(agent_name) + if agent_class: + instance = agent_class() + ctx = AgentContext(name=agent_name, instance=instance) + try: + ctx.agent_card = instance.agent_card + except Exception: + pass + self._contexts[agent_name] = ctx + return ctx - logger.info(f"Started agent '{agent_name}' at {agent_url}") - if listener_url: - logger.info(f" └─ with listener at {listener_url}") + # If not local and not preloaded as remote, it's unknown + raise ValueError( + f"Agent '{agent_name}' not found (neither local nor remote config)" + ) async def _cleanup_agent(self, agent_name: str): """Clean up all resources for an agent""" - # Close client connection - if agent_name in self._connections: - await self._connections[agent_name].close() - + ctx = self._contexts.get(agent_name) + if not ctx: + return + # Close client + if ctx.client: + await ctx.client.close() + ctx.client = None # Stop listener - if agent_name in self._listeners: - self._listeners[agent_name].cancel() + if ctx.listener_task: + ctx.listener_task.cancel() try: - await self._listeners[agent_name] + await ctx.listener_task except asyncio.CancelledError: pass - del self._listeners[agent_name] - - # Stop agent - if agent_name in self._running_agents: - self._running_agents[agent_name].cancel() + ctx.listener_task = None + ctx.listener_url = None + # Stop local agent + if ctx.server_task: + ctx.server_task.cancel() try: - await self._running_agents[agent_name] + await ctx.server_task except asyncio.CancelledError: pass - del self._running_agents[agent_name] - - # Clean up references - if agent_name in self._connections: - del self._connections[agent_name] - if agent_name in self._agent_instances: - del self._agent_instances[agent_name] - if agent_name in self._listener_urls: - del self._listener_urls[agent_name] + ctx.server_task = None + # Remove context + del self._contexts[agent_name] async def get_client(self, agent_name: str) -> AgentClient: """Get Agent client connection""" - if agent_name not in self._connections: + ctx = self._contexts.get(agent_name) + if not ctx or not ctx.client: await self.start_agent(agent_name) - - return self._connections[agent_name] + ctx = self._contexts.get(agent_name) + return ctx.client async def stop_agent(self, agent_name: str): """Stop Agent service and associated listener""" @@ -410,71 +306,89 @@ async def stop_agent(self, agent_name: str): def list_running_agents(self) -> List[str]: """List running agents""" - return list(self._running_agents.keys()) + return [name for name, ctx in self._contexts.items() if ctx.server_task] def list_available_agents(self) -> List[str]: - """List all available agents from registry and remote cards""" - # Auto-load remote agent configs if not already loaded - if not self._remote_agent_configs: - self._load_remote_agent_configs() - + """List all available agents from registry and config cards""" + # Ensure remote contexts are loaded + self._ensure_remote_contexts_loaded() local_agents = registry.list_agent_names() - remote_agents = list(self._remote_agent_configs.keys()) - return local_agents + remote_agents + remote_agents = [ + name for name, ctx in self._contexts.items() if ctx.url and not ctx.instance + ] + # Deduplicate while preserving order (locals first) + seen = set() + merged = [] + for name in local_agents + remote_agents: + if name not in seen: + seen.add(name) + merged.append(name) + return merged async def stop_all(self): """Stop all running agents""" - for agent_name in list(self._running_agents.keys()): + for agent_name in list(self._contexts.keys()): await self.stop_agent(agent_name) - def get_agent_info(self, agent_name: str) -> dict: - """Get agent information including listener info""" - # Check if it's a local agent - if agent_name in self._agent_instances: - agent_instance = self._agent_instances[agent_name] - return { - "name": agent_name, - "type": "local", - "url": agent_instance.agent_card.url, - "listener_url": self._listener_urls.get(agent_name), - "card": agent_instance.agent_card.model_dump(exclude_none=True), - "running": agent_name in self._running_agents, - "has_listener": agent_name in self._listeners, - } - - # Check if it's a remote agent - if agent_name in self._remote_agent_configs: - config_data = self._remote_agent_configs[agent_name] - agent_card = self._remote_agent_cards.get(agent_name) - return { - "name": agent_name, - "type": "remote", - "url": config_data.get("url"), - "card": ( - agent_card.model_dump(exclude_none=True) - if agent_card - else config_data - ), - "connected": agent_name in self._connections, - "running": False, # Remote agents are not managed by us - "has_listener": False, - } + def get_agent_card( + self, agent_name: str, fetch_if_missing: bool = False + ) -> Optional[AgentCard]: + """Get AgentCard object for any known agent; returns None if not available. + + By default, this does not perform network I/O. If fetch_if_missing is True + and a URL is available for the agent, this will attempt to fetch the card: + - If no event loop is running, it will fetch synchronously (blocking). + - If an event loop is running, it schedules a background fetch and returns + None immediately (the card will be cached when the task completes). + """ + self._ensure_remote_contexts_loaded() + ctx = self._contexts.get(agent_name) + if not ctx: + # Try to construct a local context from registry for convenience + agent_class = registry.get_agent_class_by_name(agent_name) + if agent_class: + instance = agent_class() + ctx = AgentContext(name=agent_name, instance=instance) + try: + ctx.agent_card = instance.agent_card + except Exception: + pass + self._contexts[agent_name] = ctx + else: + return None + if ctx.agent_card: + return ctx.agent_card + if ctx.instance: + try: + return ctx.instance.agent_card + except Exception: + return None + # URL-only context without cached card + if fetch_if_missing and ctx.url: + + async def _fetch_card(): + async with httpx.AsyncClient() as httpx_client: + try: + resolver = A2ACardResolver( + httpx_client=httpx_client, base_url=ctx.url + ) # type: ignore[arg-type] + card = await resolver.get_agent_card() + ctx.agent_card = card + logger.info(f"Fetched and cached agent card: {ctx.name}") + except Exception as e: + logger.error(f"Failed to fetch agent card for {ctx.name}: {e}") - return None + try: + loop = asyncio.get_running_loop() + # Schedule background fetch; caller can retry later + loop.create_task(_fetch_card()) + return None + except RuntimeError: + # No running loop: perform fetch synchronously + asyncio.run(_fetch_card()) + return ctx.agent_card - def list_remote_agents(self) -> List[str]: - """List remote agents loaded from config files""" - # Auto-load remote agent configs if not already loaded - if not self._remote_agent_configs: - self._load_remote_agent_configs() - return list(self._remote_agent_configs.keys()) - - def get_remote_agent_card(self, agent_name: str) -> dict: - """Get remote agent card data""" - # Return actual AgentCard if available, otherwise config data - if agent_name in self._remote_agent_cards: - return self._remote_agent_cards[agent_name] - return self._remote_agent_configs.get(agent_name) + return None # Global default instance for backward compatibility and ease of use diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py index 22359d701..7b4c0c299 100644 --- a/python/valuecell/core/agent/tests/test_connect.py +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -1,5 +1,5 @@ """ -Additional comprehensive tests for RemoteConnections to improve coverage. +Updated tests for RemoteConnections after simplifying local/remote handling. """ import asyncio @@ -9,489 +9,214 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from valuecell.core.agent.connect import RemoteConnections +from valuecell.core.agent.connect import RemoteConnections, AgentContext -class TestRemoteConnectionsComprehensive: - """Comprehensive tests to improve coverage of RemoteConnections.""" - +class TestRemoteConnectionsUnified: def setup_method(self): - """Setup before each test method.""" self.instance = RemoteConnections() - def test_init_creates_all_required_attributes(self): - """Test that __init__ properly initializes all attributes.""" - instance = RemoteConnections() - - assert isinstance(instance._connections, dict) - assert isinstance(instance._running_agents, dict) - assert isinstance(instance._agent_instances, dict) - assert isinstance(instance._listeners, dict) - assert isinstance(instance._listener_urls, dict) - assert isinstance(instance._remote_agent_cards, dict) - assert isinstance(instance._remote_agent_configs, dict) - assert isinstance(instance._agent_locks, dict) - - # All should be empty initially - assert len(instance._connections) == 0 - assert len(instance._running_agents) == 0 - assert len(instance._agent_instances) == 0 - assert len(instance._listeners) == 0 - assert len(instance._listener_urls) == 0 - assert len(instance._remote_agent_cards) == 0 - assert len(instance._remote_agent_configs) == 0 - assert len(instance._agent_locks) == 0 - - def test_load_remote_agent_configs_with_invalid_json(self): - """Test loading remote agent configs with invalid JSON.""" - with tempfile.TemporaryDirectory() as temp_dir: - # Create file with invalid JSON - invalid_file = Path(temp_dir) / "invalid.json" - with open(invalid_file, "w") as f: - f.write("{ invalid json content") - - # Should not raise exception - self.instance._load_remote_agent_configs(temp_dir) - - # Should not load any configs - assert len(self.instance._remote_agent_configs) == 0 + def test_init_creates_minimal_attributes(self): + inst = RemoteConnections() + assert isinstance(inst._contexts, dict) + assert isinstance(inst._agent_locks, dict) + assert inst._remote_contexts_loaded is False + assert len(inst._contexts) == 0 - def test_load_remote_agent_configs_with_missing_name(self): - """Test loading remote agent configs with missing name field.""" + def test_load_remote_contexts_validation(self): with tempfile.TemporaryDirectory() as temp_dir: - # Create file without name field - no_name_file = Path(temp_dir) / "no_name.json" - config_data = { - "url": "http://localhost:8000", - "description": "Test agent without name", - } - with open(no_name_file, "w") as f: - json.dump(config_data, f) - - self.instance._load_remote_agent_configs(temp_dir) - - # Should not load config without name - assert len(self.instance._remote_agent_configs) == 0 - - def test_load_remote_agent_configs_with_missing_url(self): - """Test loading remote agent configs with missing URL field.""" - with tempfile.TemporaryDirectory() as temp_dir: - # Create file without URL field - no_url_file = Path(temp_dir) / "no_url.json" - config_data = { - "name": "test_agent", - "description": "Test agent without URL", - } - with open(no_url_file, "w") as f: - json.dump(config_data, f) - - self.instance._load_remote_agent_configs(temp_dir) - - # Should not load config without URL - assert len(self.instance._remote_agent_configs) == 0 - - @pytest.mark.asyncio - async def test_load_remote_agents_with_nonexistent_directory(self): - """Test load_remote_agents with non-existent directory.""" - with patch( - "valuecell.core.agent.connect.get_agent_card_path", - return_value=Path("/nonexistent"), - ): - # Should not raise exception - await self.instance.load_remote_agents() - - # Should not load any agents - assert len(self.instance._remote_agent_cards) == 0 - - @pytest.mark.asyncio - async def test_load_remote_agents_with_http_error(self): - """Test load_remote_agents when HTTP client fails.""" - # This test is challenging because the actual implementation doesn't - # have proper exception handling in the load_remote_agents method. - # We'll skip this for now and focus on other coverage improvements. - pytest.skip( - "Skipping test due to missing exception handling in load_remote_agents" - ) + # invalid JSON + (Path(temp_dir) / "invalid.json").write_text("{ invalid") - @pytest.mark.asyncio - async def test_connect_remote_agent_not_found(self): - """Test connect_remote_agent with non-existent agent.""" - with pytest.raises(ValueError, match="Remote agent 'nonexistent' not found"): - await self.instance.connect_remote_agent("nonexistent") - - @pytest.mark.asyncio - async def test_connect_remote_agent_success(self): - """Test successful remote agent connection.""" - # Set up remote agent config - self.instance._remote_agent_configs["test_agent"] = { - "name": "test_agent", - "url": "http://localhost:8000", - } - - with patch("valuecell.core.agent.connect.AgentClient") as mock_client: - result = await self.instance.connect_remote_agent("test_agent") - - assert result == "http://localhost:8000" - assert "test_agent" in self.instance._connections - mock_client.assert_called_once_with("http://localhost:8000") - - @pytest.mark.asyncio - async def test_start_agent_remote_agent_flow(self): - """Test start_agent with remote agent.""" - # Set up remote agent config - self.instance._remote_agent_configs["remote_agent"] = { - "name": "remote_agent", - "url": "http://localhost:8000", - } + # missing name + (Path(temp_dir) / "no_name.json").write_text( + json.dumps({"url": "http://localhost:8000"}) + ) - mock_card = MagicMock() - mock_card.capabilities.push_notifications = False + # missing url + (Path(temp_dir) / "no_url.json").write_text( + json.dumps({"name": "agent_without_url"}) + ) - with patch.object( - self.instance, "_handle_remote_agent", return_value=mock_card - ) as mock_handle: - result = await self.instance.start_agent("remote_agent") + # valid + (Path(temp_dir) / "ok.json").write_text( + json.dumps({"name": "remote_ok", "url": "http://localhost:9000"}) + ) - assert result == mock_card - mock_handle.assert_called_once() + self.instance._load_remote_contexts(temp_dir) + assert "remote_ok" in self.instance._contexts + ctx = self.instance._contexts["remote_ok"] + assert ctx.url == "http://localhost:9000" + assert ctx.instance is None @pytest.mark.asyncio - async def test_start_agent_local_agent_not_found(self): - """Test start_agent with non-existent local agent.""" + async def test_start_agent_local_not_found(self): with patch( "valuecell.core.agent.registry.get_agent_class_by_name", return_value=None ): - with pytest.raises( - ValueError, match="Agent 'nonexistent' not found in registry" - ): + with pytest.raises(ValueError, match="not found"): await self.instance.start_agent("nonexistent") @pytest.mark.asyncio - async def test_start_agent_already_running(self): - """Test start_agent with already running agent.""" - # Mock agent instance - mock_instance = MagicMock() - mock_card = MagicMock() - mock_instance.agent_card = mock_card - - self.instance._agent_instances["test_agent"] = mock_instance - self.instance._running_agents["test_agent"] = MagicMock() - - result = await self.instance.start_agent("test_agent") - assert result == mock_card - - @pytest.mark.asyncio - async def test_start_agent_with_listener_setup_failure(self): - """Test start_agent when listener setup fails.""" - mock_agent_class = MagicMock() + async def test_start_agent_local_with_listener(self): + # Mock local agent class mock_instance = MagicMock() mock_card = MagicMock() + mock_card.url = "http://localhost:8100" mock_card.capabilities.push_notifications = True mock_instance.agent_card = mock_card - mock_agent_class.return_value = mock_instance with patch( "valuecell.core.agent.registry.get_agent_class_by_name", - return_value=mock_agent_class, + return_value=lambda: mock_instance, ): with patch.object( self.instance, - "_setup_listener_if_needed", - side_effect=Exception("Listener failed"), - ): - with patch.object(self.instance, "_cleanup_agent") as mock_cleanup: - with pytest.raises(Exception, match="Listener failed"): - await self.instance.start_agent( - "test_agent", with_listener=True - ) - - mock_cleanup.assert_called_once_with("test_agent") - - @pytest.mark.asyncio - async def test_start_agent_service_failure(self): - """Test start_agent when agent service start fails.""" - mock_agent_class = MagicMock() - mock_instance = MagicMock() - mock_card = MagicMock() - mock_card.capabilities.push_notifications = False - mock_instance.agent_card = mock_card - mock_agent_class.return_value = mock_instance - - with patch( - "valuecell.core.agent.registry.get_agent_class_by_name", - return_value=mock_agent_class, - ): - with patch.object( - self.instance, - "_start_agent_service", - side_effect=Exception("Service failed"), - ): - with patch.object(self.instance, "_cleanup_agent") as mock_cleanup: - with pytest.raises( - RuntimeError, match="Failed to start agent 'test_agent'" - ): - await self.instance.start_agent("test_agent") - - mock_cleanup.assert_called_once_with("test_agent") - - @pytest.mark.asyncio - async def test_setup_listener_if_needed_no_listener(self): - """Test _setup_listener_if_needed when listener is not needed.""" - mock_card = MagicMock() - mock_card.capabilities.push_notifications = True - - result = await self.instance._setup_listener_if_needed( - "test_agent", - mock_card, - with_listener=False, - listener_host="localhost", - listener_port=5000, - notification_callback=None, - ) - - assert result is None - - @pytest.mark.asyncio - async def test_setup_listener_if_needed_no_push_notifications(self): - """Test _setup_listener_if_needed when agent doesn't support push notifications.""" - mock_card = MagicMock() - mock_card.capabilities.push_notifications = False - - result = await self.instance._setup_listener_if_needed( - "test_agent", - mock_card, - with_listener=True, - listener_host="localhost", - listener_port=5000, - notification_callback=None, + "_start_listener", + return_value=( + MagicMock(), + "http://localhost:5555/notify", + ), + ) as mock_listener: + with patch("asyncio.create_task") as mock_task: + with patch("asyncio.sleep", new=AsyncMock()): + with patch( + "valuecell.core.agent.connect.AgentClient" + ) as mock_client: + result = await self.instance.start_agent( + "local_agent", with_listener=True, listener_port=5555 + ) + + assert result is mock_card + mock_listener.assert_called_once() + mock_task.assert_called() # serve() + mock_client.assert_called_once() + + @pytest.mark.asyncio + async def test_start_agent_url_only_success(self): + # Preload a URL-only context + self.instance._contexts["remote_agent"] = AgentContext( + name="remote_agent", url="http://localhost:9001" ) - assert result is None + fake_card = MagicMock() + fake_card.capabilities.push_notifications = False - @pytest.mark.asyncio - async def test_setup_listener_if_needed_failure(self): - """Test _setup_listener_if_needed when listener start fails.""" - mock_card = MagicMock() - mock_card.capabilities.push_notifications = True + class FakeResolver: + async def get_agent_card(self): + return fake_card - with patch.object( - self.instance, - "_start_listener_for_agent", - side_effect=Exception("Listener failed"), - ): - with pytest.raises( - RuntimeError, match="Failed to start listener for 'test_agent'" + with patch("httpx.AsyncClient"): + with patch( + "valuecell.core.agent.connect.A2ACardResolver", + return_value=FakeResolver(), ): - await self.instance._setup_listener_if_needed( - "test_agent", - mock_card, - with_listener=True, - listener_host="localhost", - listener_port=5000, - notification_callback=None, - ) - - @pytest.mark.asyncio - async def test_handle_remote_agent_already_connected(self): - """Test _handle_remote_agent when agent is already connected.""" - mock_card = MagicMock() - self.instance._connections["remote_agent"] = MagicMock() - self.instance._remote_agent_cards["remote_agent"] = mock_card - - result = await self.instance._handle_remote_agent("remote_agent") - assert result == mock_card - - @pytest.mark.asyncio - async def test_handle_remote_agent_card_loading_failure(self): - """Test _handle_remote_agent when card loading fails.""" - self.instance._remote_agent_configs["remote_agent"] = { - "name": "remote_agent", - "url": "http://localhost:8000", - } + with patch("valuecell.core.agent.connect.AgentClient") as mock_client: + result = await self.instance.start_agent("remote_agent") + assert result is fake_card + mock_client.assert_called_once_with( + "http://localhost:9001", push_notification_url=None + ) + + def test_get_agent_card_fetch_sync(self): + # Create url-only context; call from sync test to trigger blocking fetch + ctx = MagicMock() + ctx.name = "url_only" + ctx.url = "http://localhost:9100" + ctx.instance = None + ctx.agent_card = None + self.instance._contexts["url_only"] = ctx + + fake_card = MagicMock() + + class FakeResolver: + async def get_agent_card(self): + return fake_card with patch("httpx.AsyncClient"): - with patch("valuecell.core.agent.connect.A2ACardResolver") as mock_resolver: - mock_resolver.return_value.get_agent_card.side_effect = Exception( - "Card loading failed" - ) - - await self.instance._handle_remote_agent("remote_agent") - # Should handle error gracefully and still create connection - assert "remote_agent" in self.instance._connections - - @pytest.mark.asyncio - async def test_start_listener_for_agent_with_auto_port(self): - """Test _start_listener_for_agent with automatic port assignment.""" - with patch( - "valuecell.core.agent.connect.get_next_available_port", return_value=5555 - ): - with patch("valuecell.core.agent.connect.NotificationListener"): - with patch("asyncio.create_task"): - with patch("asyncio.sleep"): - result = await self.instance._start_listener_for_agent( - "test_agent", "localhost" - ) - - assert result == "http://localhost:5555/notify" - assert "test_agent" in self.instance._listeners - assert ( - self.instance._listener_urls["test_agent"] - == "http://localhost:5555/notify" - ) - - @pytest.mark.asyncio - async def test_start_agent_service(self): - """Test _start_agent_service method.""" - mock_agent = MagicMock() - mock_agent.serve = AsyncMock() - - with patch("asyncio.create_task") as mock_task: - with patch("asyncio.sleep"): - await self.instance._start_agent_service("test_agent", mock_agent) - - mock_task.assert_called_once() - assert "test_agent" in self.instance._running_agents - - def test_create_client_for_agent(self): - """Test _create_client_for_agent method.""" - with patch("valuecell.core.agent.connect.AgentClient") as mock_client: - self.instance._create_client_for_agent( - "test_agent", "http://localhost:8000", "http://localhost:5000/notify" - ) - - mock_client.assert_called_once_with( - "http://localhost:8000", - push_notification_url="http://localhost:5000/notify", - ) - assert "test_agent" in self.instance._connections + with patch( + "valuecell.core.agent.connect.A2ACardResolver", + return_value=FakeResolver(), + ): + card = self.instance.get_agent_card("url_only", fetch_if_missing=True) + assert card is fake_card + assert ctx.agent_card is fake_card @pytest.mark.asyncio - async def test_cleanup_agent_complete(self): - """Test _cleanup_agent with all resources present.""" - # Set up mock resources + async def test_cleanup_agent(self): + # Prepare context with async resources mock_client = AsyncMock() - - # Create proper task mocks that can be awaited - mock_listener_task = asyncio.create_task(asyncio.sleep(0)) - mock_agent_task = asyncio.create_task(asyncio.sleep(0)) - - # Cancel them immediately to simulate cleanup - mock_listener_task.cancel() - mock_agent_task.cancel() - - self.instance._connections["test_agent"] = mock_client - self.instance._listeners["test_agent"] = mock_listener_task - self.instance._running_agents["test_agent"] = mock_agent_task - self.instance._agent_instances["test_agent"] = MagicMock() - self.instance._listener_urls["test_agent"] = "http://localhost:5000/notify" - - await self.instance._cleanup_agent("test_agent") - - # Verify cleanup + listener_task = asyncio.create_task(asyncio.sleep(0)) + server_task = asyncio.create_task(asyncio.sleep(0)) + # cancel promptly + listener_task.cancel() + server_task.cancel() + + # Build a minimal context-like object + class Ctx: + pass + + ctx = Ctx() + ctx.name = "cleanme" + ctx.client = mock_client + ctx.listener_task = listener_task + ctx.listener_url = "http://localhost:5555/notify" + ctx.server_task = server_task + ctx.instance = None + ctx.url = "http://localhost:9999" + ctx.agent_card = None + + self.instance._contexts["cleanme"] = ctx + + await self.instance._cleanup_agent("cleanme") mock_client.close.assert_called_once() - - assert "test_agent" not in self.instance._connections - assert "test_agent" not in self.instance._listeners - assert "test_agent" not in self.instance._running_agents - assert "test_agent" not in self.instance._agent_instances - assert "test_agent" not in self.instance._listener_urls + assert "cleanme" not in self.instance._contexts @pytest.mark.asyncio - async def test_get_client_starts_agent_if_not_exists(self): - """Test get_client starts agent if connection doesn't exist.""" - mock_client = MagicMock() - + async def test_get_client_starts_when_missing(self): with patch.object(self.instance, "start_agent") as mock_start: - # Mock start_agent to add the connection - async def side_effect(agent_name): - self.instance._connections[agent_name] = mock_client + + async def side_effect(name): + # inject a context with client + class Ctx: + pass + + c = Ctx() + c.name = name + c.client = MagicMock() + c.instance = None + c.url = "http://localhost:9200" + c.agent_card = None + c.server_task = None + c.listener_task = None + c.listener_url = None + c.desired_listener_host = None + c.desired_listener_port = None + c.notification_callback = None + self.instance._contexts[name] = c return MagicMock() mock_start.side_effect = side_effect - - result = await self.instance.get_client("test_agent") - - mock_start.assert_called_once_with("test_agent") - assert result == mock_client - - def test_get_agent_info_remote_agent(self): - """Test get_agent_info for remote agent.""" - self.instance._remote_agent_configs["remote_agent"] = { - "name": "remote_agent", - "url": "http://localhost:8000", - } - - result = self.instance.get_agent_info("remote_agent") - - assert result["name"] == "remote_agent" - assert result["type"] == "remote" - assert result["url"] == "http://localhost:8000" - assert result["connected"] is False - assert result["running"] is False - - def test_get_agent_info_remote_agent_with_card(self): - """Test get_agent_info for remote agent with loaded card.""" - mock_card = MagicMock() - mock_card.model_dump.return_value = {"name": "remote_agent", "capabilities": {}} - - self.instance._remote_agent_configs["remote_agent"] = { - "name": "remote_agent", - "url": "http://localhost:8000", - } - self.instance._remote_agent_cards["remote_agent"] = mock_card - - result = self.instance.get_agent_info("remote_agent") - - assert result["card"] == {"name": "remote_agent", "capabilities": {}} - - def test_get_agent_info_local_agent(self): - """Test get_agent_info for local agent.""" - mock_instance = MagicMock() - mock_card = MagicMock() - mock_card.url = "http://localhost:8001" - mock_card.model_dump.return_value = {"name": "local_agent"} - mock_instance.agent_card = mock_card - - self.instance._agent_instances["local_agent"] = mock_instance - self.instance._running_agents["local_agent"] = MagicMock() - self.instance._listeners["local_agent"] = MagicMock() - self.instance._listener_urls["local_agent"] = "http://localhost:5000/notify" - - result = self.instance.get_agent_info("local_agent") - - assert result["name"] == "local_agent" - assert result["type"] == "local" - assert result["url"] == "http://localhost:8001" - assert result["running"] is True - assert result["has_listener"] is True - assert result["listener_url"] == "http://localhost:5000/notify" - - def test_get_agent_info_nonexistent(self): - """Test get_agent_info for non-existent agent.""" - result = self.instance.get_agent_info("nonexistent") - assert result is None - - def test_get_remote_agent_card_with_card(self): - """Test get_remote_agent_card when card is available.""" - mock_card = {"name": "test_agent", "capabilities": {}} - self.instance._remote_agent_cards["test_agent"] = mock_card - - result = self.instance.get_remote_agent_card("test_agent") - assert result == mock_card - - def test_get_remote_agent_card_config_only(self): - """Test get_remote_agent_card when only config is available.""" - config_data = {"name": "test_agent", "url": "http://localhost:8000"} - self.instance._remote_agent_configs["test_agent"] = config_data - - result = self.instance.get_remote_agent_card("test_agent") - assert result == config_data - - def test_get_remote_agent_card_none(self): - """Test get_remote_agent_card when neither card nor config is available.""" - result = self.instance.get_remote_agent_card("nonexistent") - assert result is None - - -if __name__ == "__main__": - pytest.main([__file__]) + client = await self.instance.get_client("need_client") + assert client is self.instance._contexts["need_client"].client + + def test_list_available_only(self): + # simulate loaded contexts + class Ctx: + pass + + local_ctx = Ctx() + local_ctx.instance = MagicMock() + local_ctx.url = None + remote_ctx = Ctx() + remote_ctx.instance = None + remote_ctx.url = "http://x" + self.instance._contexts["local_a"] = local_ctx + self.instance._contexts["url_b"] = remote_ctx + with patch( + "valuecell.core.agent.registry.list_agent_names", + return_value=["local_a", "other_local"], + ): + names = self.instance.list_available_agents() + assert "local_a" in names + assert "url_b" in names diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index 21cb98ffc..e15879e3f 100644 --- a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -222,12 +222,12 @@ def tool_get_agent_description(self, agent_name: str) -> str: Returns: str: A description of the agent's capabilities and supported operations """ - self.agent_connections.list_remote_agents() - if card := self.agent_connections.get_remote_agent_card(agent_name): + if card := self.agent_connections.get_agent_card(agent_name): if isinstance(card, AgentCard): return agentcard_to_prompt(card) if isinstance(card, dict): return str(card) + return agentcard_to_prompt(card) return "The requested agent could not be found or is not available." diff --git a/python/valuecell/examples/core_e2e_demo.py b/python/valuecell/examples/core_e2e_demo.py deleted file mode 100644 index 3e59cb416..000000000 --- a/python/valuecell/examples/core_e2e_demo.py +++ /dev/null @@ -1,175 +0,0 @@ -import asyncio -import logging -from valuecell.core.agent.decorator import serve -from valuecell.core.agent.connect import RemoteConnections - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - - -# Demo agents using the @serve decorator -@serve(push_notifications=True) -class CalculatorAgent: - """A calculator agent that can do basic math""" - - def __init__(self): - logger.info("Initializing CalculatorAgent") - self.agent_name = "CalculatorAgent" - - async def stream(self, query, session_id, task_id): - """Process math queries""" - logger.info(f"Calculator processing: {query}") - - yield {"is_task_complete": False, "content": f"🧮 Calculating: {query}"} - await asyncio.sleep(0.5) - - try: - # Simple math evaluation (in real world, use safe parsing) - if any(op in query for op in ["+", "-", "*", "/", "(", ")"]): - # For demo, just respond with a mock calculation - result = "42" # Mock result - yield {"is_task_complete": False, "content": "💭 Computing result..."} - await asyncio.sleep(0.5) - yield {"is_task_complete": True, "content": f"✅ Result: {result}"} - else: - yield { - "is_task_complete": True, - "content": "❓ I can help with math calculations. Try something like '2 + 3'", - } - except Exception as e: - yield { - "is_task_complete": True, - "content": f"❌ Error in calculation: {str(e)}", - } - - -@serve( - port=9101, - push_notifications=True, - description="Provides weather information", -) -class WeatherAgent: - """A weather information agent""" - - def __init__(self): - logger.info("Initializing WeatherAgent") - self.agent_name = "WeatherAgent" - - async def stream(self, query, session_id, task_id): - """Process weather queries""" - logger.info(f"Weather processing: {query}") - - yield {"is_task_complete": False, "content": f"🌤️ Checking weather for: {query}"} - await asyncio.sleep(0.8) - - if "weather" in query.lower(): - yield { - "is_task_complete": False, - "content": "🌡️ Fetching current conditions...", - } - await asyncio.sleep(0.5) - yield { - "is_task_complete": False, - "content": "📊 Analyzing forecast data...", - } - await asyncio.sleep(0.5) - yield { - "is_task_complete": True, - "content": f"☀️ Weather report: Sunny, 22°C. Perfect day! (for query: {query})", - } - else: - yield { - "is_task_complete": True, - "content": "🌍 I provide weather information. Ask me about the weather in any location!", - } - - -@serve() -class SimpleAgent: - """A simple non-streaming agent""" - - async def stream(self, query, session_id, task_id): - """Simple response""" - yield {"is_task_complete": True, "content": f"Simple response to: {query}"} - - -async def demo_complete_system(): - """Complete demonstration of the decorator system""" - logger.info("🚀 Starting Complete A2A Decorator System Demo") - - # Create connections manager - connections = RemoteConnections() - - try: - # Show available agents from registry - available = connections.list_available_agents() - logger.info(f"📋 Available agents from registry: {available}") - - # Start multiple agents - logger.info("▶️ Starting multiple agents...") - - calc_url = await connections.start_agent("CalculatorAgent") - weather_url = await connections.start_agent("WeatherAgent") - simple_url = await connections.start_agent("SimpleAgent") - - logger.info(f"🧮 Calculator Agent: {calc_url}") - logger.info(f"🌤️ Weather Agent: {weather_url}") - logger.info(f"📝 Simple Agent: {simple_url}") - - # Wait for all agents to fully start - await asyncio.sleep(3) - - # Show running agents - running = connections.list_running_agents() - logger.info(f"🏃 Running agents: {running}") - - # Test Calculator Agent - logger.info("🧪 Testing Calculator Agent...") - client = await connections.get_client("CalculatorAgent") - task, event = await client.send_message("What is 15 + 27?") - logger.info(f"Calculator result: {task.status}") - - # # Test Weather Agent - logger.info("🧪 Testing Weather Agent...") - client = await connections.get_client("WeatherAgent") - task, event = await client.send_message( - "What's the weather like in San Francisco?" - ) - logger.info(f"Weather result: {task.status}") - - # Test Simple Agent - logger.info("🧪 Testing Simple Agent...") - client = await connections.get_client("SimpleAgent") - task, event = await client.send_message("Hello simple agent") - logger.info(f"Simple agent result: {task.status}") - - await asyncio.sleep(5) - # Show agent information - for agent_name in running: - info = connections.get_agent_info(agent_name) - if info: - logger.info( - f"ℹ️ {agent_name}: {info['url']} (running: {info['running']})" - ) - - logger.info("✅ All tests completed successfully!") - - except Exception as e: - logger.error(f"❌ Error in demo: {e}") - import traceback - - traceback.print_exc() - raise - - finally: - # Clean up - logger.info("🧹 Stopping all agents...") - await connections.stop_all() - logger.info("✅ Demo completed and cleaned up") - - -if __name__ == "__main__": - asyncio.run(demo_complete_system()) diff --git a/python/valuecell/examples/core_remote_agent_demo.py b/python/valuecell/examples/core_remote_agent_demo.py deleted file mode 100644 index ed36eee6a..000000000 --- a/python/valuecell/examples/core_remote_agent_demo.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -Example usage of RemoteConnections with remote agents. - -This script demonstrates how to: -1. Load remote agent cards from configuration files -2. Connect to remote agents -3. Get agent information -""" - -import asyncio -import logging - -from valuecell.core.agent.connect import RemoteConnections - -# Setup logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -async def main(): - """Main example function.""" - # Create RemoteConnections instance - connections = RemoteConnections() - - # List all available agents (local + remote) - available_agents = connections.list_available_agents() - logger.info(f"Available agents: {available_agents}") - - # List only remote agents - remote_agents = connections.list_remote_agents() - logger.info(f"Remote agents: {remote_agents}") - - # Get info for each remote agent - for agent_name in remote_agents: - agent_info = connections.get_agent_info(agent_name) - logger.info(f"Agent info for '{agent_name}': {agent_info}") - - # Get raw card data - card_data = connections.get_remote_agent_card(agent_name) - logger.info(f"Card data for '{agent_name}': {card_data}") - - # Try to start/connect to a remote agent (if any) - if remote_agents: - agent_name = remote_agents[0] - logger.info(f"Attempting to start remote agent: {agent_name}") - try: - agent_url = await connections.start_agent(agent_name) - logger.info(f"Successfully started {agent_name} at {agent_url}") - - # Get client connection - client = await connections.get_client(agent_name) - logger.info(f"Got client for {agent_name}: {client}") - - # Check updated agent info - updated_info = connections.get_agent_info(agent_name) - logger.info(f"Updated info for '{agent_name}': {updated_info}") - - async for task, event in await client.send_message( - "analyze apple stock with buffett", - streaming=True, - ): - logger.info(f"Response from {agent_name}:\n{task}\n{event}") - - except Exception as e: - logger.error(f"Failed to start {agent_name}: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/python/valuecell/examples/trading_agents_demo.py b/python/valuecell/examples/trading_agents_demo.py index 3f76c2391..58edc5f4f 100644 --- a/python/valuecell/examples/trading_agents_demo.py +++ b/python/valuecell/examples/trading_agents_demo.py @@ -31,8 +31,8 @@ async def setup(self): """Set up connection""" try: # Connect to remote agent - agent_url = await self.connections.connect_remote_agent(self.agent_name) - logger.info(f"TradingAgents connected successfully: {agent_url}") + # agent_url = await self.connections.connect_remote_agent(self.agent_name) + # logger.info(f"TradingAgents connected successfully: {agent_url}") # Get client connection self.client = await self.connections.get_client(self.agent_name) From 92a17c962702c3ca9bb2257bfdfbb304b5dc70b8 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 15:59:18 +0800 Subject: [PATCH 3/6] refactor: simplify agent connection handling and remove unused tests --- python/valuecell/core/__init__.py | 26 +- python/valuecell/core/agent/__init__.py | 2 - python/valuecell/core/agent/card.py | 65 +++++ python/valuecell/core/agent/client.py | 11 +- python/valuecell/core/agent/connect.py | 269 +++++------------- python/valuecell/core/agent/decorator.py | 135 ++------- .../core/agent/tests/test_connect.py | 222 --------------- 7 files changed, 176 insertions(+), 554 deletions(-) create mode 100644 python/valuecell/core/agent/card.py delete mode 100644 python/valuecell/core/agent/tests/test_connect.py diff --git a/python/valuecell/core/__init__.py b/python/valuecell/core/__init__.py index 7efffec1f..5a2b86241 100644 --- a/python/valuecell/core/__init__.py +++ b/python/valuecell/core/__init__.py @@ -1,38 +1,31 @@ # Session management +from .agent.decorator import create_wrapped_agent +from .agent.responses import notification, streaming from .session import ( + InMemoryMessageStore, InMemorySessionStore, Message, + MessageStore, Role, Session, - SessionStatus, SessionManager, + SessionStatus, SessionStore, - MessageStore, - InMemoryMessageStore, SQLiteMessageStore, ) # Task management -from .task import ( - InMemoryTaskStore, - Task, - TaskManager, - TaskStatus, - TaskStore, -) +from .task import InMemoryTaskStore, Task, TaskManager, TaskStatus, TaskStore # Type system from .types import ( - UserInput, - UserInputMetadata, BaseAgent, - StreamResponse, RemoteAgentResponse, + StreamResponse, + UserInput, + UserInputMetadata, ) -from .agent.decorator import serve, create_wrapped_agent -from .agent.responses import streaming, notification - __all__ = [ # Session exports "Message", @@ -58,7 +51,6 @@ "StreamResponse", "RemoteAgentResponse", # Agent utilities - "serve", "create_wrapped_agent", # Response utilities "streaming", diff --git a/python/valuecell/core/agent/__init__.py b/python/valuecell/core/agent/__init__.py index 2005b5024..f96fcdd03 100644 --- a/python/valuecell/core/agent/__init__.py +++ b/python/valuecell/core/agent/__init__.py @@ -3,13 +3,11 @@ # Core agent functionality from .client import AgentClient from .connect import RemoteConnections -from .decorator import serve from .registry import AgentRegistry __all__ = [ # Core agent exports "AgentClient", "RemoteConnections", - "serve", "AgentRegistry", ] diff --git a/python/valuecell/core/agent/card.py b/python/valuecell/core/agent/card.py new file mode 100644 index 000000000..38cb296c1 --- /dev/null +++ b/python/valuecell/core/agent/card.py @@ -0,0 +1,65 @@ +import json +from pathlib import Path +from typing import Optional + +from a2a.types import AgentCapabilities, AgentCard +from valuecell.utils import get_agent_card_path + + +def parse_local_agent_card_dict(agent_card_dict: dict) -> Optional[AgentCard]: + if not isinstance(agent_card_dict, dict): + return None + if "enabled" in agent_card_dict: + del agent_card_dict["enabled"] + if "description" not in agent_card_dict: + agent_card_dict["description"] = ( + f"No description available for {agent_card_dict.get('name', 'unknown')} agent." + ) + if "capabilities" not in agent_card_dict: + agent_card_dict["capabilities"] = AgentCapabilities( + streaming=True, push_notifications=False + ).model_dump() + + agent_card = AgentCard.model_validate(agent_card_dict) + return agent_card + + +def find_local_agent_card_by_agent_name( + agent_name: str, ignore_disabled: bool = True, base_dir: Optional[str | Path] = None +) -> Optional[AgentCard]: + """ + Reads JSON files from agent_cards directory and returns the first one where name matches. + + Args: + name: The agent name to search for + + Returns: + Dict: The agent configuration dictionary if found, None otherwise + """ + agent_cards_path = Path(base_dir) if base_dir else Path(get_agent_card_path()) + + # Check if the agent_cards directory exists + if not agent_cards_path.exists(): + return None + + # Iterate through all JSON files in the agent_cards directory + for json_file in agent_cards_path.glob("*.json"): + try: + with open(json_file, "r", encoding="utf-8") as f: + agent_card_dict = json.load(f) + + # Check if this agent config has the matching name + if not isinstance(agent_card_dict, dict): + continue + if agent_card_dict.get("name") != agent_name: + continue + if agent_card_dict.get("enabled", True) is False and ignore_disabled: + continue + return parse_local_agent_card_dict(agent_card_dict) + + except (json.JSONDecodeError, IOError): + # Skip files that can't be read or parsed + continue + + # Return None if no matching agent is found + return None diff --git a/python/valuecell/core/agent/client.py b/python/valuecell/core/agent/client.py index 46687d5f2..73def6d00 100644 --- a/python/valuecell/core/agent/client.py +++ b/python/valuecell/core/agent/client.py @@ -12,11 +12,12 @@ class AgentClient: def __init__(self, agent_url: str, push_notification_url: str = None): self.agent_url = agent_url self.push_notification_url = push_notification_url + self.agent_card = None self._client = None self._httpx_client = None self._initialized = False - async def _ensure_initialized(self): + async def ensure_initialized(self): if not self._initialized: await self._setup_client() self._initialized = True @@ -44,8 +45,8 @@ async def _setup_client(self): client_factory = ClientFactory(config) card_resolver = A2ACardResolver(self._httpx_client, self.agent_url) - card = await card_resolver.get_agent_card() - self._client = client_factory.create(card) + self.agent_card = await card_resolver.get_agent_card() + self._client = client_factory.create(self.agent_card) async def send_message( self, @@ -59,7 +60,7 @@ async def send_message( If `streaming` is True, return an async iterator producing (task, event) pairs. If `streaming` is False, return the first (task, event) pair (and close the generator). """ - await self._ensure_initialized() + await self.ensure_initialized() message = Message( role=Role.user, @@ -87,7 +88,7 @@ async def wrapper() -> AsyncIterator[RemoteAgentResponse]: return wrapper() async def get_agent_card(self): - await self._ensure_initialized() + await self.ensure_initialized() card_resolver = A2ACardResolver(self._httpx_client, self.agent_url) return await card_resolver.get_agent_card() diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 6bd1430e2..1bf75e2d2 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -5,10 +5,8 @@ from pathlib import Path from typing import Dict, List, Optional -import httpx -from a2a.client import A2ACardResolver from a2a.types import AgentCard -from valuecell.core.agent import registry +from valuecell.core.agent.card import parse_local_agent_card_dict from valuecell.core.agent.client import AgentClient from valuecell.core.agent.listener import NotificationListener from valuecell.core.types import NotificationCallbackType @@ -19,14 +17,13 @@ @dataclass class AgentContext: - """Unified context for both local and remote agents.""" + """Unified context for remote agents.""" name: str # Connection/runtime state url: Optional[str] = None - agent_card: Optional[AgentCard] = None - instance: Optional[object] = None # when present, treated as local service - server_task: Optional[asyncio.Task] = None + local_agent_card: Optional[AgentCard] = None + # Capability flags derived from card or JSON (fallbacks if no full card) listener_task: Optional[asyncio.Task] = None listener_url: Optional[str] = None client: Optional[AgentClient] = None @@ -37,10 +34,16 @@ class AgentContext: class RemoteConnections: - """Manager for remote Agent connections""" + """Manager for remote Agent connections (client + optional listener only). + + Design: This class no longer starts any local in-process agents or talks to + a registry. It reads AgentCards from local JSON files under + python/configs/agent_cards, creates HTTP clients to the specified URLs, and + optionally starts a notification listener when supported. + """ def __init__(self): - # Unified per-agent contexts + # Unified per-agent contexts (keyed by agent name) self._contexts: Dict[str, AgentContext] = {} # Whether remote contexts (from configs) have been loaded self._remote_contexts_loaded: bool = False @@ -53,43 +56,43 @@ def _get_agent_lock(self, agent_name: str) -> asyncio.Lock: self._agent_locks[agent_name] = asyncio.Lock() return self._agent_locks[agent_name] - def _load_remote_contexts(self, config_dir: str = None) -> None: - """Load remote agent contexts from JSON config files into _contexts.""" - if config_dir is None: + def _load_remote_contexts(self, agent_card_dir: str = None) -> None: + """Load remote agent contexts from JSON config files into _contexts. + + Always uses parse_local_agent_card_dict to parse/normalize the + AgentCard; supports custom directories via base_dir. + """ + if agent_card_dir is None: # Default to python/configs/agent_cards relative to current file - current_file = Path(__file__) - config_dir = ( - current_file.parent.parent.parent.parent / "configs" / "agent_cards" + agent_card_dir = ( + Path(__file__).parent.parent.parent.parent / "configs" / "agent_cards" ) else: - config_dir = Path(config_dir) + agent_card_dir = Path(agent_card_dir) - if not config_dir.exists(): + if not agent_card_dir.exists(): self._remote_contexts_loaded = True + logger.warning( + f"Agent card directory {agent_card_dir} does not exist; no remote agents loaded" + ) return - for json_file in config_dir.glob("*.json"): + for json_file in agent_card_dir.glob("*.json"): try: + # Read name minimally to resolve via helper with open(json_file, "r", encoding="utf-8") as f: - config_data = json.load(f) - - agent_name = config_data.get("name") + agent_card_dict = json.load(f) + agent_name = agent_card_dict.get("name") if not agent_name: continue - - # Validate required fields - required_fields = ["name", "url"] - if not all(field in config_data for field in required_fields): - continue - - # Don't overwrite existing context with a constructed one that has more info - existing = self._contexts.get(agent_name) - if existing and existing.instance: + local_agent_card = parse_local_agent_card_dict(agent_card_dict) + if not local_agent_card or not local_agent_card.url: continue - - url = config_data.get("url") - self._contexts[agent_name] = AgentContext(name=agent_name, url=url) - + self._contexts[agent_name] = AgentContext( + name=agent_name, + url=local_agent_card.url, + local_agent_card=local_agent_card, + ) except (json.JSONDecodeError, FileNotFoundError, KeyError): continue self._remote_contexts_loaded = True @@ -98,15 +101,23 @@ def _ensure_remote_contexts_loaded(self) -> None: if not self._remote_contexts_loaded: self._load_remote_contexts() + # Public helper primarily for tests or tooling to load from a custom dir + def load_from_dir(self, config_dir: str) -> None: + """Load agent contexts from a specific directory of JSON card files.""" + self._load_remote_contexts(config_dir) + async def start_agent( self, agent_name: str, with_listener: bool = True, - listener_port: int = None, + listener_port: int | None = None, listener_host: str = "localhost", notification_callback: NotificationCallbackType = None, - ) -> AgentCard: - """Start an agent, optionally with a notification listener.""" + ) -> Optional[AgentCard]: + """Connect to an agent URL and optionally start a notification listener. + + Returns the AgentCard if available from local configs; otherwise None. + """ # Use agent-specific lock to prevent concurrent starts of the same agent agent_lock = self._get_agent_lock(agent_name) async with agent_lock: @@ -118,58 +129,28 @@ async def start_agent( ctx.desired_listener_port = listener_port ctx.notification_callback = notification_callback - # If already set up, return card - if ctx.agent_card and (ctx.client or ctx.server_task): - return ctx.agent_card + # If already connected, return card (may be None if only URL known) + if ctx.client: + return ctx.client.agent_card - # Ensure AgentCard - await self._ensure_agent_card(ctx) + # Ensure client connection (uses URL from context) + await self._ensure_client(ctx) # Ensure listener if requested and supported if with_listener: await self._ensure_listener(ctx) - # Start local agent service if needed - if ctx.instance: - await self._ensure_local_service(ctx) - - # Ensure client connection - await self._ensure_client(ctx) - - return ctx.agent_card - - async def _ensure_agent_card(self, ctx: AgentContext) -> None: - """Ensure ctx.agent_card is populated.""" - if ctx.agent_card: - return - if ctx.url and not ctx.instance: - if not ctx.url: - raise ValueError(f"Remote agent '{ctx.name}' missing URL") - async with httpx.AsyncClient() as httpx_client: - try: - resolver = A2ACardResolver( - httpx_client=httpx_client, base_url=ctx.url - ) - ctx.agent_card = await resolver.get_agent_card() - logger.info(f"Loaded agent card: {ctx.name}") - except Exception as e: - logger.error(f"Failed to get agent card for {ctx.name}: {e}") - # Proceed without card if remote doesn't expose it - else: - if not ctx.instance: - # Create instance lazily here to source the card - agent_class = registry.get_agent_class_by_name(ctx.name) - if not agent_class: - raise ValueError(f"Agent '{ctx.name}' not found in registry") - ctx.instance = agent_class() - logger.info(f"Created new instance for agent '{ctx.name}'") - ctx.agent_card = ctx.instance.agent_card + return ctx.client.agent_card async def _ensure_listener(self, ctx: AgentContext) -> None: """Ensure listener is running if supported by agent card.""" - if ctx.listener_task or not ctx.agent_card: + if ctx.listener_task: return - if not getattr(ctx.agent_card.capabilities, "push_notifications", False): + if ( + ctx.client + and ctx.client.agent_card + and not ctx.client.agent_card.capabilities.push_notifications + ): return try: listener_task, listener_url = await self._start_listener( @@ -187,15 +168,12 @@ async def _ensure_client(self, ctx: AgentContext) -> None: """Ensure AgentClient is created and connected.""" if ctx.client: return - url = ctx.url or (ctx.agent_card.url if ctx.agent_card else None) + url = ctx.url or (ctx.local_agent_card.url if ctx.local_agent_card else None) if not url: raise ValueError(f"Unable to determine URL for agent '{ctx.name}'") ctx.client = AgentClient(url, push_notification_url=ctx.listener_url) - # Log based on whether it's a local service or a remote URL-only connection - if ctx.instance: - logger.info(f"Started agent '{ctx.name}' at {url}") - else: - logger.info(f"Connected to agent '{ctx.name}' at {url}") + await ctx.client.ensure_initialized() + logger.info(f"Connected to agent '{ctx.name}' at {url}") if ctx.listener_url: logger.info(f" └─ with listener at {ctx.listener_url}") @@ -219,25 +197,11 @@ async def _start_listener( logger.info(f"Started listener at {listener_url}") return listener_task, listener_url - async def _ensure_local_service(self, ctx: AgentContext): - """Start the local agent service if not already running.""" - if ctx.server_task: - return - if not ctx.instance: - agent_class = registry.get_agent_class_by_name(ctx.name) - if not agent_class: - raise ValueError(f"Agent '{ctx.name}' not found in registry") - ctx.instance = agent_class() - logger.info(f"Created new instance for agent '{ctx.name}'") - server_task = asyncio.create_task(ctx.instance.serve()) - ctx.server_task = server_task - await asyncio.sleep(0.5) - async def _get_or_create_context( self, agent_name: str, ) -> AgentContext: - """Get or initialize an AgentContext for local or remote agents.""" + """Get an AgentContext for a known agent (from local configs).""" # Load remote contexts lazily self._ensure_remote_contexts_loaded() @@ -245,18 +209,6 @@ async def _get_or_create_context( if ctx: return ctx - # Try local agent from registry - agent_class = registry.get_agent_class_by_name(agent_name) - if agent_class: - instance = agent_class() - ctx = AgentContext(name=agent_name, instance=instance) - try: - ctx.agent_card = instance.agent_card - except Exception: - pass - self._contexts[agent_name] = ctx - return ctx - # If not local and not preloaded as remote, it's unknown raise ValueError( f"Agent '{agent_name}' not found (neither local nor remote config)" @@ -280,16 +232,8 @@ async def _cleanup_agent(self, agent_name: str): pass ctx.listener_task = None ctx.listener_url = None - # Stop local agent - if ctx.server_task: - ctx.server_task.cancel() - try: - await ctx.server_task - except asyncio.CancelledError: - pass - ctx.server_task = None - # Remove context - del self._contexts[agent_name] + # Keep the context to allow quick reconnection; do not delete metadata + # Removing deletion allows list_available_agents to remain stable async def get_client(self, agent_name: str) -> AgentClient: """Get Agent client connection""" @@ -306,88 +250,29 @@ async def stop_agent(self, agent_name: str): def list_running_agents(self) -> List[str]: """List running agents""" - return [name for name, ctx in self._contexts.items() if ctx.server_task] + return [name for name, ctx in self._contexts.items() if ctx.client] def list_available_agents(self) -> List[str]: - """List all available agents from registry and config cards""" + """List all available agents from local config cards""" # Ensure remote contexts are loaded self._ensure_remote_contexts_loaded() - local_agents = registry.list_agent_names() - remote_agents = [ - name for name, ctx in self._contexts.items() if ctx.url and not ctx.instance - ] - # Deduplicate while preserving order (locals first) - seen = set() - merged = [] - for name in local_agents + remote_agents: - if name not in seen: - seen.add(name) - merged.append(name) - return merged + return list(self._contexts.keys()) async def stop_all(self): - """Stop all running agents""" + """Stop all running clients and listeners""" for agent_name in list(self._contexts.keys()): await self.stop_agent(agent_name) - def get_agent_card( - self, agent_name: str, fetch_if_missing: bool = False - ) -> Optional[AgentCard]: - """Get AgentCard object for any known agent; returns None if not available. - - By default, this does not perform network I/O. If fetch_if_missing is True - and a URL is available for the agent, this will attempt to fetch the card: - - If no event loop is running, it will fetch synchronously (blocking). - - If an event loop is running, it schedules a background fetch and returns - None immediately (the card will be cached when the task completes). - """ + def get_agent_card(self, agent_name: str) -> Optional[AgentCard]: + """Get AgentCard for a known agent from local configs.""" self._ensure_remote_contexts_loaded() ctx = self._contexts.get(agent_name) if not ctx: - # Try to construct a local context from registry for convenience - agent_class = registry.get_agent_class_by_name(agent_name) - if agent_class: - instance = agent_class() - ctx = AgentContext(name=agent_name, instance=instance) - try: - ctx.agent_card = instance.agent_card - except Exception: - pass - self._contexts[agent_name] = ctx - else: - return None - if ctx.agent_card: - return ctx.agent_card - if ctx.instance: - try: - return ctx.instance.agent_card - except Exception: - return None - # URL-only context without cached card - if fetch_if_missing and ctx.url: - - async def _fetch_card(): - async with httpx.AsyncClient() as httpx_client: - try: - resolver = A2ACardResolver( - httpx_client=httpx_client, base_url=ctx.url - ) # type: ignore[arg-type] - card = await resolver.get_agent_card() - ctx.agent_card = card - logger.info(f"Fetched and cached agent card: {ctx.name}") - except Exception as e: - logger.error(f"Failed to fetch agent card for {ctx.name}: {e}") - - try: - loop = asyncio.get_running_loop() - # Schedule background fetch; caller can retry later - loop.create_task(_fetch_card()) - return None - except RuntimeError: - # No running loop: perform fetch synchronously - asyncio.run(_fetch_card()) - return ctx.agent_card - + return None + if ctx.client and ctx.client.agent_card: + return ctx.client.agent_card + if ctx.local_agent_card: + return ctx.local_agent_card return None diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index c3eb91e56..2dba6b445 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -1,7 +1,5 @@ -import json import logging -from pathlib import Path -from typing import Dict, Optional, Type +from typing import Type import httpx import uvicorn @@ -15,18 +13,10 @@ InMemoryTaskStore, TaskUpdater, ) -from a2a.types import ( - AgentCapabilities, - AgentCard, - AgentSkill, - Part, - TaskState, - TextPart, - UnsupportedOperationError, -) +from a2a.types import AgentCard, Part, TaskState, TextPart, UnsupportedOperationError from a2a.utils import new_agent_text_message, new_task from a2a.utils.errors import ServerError -from valuecell.core.agent import registry +from valuecell.core.agent.card import find_local_agent_card_by_agent_name from valuecell.core.types import ( BaseAgent, NotifyResponse, @@ -34,40 +24,13 @@ StreamResponse, StreamResponseEvent, ) -from valuecell.utils import ( - get_agent_card_path, - get_next_available_port, - parse_host_port, -) +from valuecell.utils import parse_host_port logger = logging.getLogger(__name__) -def serve( - host: str = "localhost", - port: int = None, - streaming: bool = True, - push_notifications: bool = False, - description: str = None, - version: str = "1.0.0", - skills: list[AgentSkill | dict] = None, - **extra_kwargs, -): +def _serve(agent_card: AgentCard): def decorator(cls: Type) -> Type: - if extra_kwargs: - logger.warning( - f"Extra kwargs {extra_kwargs} are not used in the @serve decorator" - ) - - # Build agent card (port will be assigned when server starts) - agent_skills = [] - if skills: - for skill in skills: - if isinstance(skill, dict): - agent_skills.append(AgentSkill(**skill)) - elif isinstance(skill, AgentSkill): - agent_skills.append(skill) - # Determine the agent name consistently agent_name = cls.__name__ @@ -76,29 +39,13 @@ class DecoratedAgent(cls): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - # Assign port when instance is created - actual_port = port or get_next_available_port() - # Create agent card with actual port - self.agent_card = AgentCard( - name=agent_name, - description=description - or f"No description available for {agent_name}", - url=f"http://{host}:{actual_port}/", - version=version, - default_input_modes=["text"], - default_output_modes=["text"], - capabilities=AgentCapabilities( - streaming=streaming, push_notifications=push_notifications - ), - skills=agent_skills, - supports_authenticated_extended_card=False, - ) + self.agent_card = agent_card - self._host = host - self._port = actual_port + self._host, self._port = parse_host_port( + agent_card.url, default_scheme="http" + ) self._executor = None - self._server_task = None async def serve(self): # Create AgentExecutor wrapper @@ -138,13 +85,13 @@ async def serve(self): DecoratedAgent.__qualname__ = cls.__qualname__ # Register to registry - try: - registry.register(DecoratedAgent, agent_name) - except ImportError: - # Registry not available, skip registration - logger.warning( - f"Agent registry not available, skipping registration for {DecoratedAgent.__name__}" - ) + # try: + # registry.register(DecoratedAgent, agent_name) + # except ImportError: + # # Registry not available, skip registration + # logger.warning( + # f"Agent registry not available, skipping registration for {DecoratedAgent.__name__}" + # ) return DecoratedAgent @@ -278,56 +225,12 @@ def _create_agent_executor(agent_instance): return GenericAgentExecutor(agent_instance) -def _get_serve_params_by_agent_name(name: str) -> Optional[Dict]: - """ - Reads JSON files from agent_cards directory and returns the first one where name matches. - - Args: - name: The agent name to search for - - Returns: - Dict: The agent configuration dictionary if found, None otherwise - """ - agent_cards_path = Path(get_agent_card_path()) - - # Check if the agent_cards directory exists - if not agent_cards_path.exists(): - return None - - # Iterate through all JSON files in the agent_cards directory - for json_file in agent_cards_path.glob("*.json"): - try: - with open(json_file, "r", encoding="utf-8") as f: - agent_config = json.load(f) - - # Check if this agent config has the matching name - if not isinstance(agent_config, dict): - continue - if agent_config.get("name") != name: - continue - if "url" in agent_config and agent_config["url"]: - host, port = parse_host_port( - agent_config.get("url"), default_scheme="http" - ) - agent_config["host"] = host - agent_config["port"] = port - - return agent_config - - except (json.JSONDecodeError, IOError): - # Skip files that can't be read or parsed - continue - - # Return None if no matching agent is found - return None - - def create_wrapped_agent(agent_class: Type[BaseAgent]): # Get agent configuration from agent cards - agent_config = _get_serve_params_by_agent_name(agent_class.__name__) - if not agent_config: + agent_card = find_local_agent_card_by_agent_name(agent_class.__name__) + if not agent_card: raise ValueError( f"No agent configuration found for {agent_class.__name__} in agent cards" ) - return serve(**agent_config)(agent_class)() + return _serve(agent_card)(agent_class)() diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py deleted file mode 100644 index 7b4c0c299..000000000 --- a/python/valuecell/core/agent/tests/test_connect.py +++ /dev/null @@ -1,222 +0,0 @@ -""" -Updated tests for RemoteConnections after simplifying local/remote handling. -""" - -import asyncio -import json -import tempfile -from pathlib import Path -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest -from valuecell.core.agent.connect import RemoteConnections, AgentContext - - -class TestRemoteConnectionsUnified: - def setup_method(self): - self.instance = RemoteConnections() - - def test_init_creates_minimal_attributes(self): - inst = RemoteConnections() - assert isinstance(inst._contexts, dict) - assert isinstance(inst._agent_locks, dict) - assert inst._remote_contexts_loaded is False - assert len(inst._contexts) == 0 - - def test_load_remote_contexts_validation(self): - with tempfile.TemporaryDirectory() as temp_dir: - # invalid JSON - (Path(temp_dir) / "invalid.json").write_text("{ invalid") - - # missing name - (Path(temp_dir) / "no_name.json").write_text( - json.dumps({"url": "http://localhost:8000"}) - ) - - # missing url - (Path(temp_dir) / "no_url.json").write_text( - json.dumps({"name": "agent_without_url"}) - ) - - # valid - (Path(temp_dir) / "ok.json").write_text( - json.dumps({"name": "remote_ok", "url": "http://localhost:9000"}) - ) - - self.instance._load_remote_contexts(temp_dir) - assert "remote_ok" in self.instance._contexts - ctx = self.instance._contexts["remote_ok"] - assert ctx.url == "http://localhost:9000" - assert ctx.instance is None - - @pytest.mark.asyncio - async def test_start_agent_local_not_found(self): - with patch( - "valuecell.core.agent.registry.get_agent_class_by_name", return_value=None - ): - with pytest.raises(ValueError, match="not found"): - await self.instance.start_agent("nonexistent") - - @pytest.mark.asyncio - async def test_start_agent_local_with_listener(self): - # Mock local agent class - mock_instance = MagicMock() - mock_card = MagicMock() - mock_card.url = "http://localhost:8100" - mock_card.capabilities.push_notifications = True - mock_instance.agent_card = mock_card - - with patch( - "valuecell.core.agent.registry.get_agent_class_by_name", - return_value=lambda: mock_instance, - ): - with patch.object( - self.instance, - "_start_listener", - return_value=( - MagicMock(), - "http://localhost:5555/notify", - ), - ) as mock_listener: - with patch("asyncio.create_task") as mock_task: - with patch("asyncio.sleep", new=AsyncMock()): - with patch( - "valuecell.core.agent.connect.AgentClient" - ) as mock_client: - result = await self.instance.start_agent( - "local_agent", with_listener=True, listener_port=5555 - ) - - assert result is mock_card - mock_listener.assert_called_once() - mock_task.assert_called() # serve() - mock_client.assert_called_once() - - @pytest.mark.asyncio - async def test_start_agent_url_only_success(self): - # Preload a URL-only context - self.instance._contexts["remote_agent"] = AgentContext( - name="remote_agent", url="http://localhost:9001" - ) - - fake_card = MagicMock() - fake_card.capabilities.push_notifications = False - - class FakeResolver: - async def get_agent_card(self): - return fake_card - - with patch("httpx.AsyncClient"): - with patch( - "valuecell.core.agent.connect.A2ACardResolver", - return_value=FakeResolver(), - ): - with patch("valuecell.core.agent.connect.AgentClient") as mock_client: - result = await self.instance.start_agent("remote_agent") - assert result is fake_card - mock_client.assert_called_once_with( - "http://localhost:9001", push_notification_url=None - ) - - def test_get_agent_card_fetch_sync(self): - # Create url-only context; call from sync test to trigger blocking fetch - ctx = MagicMock() - ctx.name = "url_only" - ctx.url = "http://localhost:9100" - ctx.instance = None - ctx.agent_card = None - self.instance._contexts["url_only"] = ctx - - fake_card = MagicMock() - - class FakeResolver: - async def get_agent_card(self): - return fake_card - - with patch("httpx.AsyncClient"): - with patch( - "valuecell.core.agent.connect.A2ACardResolver", - return_value=FakeResolver(), - ): - card = self.instance.get_agent_card("url_only", fetch_if_missing=True) - assert card is fake_card - assert ctx.agent_card is fake_card - - @pytest.mark.asyncio - async def test_cleanup_agent(self): - # Prepare context with async resources - mock_client = AsyncMock() - listener_task = asyncio.create_task(asyncio.sleep(0)) - server_task = asyncio.create_task(asyncio.sleep(0)) - # cancel promptly - listener_task.cancel() - server_task.cancel() - - # Build a minimal context-like object - class Ctx: - pass - - ctx = Ctx() - ctx.name = "cleanme" - ctx.client = mock_client - ctx.listener_task = listener_task - ctx.listener_url = "http://localhost:5555/notify" - ctx.server_task = server_task - ctx.instance = None - ctx.url = "http://localhost:9999" - ctx.agent_card = None - - self.instance._contexts["cleanme"] = ctx - - await self.instance._cleanup_agent("cleanme") - mock_client.close.assert_called_once() - assert "cleanme" not in self.instance._contexts - - @pytest.mark.asyncio - async def test_get_client_starts_when_missing(self): - with patch.object(self.instance, "start_agent") as mock_start: - - async def side_effect(name): - # inject a context with client - class Ctx: - pass - - c = Ctx() - c.name = name - c.client = MagicMock() - c.instance = None - c.url = "http://localhost:9200" - c.agent_card = None - c.server_task = None - c.listener_task = None - c.listener_url = None - c.desired_listener_host = None - c.desired_listener_port = None - c.notification_callback = None - self.instance._contexts[name] = c - return MagicMock() - - mock_start.side_effect = side_effect - client = await self.instance.get_client("need_client") - assert client is self.instance._contexts["need_client"].client - - def test_list_available_only(self): - # simulate loaded contexts - class Ctx: - pass - - local_ctx = Ctx() - local_ctx.instance = MagicMock() - local_ctx.url = None - remote_ctx = Ctx() - remote_ctx.instance = None - remote_ctx.url = "http://x" - self.instance._contexts["local_a"] = local_ctx - self.instance._contexts["url_b"] = remote_ctx - with patch( - "valuecell.core.agent.registry.list_agent_names", - return_value=["local_a", "other_local"], - ): - names = self.instance.list_available_agents() - assert "local_a" in names - assert "url_b" in names From 30ca3de65b8298fa197b535b97c0b0ed4417d447 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 16:05:44 +0800 Subject: [PATCH 4/6] fix: improve error logging when loading agent cards --- python/valuecell/core/agent/connect.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 1bf75e2d2..77cb56e3b 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -93,7 +93,10 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: url=local_agent_card.url, local_agent_card=local_agent_card, ) - except (json.JSONDecodeError, FileNotFoundError, KeyError): + except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: + logger.warning( + f"Failed to load agent card from {json_file}; skipping: {e}" + ) continue self._remote_contexts_loaded = True From 7c9da1b228638fd5860e2254e504fe8b4ec16338 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 16:05:49 +0800 Subject: [PATCH 5/6] test: add unit tests for RemoteConnections functionality --- .../core/agent/tests/test_connect.py | 326 ++++++++++++++++++ 1 file changed, 326 insertions(+) create mode 100644 python/valuecell/core/agent/tests/test_connect.py diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py new file mode 100644 index 000000000..be1c2e19a --- /dev/null +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -0,0 +1,326 @@ +""" +Unit tests for valuecell.core.agent.connect.RemoteConnections + +Notes: +- We avoid real network and uvicorn by monkeypatching AgentClient and NotificationListener. +- AgentCard JSONs are generated with all required fields to satisfy a2a.types.AgentCard. +""" + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass +from pathlib import Path +from typing import ClassVar, Dict, Optional + +import pytest + +from a2a.client.client_factory import minimal_agent_card +from a2a.types import AgentCard + +from valuecell.core.agent.connect import RemoteConnections + + +# ---------------------------- +# Test helpers and fakes +# ---------------------------- + + +def make_card_dict(name: str, url: str, push_notifications: bool) -> dict: + """Create a complete AgentCard dict with all required fields filled. + + We base this on a2a.client.client_factory.minimal_agent_card to ensure all + required properties exist, then override name/url and capabilities. + """ + base = minimal_agent_card(url) + card_dict = base.model_dump() + card_dict.update( + { + "name": name, + "url": url, + # Provide a description to be explicit (parse_local_agent_card_dict can also fill it) + "description": f"Test card for {name}", + # Capabilities must include push_notifications per our tests + "capabilities": { + "streaming": True, + "push_notifications": push_notifications, + }, + } + ) + return card_dict + + +@dataclass +class FakeAgentClient: + """A lightweight stand-in for AgentClient to avoid real HTTP calls. + + Behavior: + - ensure_initialized() sets agent_card from the url->AgentCard mapping. + - close() marks the instance as closed. + """ + + # Class-level registry populated by tests: url -> AgentCard + cards_by_url: ClassVar[Dict[str, AgentCard]] = {} + # Class-level counter to validate single instantiation in concurrency tests + create_count: ClassVar[int] = 0 + + agent_url: str + push_notification_url: Optional[str] = None + + def __init__(self, agent_url: str, push_notification_url: str | None = None): + type(self).create_count += 1 + self.agent_url = agent_url + self.push_notification_url = push_notification_url + self.agent_card: Optional[AgentCard] = None + self._closed = False + + async def ensure_initialized(self): + # Simulate I/O a little + await asyncio.sleep(0) + # Map back to the card provided in the JSON for this URL + card = type(self).cards_by_url.get(self.agent_url) + if card is None: + # As a fallback, generate a minimal card to keep tests robust + card = minimal_agent_card(self.agent_url) + self.agent_card = card + + async def close(self): + self._closed = True + + +class DummyNotificationListener: + """Dummy listener that doesn't bind a real port.""" + + def __init__( + self, host: str = "localhost", port: int = 0, notification_callback=None + ): + self.host = host + self.port = port + self.notification_callback = notification_callback + + async def start_async(self): + # Simulate server startup without actually starting uvicorn + await asyncio.sleep(0.01) + + +# ---------------------------- +# Tests +# ---------------------------- + + +@pytest.mark.asyncio +async def test_load_from_dir_and_list(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Prepare two agent cards + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + cards = [ + make_card_dict("AgentAlpha", "http://127.0.0.1:8101", push_notifications=False), + make_card_dict("AgentBeta", "http://127.0.0.1:8102", push_notifications=True), + ] + for c in cards: + with open(dir_path / f"{c['name']}.json", "w", encoding="utf-8") as f: + json.dump(c, f) + + # Wire FakeAgentClient and DummyNotificationListener + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + + # Also prime the client mapping + FakeAgentClient.cards_by_url = { + c["url"]: AgentCard.model_validate(c) for c in cards + } + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + all_agents = rc.list_available_agents() + assert set(all_agents) == {"AgentAlpha", "AgentBeta"} + + +@pytest.mark.asyncio +async def test_start_agent_without_listener( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + # Create a card that does not support push notifications + card = make_card_dict( + "NoPushAgent", "http://127.0.0.1:8201", push_notifications=False + ) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + returned_card = await rc.start_agent("NoPushAgent", with_listener=False) + assert isinstance(returned_card, AgentCard) + assert returned_card.name == "NoPushAgent" + + # Validate client exists and was created exactly once + client = await rc.get_client("NoPushAgent") + assert isinstance(client, FakeAgentClient) + assert client.push_notification_url is None # listener not requested + assert FakeAgentClient.create_count == 1 + + +@pytest.mark.asyncio +async def test_start_agent_with_listener_when_supported( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + # Card supports push notifications + card = make_card_dict("PushAgent", "http://127.0.0.1:8301", push_notifications=True) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + returned_card = await rc.start_agent( + "PushAgent", with_listener=True, listener_host="127.0.0.1" + ) + assert isinstance(returned_card, AgentCard) + assert returned_card.name == "PushAgent" + + # Ensure listener started and URL recorded + ctx = rc._contexts["PushAgent"] + assert ctx.listener_url is not None + assert ctx.listener_url.startswith("http://127.0.0.1:") + + # Current implementation creates client before listener, so push_notification_url stays None + client = await rc.get_client("PushAgent") + assert isinstance(client, FakeAgentClient) + assert client.push_notification_url is None + + +@pytest.mark.asyncio +async def test_start_agent_with_listener_but_not_supported( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + # Card does NOT support push notifications; listener should not be started + card = make_card_dict("NoPush", "http://127.0.0.1:8401", push_notifications=False) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + await rc.start_agent("NoPush", with_listener=True) + client = await rc.get_client("NoPush") + assert isinstance(client, FakeAgentClient) + # Since capabilities.push_notifications=False, listener shouldn't be used + assert client.push_notification_url is None + + +@pytest.mark.asyncio +async def test_concurrent_start_calls_single_initialization( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + card = make_card_dict( + "ConcurrentAgent", "http://127.0.0.1:8501", push_notifications=True + ) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + # Launch multiple concurrent start calls for the same agent + await asyncio.gather( + rc.start_agent("ConcurrentAgent", with_listener=True), + rc.start_agent("ConcurrentAgent", with_listener=True), + rc.start_agent("ConcurrentAgent", with_listener=True), + ) + + # Only one client should have been constructed + assert FakeAgentClient.create_count == 1 + + +@pytest.mark.asyncio +async def test_stop_agent_and_stop_all(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + card1 = make_card_dict("A1", "http://127.0.0.1:8601", push_notifications=True) + card2 = make_card_dict("A2", "http://127.0.0.1:8602", push_notifications=False) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + for c in (card1, card2): + with open(dir_path / f"{c['name']}.json", "w", encoding="utf-8") as f: + json.dump(c, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = { + card1["url"]: AgentCard.model_validate(card1), + card2["url"]: AgentCard.model_validate(card2), + } + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + await rc.start_agent("A1", with_listener=True) + await rc.start_agent("A2", with_listener=False) + assert set(rc.list_running_agents()) == {"A1", "A2"} + + # Stop a single agent + await rc.stop_agent("A1") + assert rc.list_running_agents() == ["A2"] + + # Stop all + await rc.stop_all() + assert rc.list_running_agents() == [] + + +@pytest.mark.asyncio +async def test_unknown_agent_raises(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Empty directory (no cards) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + with pytest.raises(ValueError): + await rc.start_agent("NotExist") From 67e2a2ec7c863e823d87718f0de5feb65c9705c1 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 23 Sep 2025 11:36:30 +0800 Subject: [PATCH 6/6] refactor: remove remote agent config loading from initialization --- python/valuecell/core/agent/connect.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 71e4d00f7..77cb56e3b 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -49,8 +49,6 @@ def __init__(self): self._remote_contexts_loaded: bool = False # Per-agent locks for concurrent start_agent calls self._agent_locks: Dict[str, asyncio.Lock] = {} - # Load remote agent configs on initialization - self._load_remote_agent_configs() def _get_agent_lock(self, agent_name: str) -> asyncio.Lock: """Get or create a lock for a specific agent (thread-safe)"""