diff --git a/python/valuecell/core/__init__.py b/python/valuecell/core/__init__.py index 7efffec1f..5a2b86241 100644 --- a/python/valuecell/core/__init__.py +++ b/python/valuecell/core/__init__.py @@ -1,38 +1,31 @@ # Session management +from .agent.decorator import create_wrapped_agent +from .agent.responses import notification, streaming from .session import ( + InMemoryMessageStore, InMemorySessionStore, Message, + MessageStore, Role, Session, - SessionStatus, SessionManager, + SessionStatus, SessionStore, - MessageStore, - InMemoryMessageStore, SQLiteMessageStore, ) # Task management -from .task import ( - InMemoryTaskStore, - Task, - TaskManager, - TaskStatus, - TaskStore, -) +from .task import InMemoryTaskStore, Task, TaskManager, TaskStatus, TaskStore # Type system from .types import ( - UserInput, - UserInputMetadata, BaseAgent, - StreamResponse, RemoteAgentResponse, + StreamResponse, + UserInput, + UserInputMetadata, ) -from .agent.decorator import serve, create_wrapped_agent -from .agent.responses import streaming, notification - __all__ = [ # Session exports "Message", @@ -58,7 +51,6 @@ "StreamResponse", "RemoteAgentResponse", # Agent utilities - "serve", "create_wrapped_agent", # Response utilities "streaming", diff --git a/python/valuecell/core/agent/README.md b/python/valuecell/core/agent/README.md index b676d1306..c9d1d17cd 100644 --- a/python/valuecell/core/agent/README.md +++ b/python/valuecell/core/agent/README.md @@ -122,17 +122,17 @@ await connections.stop_agent("AgentName") await connections.stop_all() ``` -#### Remote Agent Support +#### Remote and Local Are Unified ```python -# List remote Agents -remote_agents = connections.list_remote_agents() - -# Get remote Agent configuration -card_data = connections.get_remote_agent_card("RemoteAgentName") +# All available agents (local + configured URL-only) +available_agents = connections.list_available_agents() -# Get Agent information +# Get Agent information (if implemented) agent_info = connections.get_agent_info("AgentName") + +# Get Agent card (returns None if unavailable; set fetch_if_missing=True to fetch remotely) +card = connections.get_agent_card("AgentName", fetch_if_missing=False) ``` ### 3. AgentClient Class @@ -250,13 +250,9 @@ async def remote_demo(): available_agents = connections.list_available_agents() logger.info(f"Available Agents: {available_agents}") - # List remote Agents - remote_agents = connections.list_remote_agents() - logger.info(f"Remote Agents: {remote_agents}") - - # Connect to remote Agent (if any) - if remote_agents: - agent_name = remote_agents[0] + # Connect to any available Agent (including URL-only) + if available_agents: + agent_name = available_agents[0] try: agent_url = await connections.start_agent(agent_name) logger.info(f"Successfully connected to remote Agent {agent_name}: {agent_url}") @@ -272,7 +268,7 @@ async def remote_demo(): logger.info(f"Remote response: {response}") except Exception as e: - logger.error(f"Failed to connect to remote Agent {agent_name}: {e}") + logger.error(f"Failed to connect to Agent {agent_name}: {e}") if __name__ == "__main__": asyncio.run(remote_demo()) diff --git a/python/valuecell/core/agent/__init__.py b/python/valuecell/core/agent/__init__.py index 2005b5024..f96fcdd03 100644 --- a/python/valuecell/core/agent/__init__.py +++ b/python/valuecell/core/agent/__init__.py @@ -3,13 +3,11 @@ # Core agent functionality from .client import AgentClient from .connect import RemoteConnections -from .decorator import serve from .registry import AgentRegistry __all__ = [ # Core agent exports "AgentClient", "RemoteConnections", - "serve", "AgentRegistry", ] diff --git a/python/valuecell/core/agent/card.py b/python/valuecell/core/agent/card.py new file mode 100644 index 000000000..38cb296c1 --- /dev/null +++ b/python/valuecell/core/agent/card.py @@ -0,0 +1,65 @@ +import json +from pathlib import Path +from typing import Optional + +from a2a.types import AgentCapabilities, AgentCard +from valuecell.utils import get_agent_card_path + + +def parse_local_agent_card_dict(agent_card_dict: dict) -> Optional[AgentCard]: + if not isinstance(agent_card_dict, dict): + return None + if "enabled" in agent_card_dict: + del agent_card_dict["enabled"] + if "description" not in agent_card_dict: + agent_card_dict["description"] = ( + f"No description available for {agent_card_dict.get('name', 'unknown')} agent." + ) + if "capabilities" not in agent_card_dict: + agent_card_dict["capabilities"] = AgentCapabilities( + streaming=True, push_notifications=False + ).model_dump() + + agent_card = AgentCard.model_validate(agent_card_dict) + return agent_card + + +def find_local_agent_card_by_agent_name( + agent_name: str, ignore_disabled: bool = True, base_dir: Optional[str | Path] = None +) -> Optional[AgentCard]: + """ + Reads JSON files from agent_cards directory and returns the first one where name matches. + + Args: + name: The agent name to search for + + Returns: + Dict: The agent configuration dictionary if found, None otherwise + """ + agent_cards_path = Path(base_dir) if base_dir else Path(get_agent_card_path()) + + # Check if the agent_cards directory exists + if not agent_cards_path.exists(): + return None + + # Iterate through all JSON files in the agent_cards directory + for json_file in agent_cards_path.glob("*.json"): + try: + with open(json_file, "r", encoding="utf-8") as f: + agent_card_dict = json.load(f) + + # Check if this agent config has the matching name + if not isinstance(agent_card_dict, dict): + continue + if agent_card_dict.get("name") != agent_name: + continue + if agent_card_dict.get("enabled", True) is False and ignore_disabled: + continue + return parse_local_agent_card_dict(agent_card_dict) + + except (json.JSONDecodeError, IOError): + # Skip files that can't be read or parsed + continue + + # Return None if no matching agent is found + return None diff --git a/python/valuecell/core/agent/client.py b/python/valuecell/core/agent/client.py index 46687d5f2..73def6d00 100644 --- a/python/valuecell/core/agent/client.py +++ b/python/valuecell/core/agent/client.py @@ -12,11 +12,12 @@ class AgentClient: def __init__(self, agent_url: str, push_notification_url: str = None): self.agent_url = agent_url self.push_notification_url = push_notification_url + self.agent_card = None self._client = None self._httpx_client = None self._initialized = False - async def _ensure_initialized(self): + async def ensure_initialized(self): if not self._initialized: await self._setup_client() self._initialized = True @@ -44,8 +45,8 @@ async def _setup_client(self): client_factory = ClientFactory(config) card_resolver = A2ACardResolver(self._httpx_client, self.agent_url) - card = await card_resolver.get_agent_card() - self._client = client_factory.create(card) + self.agent_card = await card_resolver.get_agent_card() + self._client = client_factory.create(self.agent_card) async def send_message( self, @@ -59,7 +60,7 @@ async def send_message( If `streaming` is True, return an async iterator producing (task, event) pairs. If `streaming` is False, return the first (task, event) pair (and close the generator). """ - await self._ensure_initialized() + await self.ensure_initialized() message = Message( role=Role.user, @@ -87,7 +88,7 @@ async def wrapper() -> AsyncIterator[RemoteAgentResponse]: return wrapper() async def get_agent_card(self): - await self._ensure_initialized() + await self.ensure_initialized() card_resolver = A2ACardResolver(self._httpx_client, self.agent_url) return await card_resolver.get_agent_card() diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 3882dadcf..77cb56e3b 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -1,38 +1,54 @@ import asyncio import json import logging +from dataclasses import dataclass from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional -import httpx -from a2a.client import A2ACardResolver from a2a.types import AgentCard -from valuecell.core.agent import registry +from valuecell.core.agent.card import parse_local_agent_card_dict from valuecell.core.agent.client import AgentClient from valuecell.core.agent.listener import NotificationListener from valuecell.core.types import NotificationCallbackType -from valuecell.utils import get_agent_card_path, get_next_available_port +from valuecell.utils import get_next_available_port logger = logging.getLogger(__name__) +@dataclass +class AgentContext: + """Unified context for remote agents.""" + + name: str + # Connection/runtime state + url: Optional[str] = None + local_agent_card: Optional[AgentCard] = None + # Capability flags derived from card or JSON (fallbacks if no full card) + listener_task: Optional[asyncio.Task] = None + listener_url: Optional[str] = None + client: Optional[AgentClient] = None + # Listener preferences + desired_listener_host: Optional[str] = None + desired_listener_port: Optional[int] = None + notification_callback: Optional[NotificationCallbackType] = None + + class RemoteConnections: - """Manager for remote Agent connections""" + """Manager for remote Agent connections (client + optional listener only). + + Design: This class no longer starts any local in-process agents or talks to + a registry. It reads AgentCards from local JSON files under + python/configs/agent_cards, creates HTTP clients to the specified URLs, and + optionally starts a notification listener when supported. + """ def __init__(self): - self._connections: Dict[str, AgentClient] = {} - self._running_agents: Dict[str, asyncio.Task] = {} - self._agent_instances: Dict[str, object] = {} - self._listeners: Dict[str, asyncio.Task] = {} - self._listener_urls: Dict[str, str] = {} - # Remote agent cards loaded from config files - self._remote_agent_cards: Dict[str, AgentCard] = {} - # Remote agent configs (JSON data from config files) - self._remote_agent_configs: Dict[str, dict] = {} + # Unified per-agent contexts (keyed by agent name) + self._contexts: Dict[str, AgentContext] = {} + # Whether remote contexts (from configs) have been loaded + self._remote_contexts_loaded: bool = False # Per-agent locks for concurrent start_agent calls self._agent_locks: Dict[str, asyncio.Lock] = {} - # Load remote agent configs on initialization - self._load_remote_agent_configs() def _get_agent_lock(self, agent_name: str) -> asyncio.Lock: """Get or create a lock for a specific agent (thread-safe)""" @@ -40,370 +56,195 @@ def _get_agent_lock(self, agent_name: str) -> asyncio.Lock: self._agent_locks[agent_name] = asyncio.Lock() return self._agent_locks[agent_name] - def _load_remote_agent_configs(self, config_dir: str = None) -> None: - """Load remote agent configs from JSON files (sync operation).""" - if config_dir is None: + def _load_remote_contexts(self, agent_card_dir: str = None) -> None: + """Load remote agent contexts from JSON config files into _contexts. + + Always uses parse_local_agent_card_dict to parse/normalize the + AgentCard; supports custom directories via base_dir. + """ + if agent_card_dir is None: # Default to python/configs/agent_cards relative to current file - current_file = Path(__file__) - config_dir = ( - current_file.parent.parent.parent.parent / "configs" / "agent_cards" + agent_card_dir = ( + Path(__file__).parent.parent.parent.parent / "configs" / "agent_cards" ) else: - config_dir = Path(config_dir) + agent_card_dir = Path(agent_card_dir) - if not config_dir.exists(): + if not agent_card_dir.exists(): + self._remote_contexts_loaded = True + logger.warning( + f"Agent card directory {agent_card_dir} does not exist; no remote agents loaded" + ) return - for json_file in config_dir.glob("*.json"): + for json_file in agent_card_dir.glob("*.json"): try: + # Read name minimally to resolve via helper with open(json_file, "r", encoding="utf-8") as f: - config_data = json.load(f) - - agent_name = config_data.get("name") + agent_card_dict = json.load(f) + agent_name = agent_card_dict.get("name") if not agent_name: continue - - # Validate required fields - required_fields = ["name", "url"] - if not all(field in config_data for field in required_fields): + local_agent_card = parse_local_agent_card_dict(agent_card_dict) + if not local_agent_card or not local_agent_card.url: continue - - self._remote_agent_configs[agent_name] = config_data - - except (json.JSONDecodeError, FileNotFoundError, KeyError): + self._contexts[agent_name] = AgentContext( + name=agent_name, + url=local_agent_card.url, + local_agent_card=local_agent_card, + ) + except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: + logger.warning( + f"Failed to load agent card from {json_file}; skipping: {e}" + ) continue + self._remote_contexts_loaded = True - async def load_remote_agents(self, config_dir: str = None) -> None: - """Load remote agent cards from configuration directory.""" - if config_dir is None: - config_dir = get_agent_card_path() - else: - config_dir = Path(config_dir) - - if not config_dir.exists(): - logger.warning(f"Remote agent config directory not found: {config_dir}") - return + def _ensure_remote_contexts_loaded(self) -> None: + if not self._remote_contexts_loaded: + self._load_remote_contexts() - async with httpx.AsyncClient() as httpx_client: - loaded_count = 0 - for json_file in config_dir.glob("*.json"): - try: - with open(json_file, "r", encoding="utf-8") as f: - card_data = json.load(f) - - agent_name = card_data.get("name") - if not agent_name: - logger.warning(f"No 'name' field in {json_file}, skipping") - continue - - # Validate required fields - required_fields = ["name", "url"] - if not all(field in card_data for field in required_fields): - logger.warning( - f"Missing required fields in {json_file}, skipping" - ) - continue - - resolver = A2ACardResolver( - httpx_client=httpx_client, base_url=card_data["url"] - ) - self._remote_agent_cards[ - agent_name - ] = await resolver.get_agent_card() - loaded_count += 1 - logger.info( - f"Loaded remote agent card: {agent_name} from {json_file.name}" - ) - - except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: - logger.error( - f"Failed to load remote agent card from {json_file}: {e}" - ) - - logger.info(f"Loaded {loaded_count} remote agent cards from {config_dir}") - - async def connect_remote_agent(self, agent_name: str) -> str: - """Connect to a remote agent (no lifecycle management).""" - if agent_name not in self._remote_agent_configs: - # Auto-load configs if not found - self._load_remote_agent_configs() - - if agent_name not in self._remote_agent_configs: - raise ValueError(f"Remote agent '{agent_name}' not found in loaded cards") - - config_data = self._remote_agent_configs[agent_name] - agent_url = config_data["url"] - - # Create client connection for remote agent - self._connections[agent_name] = AgentClient(agent_url) - - logger.info(f"Connected to remote agent '{agent_name}' at {agent_url}") - return agent_url + # Public helper primarily for tests or tooling to load from a custom dir + def load_from_dir(self, config_dir: str) -> None: + """Load agent contexts from a specific directory of JSON card files.""" + self._load_remote_contexts(config_dir) async def start_agent( self, agent_name: str, with_listener: bool = True, - listener_port: int = None, + listener_port: int | None = None, listener_host: str = "localhost", notification_callback: NotificationCallbackType = None, - ) -> AgentCard: - """Start an agent, optionally with a notification listener.""" + ) -> Optional[AgentCard]: + """Connect to an agent URL and optionally start a notification listener. + + Returns the AgentCard if available from local configs; otherwise None. + """ # Use agent-specific lock to prevent concurrent starts of the same agent agent_lock = self._get_agent_lock(agent_name) async with agent_lock: - # Check if agent is already running - if agent_name in self._running_agents or agent_name in self._connections: - logger.info( - f"Agent '{agent_name}' is already running, returning existing instance" - ) - # Return existing agent card - if agent_name in self._agent_instances: - return self._agent_instances[agent_name].agent_card - elif agent_name in self._remote_agent_cards: - return self._remote_agent_cards[agent_name] - else: - # Fallback: reload agent card - logger.warning( - f"Agent '{agent_name}' running but no cached card, reloading..." - ) - - # Check if it's a remote agent first - if agent_name in self._remote_agent_configs: - return await self._handle_remote_agent( - agent_name, - with_listener=with_listener, - listener_host=listener_host, - listener_port=listener_port, - notification_callback=notification_callback, - ) + ctx = await self._get_or_create_context(agent_name) - # Handle local agent - agent_class = registry.get_agent_class_by_name(agent_name) - if not agent_class: - raise ValueError(f"Agent '{agent_name}' not found in registry") + # Record listener preferences on the context + if with_listener: + ctx.desired_listener_host = listener_host + ctx.desired_listener_port = listener_port + ctx.notification_callback = notification_callback - # Create Agent instance only if not already exists - if agent_name not in self._agent_instances: - agent_instance = agent_class() - self._agent_instances[agent_name] = agent_instance - logger.info(f"Created new instance for agent '{agent_name}'") - else: - agent_instance = self._agent_instances[agent_name] - logger.info(f"Reusing existing instance for agent '{agent_name}'") + # If already connected, return card (may be None if only URL known) + if ctx.client: + return ctx.client.agent_card - agent_card = agent_instance.agent_card + # Ensure client connection (uses URL from context) + await self._ensure_client(ctx) - # Setup listener if needed - try: - listener_url = await self._setup_listener_if_needed( - agent_name, - agent_card, - with_listener, - listener_host, - listener_port, - notification_callback, - ) - except Exception: - await self._cleanup_agent(agent_name) - raise - - # Start agent service only if not already running - try: - if agent_name not in self._running_agents: - await self._start_agent_service(agent_name, agent_instance) - logger.info(f"Started service for agent '{agent_name}'") - else: - logger.info(f"Service for agent '{agent_name}' already running") - except Exception as e: - logger.error(f"Failed to start agent '{agent_name}': {e}") - await self._cleanup_agent(agent_name) - raise RuntimeError(f"Failed to start agent '{agent_name}'") from e - - # Create client connection with listener URL only if not exists - if agent_name not in self._connections: - self._create_client_for_agent(agent_name, agent_card.url, listener_url) - logger.info(f"Created client connection for agent '{agent_name}'") - else: - logger.info( - f"Client connection for agent '{agent_name}' already exists" - ) + # Ensure listener if requested and supported + if with_listener: + await self._ensure_listener(ctx) - return agent_card + return ctx.client.agent_card - async def _setup_listener_if_needed( - self, - agent_name: str, - agent_card: AgentCard, - with_listener: bool, - listener_host: str, - listener_port: int, - notification_callback: NotificationCallbackType, - ) -> str: - """Setup listener for agent if needed and supported. Returns listener URL or None.""" + async def _ensure_listener(self, ctx: AgentContext) -> None: + """Ensure listener is running if supported by agent card.""" + if ctx.listener_task: + return if ( - not with_listener - or not agent_card - or not agent_card.capabilities.push_notifications + ctx.client + and ctx.client.agent_card + and not ctx.client.agent_card.capabilities.push_notifications ): - return None - + return try: - return await self._start_listener_for_agent( - agent_name, - listener_host=listener_host, - listener_port=listener_port, - notification_callback=notification_callback, + listener_task, listener_url = await self._start_listener( + host=ctx.desired_listener_host or "localhost", + port=ctx.desired_listener_port, + notification_callback=ctx.notification_callback, ) + ctx.listener_task = listener_task + ctx.listener_url = listener_url except Exception as e: - logger.error(f"Failed to start listener for '{agent_name}': {e}") - raise RuntimeError(f"Failed to start listener for '{agent_name}'") from e - - async def _handle_remote_agent( - self, - agent_name: str, - with_listener: bool = True, - listener_port: int = None, - listener_host: str = "localhost", - notification_callback: NotificationCallbackType = None, - ) -> AgentCard: - """Handle remote agent connection and card loading.""" - # Check if remote agent is already connected - if agent_name in self._connections: - logger.info(f"Remote agent '{agent_name}' already connected") - return self._remote_agent_cards.get(agent_name) - - config_data = self._remote_agent_configs[agent_name] - agent_url = config_data["url"] - - # Load actual agent card using A2ACardResolver only if not cached - agent_card = self._remote_agent_cards.get(agent_name) - if not agent_card: - async with httpx.AsyncClient() as httpx_client: - try: - resolver = A2ACardResolver( - httpx_client=httpx_client, base_url=agent_url - ) - agent_card = await resolver.get_agent_card() - self._remote_agent_cards[agent_name] = agent_card - logger.info(f"Loaded agent card for remote agent: {agent_name}") - except Exception as e: - logger.error(f"Failed to get agent card for {agent_name}: {e}") - else: - logger.info(f"Using cached agent card for remote agent: {agent_name}") - - # Setup listener if needed - listener_url = await self._setup_listener_if_needed( - agent_name, - agent_card, - with_listener, - listener_host, - listener_port, - notification_callback, - ) + logger.error(f"Failed to start listener for '{ctx.name}': {e}") + raise RuntimeError(f"Failed to start listener for '{ctx.name}'") from e - # Create client connection with listener URL only if not exists - if agent_name not in self._connections: - self._connections[agent_name] = AgentClient( - agent_url, push_notification_url=listener_url - ) - logger.info(f"Connected to remote agent '{agent_name}' at {agent_url}") - if listener_url: - logger.info(f" └─ with listener at {listener_url}") - else: - logger.info(f"Already connected to remote agent '{agent_name}'") - - return agent_card - - async def _start_listener_for_agent( + async def _ensure_client(self, ctx: AgentContext) -> None: + """Ensure AgentClient is created and connected.""" + if ctx.client: + return + url = ctx.url or (ctx.local_agent_card.url if ctx.local_agent_card else None) + if not url: + raise ValueError(f"Unable to determine URL for agent '{ctx.name}'") + ctx.client = AgentClient(url, push_notification_url=ctx.listener_url) + await ctx.client.ensure_initialized() + logger.info(f"Connected to agent '{ctx.name}' at {url}") + if ctx.listener_url: + logger.info(f" └─ with listener at {ctx.listener_url}") + + async def _start_listener( self, - agent_name: str, - listener_host: str, - listener_port: int = None, + host: str = "localhost", + port: Optional[int] = None, notification_callback: callable = None, - ) -> str: - """Start a NotificationListener for the agent and return its URL.""" - # Auto-assign port if not specified - if listener_port is None: - listener_port = get_next_available_port(5000) - - # Create and start listener + ) -> tuple[asyncio.Task, str]: + """Start a NotificationListener and return (task, url).""" + if port is None: + port = get_next_available_port(5000) listener = NotificationListener( - host=listener_host, - port=listener_port, + host=host, + port=port, notification_callback=notification_callback, ) - listener_task = asyncio.create_task(listener.start_async()) - self._listeners[agent_name] = listener_task - - listener_url = f"http://{listener_host}:{listener_port}/notify" - self._listener_urls[agent_name] = listener_url - - # Wait a moment for listener to start + listener_url = f"http://{host}:{port}/notify" await asyncio.sleep(0.3) - logger.info(f"Started listener for '{agent_name}' at {listener_url}") - - return listener_url - - async def _start_agent_service(self, agent_name: str, agent_instance: object): - """Start the agent service (serve) and track the running task.""" - server_task = asyncio.create_task(agent_instance.serve()) - self._running_agents[agent_name] = server_task - - # Wait for agent to start - await asyncio.sleep(0.5) + logger.info(f"Started listener at {listener_url}") + return listener_task, listener_url - def _create_client_for_agent( - self, agent_name: str, agent_url: str, listener_url: str = None - ): - """Create an AgentClient for the agent and record the connection.""" - self._connections[agent_name] = AgentClient( - agent_url, push_notification_url=listener_url + async def _get_or_create_context( + self, + agent_name: str, + ) -> AgentContext: + """Get an AgentContext for a known agent (from local configs).""" + # Load remote contexts lazily + self._ensure_remote_contexts_loaded() + + ctx = self._contexts.get(agent_name) + if ctx: + return ctx + + # If not local and not preloaded as remote, it's unknown + raise ValueError( + f"Agent '{agent_name}' not found (neither local nor remote config)" ) - logger.info(f"Started agent '{agent_name}' at {agent_url}") - if listener_url: - logger.info(f" └─ with listener at {listener_url}") - async def _cleanup_agent(self, agent_name: str): """Clean up all resources for an agent""" - # Close client connection - if agent_name in self._connections: - await self._connections[agent_name].close() - + ctx = self._contexts.get(agent_name) + if not ctx: + return + # Close client + if ctx.client: + await ctx.client.close() + ctx.client = None # Stop listener - if agent_name in self._listeners: - self._listeners[agent_name].cancel() + if ctx.listener_task: + ctx.listener_task.cancel() try: - await self._listeners[agent_name] + await ctx.listener_task except asyncio.CancelledError: pass - del self._listeners[agent_name] - - # Stop agent - if agent_name in self._running_agents: - self._running_agents[agent_name].cancel() - try: - await self._running_agents[agent_name] - except asyncio.CancelledError: - pass - del self._running_agents[agent_name] - - # Clean up references - if agent_name in self._connections: - del self._connections[agent_name] - if agent_name in self._agent_instances: - del self._agent_instances[agent_name] - if agent_name in self._listener_urls: - del self._listener_urls[agent_name] + ctx.listener_task = None + ctx.listener_url = None + # Keep the context to allow quick reconnection; do not delete metadata + # Removing deletion allows list_available_agents to remain stable async def get_client(self, agent_name: str) -> AgentClient: """Get Agent client connection""" - if agent_name not in self._connections: + ctx = self._contexts.get(agent_name) + if not ctx or not ctx.client: await self.start_agent(agent_name) - - return self._connections[agent_name] + ctx = self._contexts.get(agent_name) + return ctx.client async def stop_agent(self, agent_name: str): """Stop Agent service and associated listener""" @@ -412,72 +253,31 @@ async def stop_agent(self, agent_name: str): def list_running_agents(self) -> List[str]: """List running agents""" - return list(self._running_agents.keys()) + return [name for name, ctx in self._contexts.items() if ctx.client] def list_available_agents(self) -> List[str]: - """List all available agents from registry and remote cards""" - # Auto-load remote agent configs if not already loaded - if not self._remote_agent_configs: - self._load_remote_agent_configs() - - local_agents = registry.list_agent_names() - remote_agents = list(self._remote_agent_configs.keys()) - return local_agents + remote_agents + """List all available agents from local config cards""" + # Ensure remote contexts are loaded + self._ensure_remote_contexts_loaded() + return list(self._contexts.keys()) async def stop_all(self): - """Stop all running agents""" - for agent_name in list(self._running_agents.keys()): + """Stop all running clients and listeners""" + for agent_name in list(self._contexts.keys()): await self.stop_agent(agent_name) - def get_agent_info(self, agent_name: str) -> dict: - """Get agent information including listener info""" - # Check if it's a local agent - if agent_name in self._agent_instances: - agent_instance = self._agent_instances[agent_name] - return { - "name": agent_name, - "type": "local", - "url": agent_instance.agent_card.url, - "listener_url": self._listener_urls.get(agent_name), - "card": agent_instance.agent_card.model_dump(exclude_none=True), - "running": agent_name in self._running_agents, - "has_listener": agent_name in self._listeners, - } - - # Check if it's a remote agent - if agent_name in self._remote_agent_configs: - config_data = self._remote_agent_configs[agent_name] - agent_card = self._remote_agent_cards.get(agent_name) - return { - "name": agent_name, - "type": "remote", - "url": config_data.get("url"), - "card": ( - agent_card.model_dump(exclude_none=True) - if agent_card - else config_data - ), - "connected": agent_name in self._connections, - "running": False, # Remote agents are not managed by us - "has_listener": False, - } - + def get_agent_card(self, agent_name: str) -> Optional[AgentCard]: + """Get AgentCard for a known agent from local configs.""" + self._ensure_remote_contexts_loaded() + ctx = self._contexts.get(agent_name) + if not ctx: + return None + if ctx.client and ctx.client.agent_card: + return ctx.client.agent_card + if ctx.local_agent_card: + return ctx.local_agent_card return None - def list_remote_agents(self) -> List[str]: - """List remote agents loaded from config files""" - # Auto-load remote agent configs if not already loaded - if not self._remote_agent_configs: - self._load_remote_agent_configs() - return list(self._remote_agent_configs.keys()) - - def get_remote_agent_card(self, agent_name: str) -> dict: - """Get remote agent card data""" - # Return actual AgentCard if available, otherwise config data - if agent_name in self._remote_agent_cards: - return self._remote_agent_cards[agent_name] - return self._remote_agent_configs.get(agent_name) - # Global default instance for backward compatibility and ease of use _default_remote_connections = RemoteConnections() @@ -487,70 +287,3 @@ def get_remote_agent_card(self, agent_name: str) -> dict: def get_default_remote_connections() -> RemoteConnections: """Get the default RemoteConnections instance""" return _default_remote_connections - - -async def load_remote_agents(config_dir: str = None) -> None: - """Load remote agents via the default RemoteConnections instance""" - return await _default_remote_connections.load_remote_agents(config_dir) - - -async def connect_remote_agent(agent_name: str) -> str: - """Connect to a remote agent using the default instance""" - return await _default_remote_connections.connect_remote_agent(agent_name) - - -async def start_agent( - agent_name: str, - with_listener: bool = True, - listener_port: int = None, - listener_host: str = "localhost", - notification_callback: callable = None, -) -> str: - """Start an agent using the default RemoteConnections instance""" - return await _default_remote_connections.start_agent( - agent_name, - with_listener=with_listener, - listener_port=listener_port, - listener_host=listener_host, - notification_callback=notification_callback, - ) - - -async def get_client(agent_name: str) -> AgentClient: - """Get an AgentClient from the default RemoteConnections instance""" - return await _default_remote_connections.get_client(agent_name) - - -async def stop_agent(agent_name: str): - """Stop an agent using the default RemoteConnections instance""" - return await _default_remote_connections.stop_agent(agent_name) - - -def list_running_agents() -> List[str]: - """List running agents from the default RemoteConnections instance""" - return _default_remote_connections.list_running_agents() - - -def list_available_agents() -> List[str]: - """List available agents from the default RemoteConnections instance""" - return _default_remote_connections.list_available_agents() - - -async def stop_all(): - """Stop all agents via the default RemoteConnections instance""" - return await _default_remote_connections.stop_all() - - -def get_agent_info(agent_name: str) -> dict: - """Get agent info from the default RemoteConnections instance""" - return _default_remote_connections.get_agent_info(agent_name) - - -def list_remote_agents() -> List[str]: - """List remote agents from the default RemoteConnections instance""" - return _default_remote_connections.list_remote_agents() - - -def get_remote_agent_card(agent_name: str) -> dict: - """Get remote agent card data from the default RemoteConnections instance""" - return _default_remote_connections.get_remote_agent_card(agent_name) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 613ba73db..65805c783 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -1,7 +1,5 @@ -import json import logging -from pathlib import Path -from typing import Dict, Optional, Type +from typing import Type import httpx import uvicorn @@ -15,59 +13,24 @@ InMemoryTaskStore, TaskUpdater, ) -from a2a.types import ( - AgentCapabilities, - AgentCard, - AgentSkill, - Part, - TaskState, - TextPart, - UnsupportedOperationError, -) +from a2a.types import AgentCard, Part, TaskState, TextPart, UnsupportedOperationError from a2a.utils import new_agent_text_message, new_task from a2a.utils.errors import ServerError -from valuecell.core.agent import registry +from valuecell.core.agent.card import find_local_agent_card_by_agent_name from valuecell.core.types import ( BaseAgent, NotifyResponse, StreamResponse, StreamResponseEvent, ) -from valuecell.utils import ( - get_agent_card_path, - get_next_available_port, - parse_host_port, -) +from valuecell.utils import parse_host_port from .responses import EventPredicates logger = logging.getLogger(__name__) -def serve( - host: str = "localhost", - port: int = None, - streaming: bool = True, - push_notifications: bool = False, - description: str = None, - version: str = "1.0.0", - skills: list[AgentSkill | dict] = None, - **extra_kwargs, -): +def _serve(agent_card: AgentCard): def decorator(cls: Type) -> Type: - if extra_kwargs: - logger.warning( - f"Extra kwargs {extra_kwargs} are not used in the @serve decorator" - ) - - # Build agent card (port will be assigned when server starts) - agent_skills = [] - if skills: - for skill in skills: - if isinstance(skill, dict): - agent_skills.append(AgentSkill(**skill)) - elif isinstance(skill, AgentSkill): - agent_skills.append(skill) - # Determine the agent name consistently agent_name = cls.__name__ @@ -76,29 +39,13 @@ class DecoratedAgent(cls): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - # Assign port when instance is created - actual_port = port or get_next_available_port() - # Create agent card with actual port - self.agent_card = AgentCard( - name=agent_name, - description=description - or f"No description available for {agent_name}", - url=f"http://{host}:{actual_port}/", - version=version, - default_input_modes=["text"], - default_output_modes=["text"], - capabilities=AgentCapabilities( - streaming=streaming, push_notifications=push_notifications - ), - skills=agent_skills, - supports_authenticated_extended_card=False, - ) + self.agent_card = agent_card - self._host = host - self._port = actual_port + self._host, self._port = parse_host_port( + agent_card.url, default_scheme="http" + ) self._executor = None - self._server_task = None async def serve(self): # Create AgentExecutor wrapper @@ -138,13 +85,13 @@ async def serve(self): DecoratedAgent.__qualname__ = cls.__qualname__ # Register to registry - try: - registry.register(DecoratedAgent, agent_name) - except ImportError: - # Registry not available, skip registration - logger.warning( - f"Agent registry not available, skipping registration for {DecoratedAgent.__name__}" - ) + # try: + # registry.register(DecoratedAgent, agent_name) + # except ImportError: + # # Registry not available, skip registration + # logger.warning( + # f"Agent registry not available, skipping registration for {DecoratedAgent.__name__}" + # ) return DecoratedAgent @@ -271,56 +218,12 @@ def _create_agent_executor(agent_instance): return GenericAgentExecutor(agent_instance) -def _get_serve_params_by_agent_name(name: str) -> Optional[Dict]: - """ - Reads JSON files from agent_cards directory and returns the first one where name matches. - - Args: - name: The agent name to search for - - Returns: - Dict: The agent configuration dictionary if found, None otherwise - """ - agent_cards_path = Path(get_agent_card_path()) - - # Check if the agent_cards directory exists - if not agent_cards_path.exists(): - return None - - # Iterate through all JSON files in the agent_cards directory - for json_file in agent_cards_path.glob("*.json"): - try: - with open(json_file, "r", encoding="utf-8") as f: - agent_config = json.load(f) - - # Check if this agent config has the matching name - if not isinstance(agent_config, dict): - continue - if agent_config.get("name") != name: - continue - if "url" in agent_config and agent_config["url"]: - host, port = parse_host_port( - agent_config.get("url"), default_scheme="http" - ) - agent_config["host"] = host - agent_config["port"] = port - - return agent_config - - except (json.JSONDecodeError, IOError): - # Skip files that can't be read or parsed - continue - - # Return None if no matching agent is found - return None - - def create_wrapped_agent(agent_class: Type[BaseAgent]): # Get agent configuration from agent cards - agent_config = _get_serve_params_by_agent_name(agent_class.__name__) - if not agent_config: + agent_card = find_local_agent_card_by_agent_name(agent_class.__name__) + if not agent_card: raise ValueError( f"No agent configuration found for {agent_class.__name__} in agent cards" ) - return serve(**agent_config)(agent_class)() + return _serve(agent_card)(agent_class)() diff --git a/python/valuecell/core/agent/tests/test_connect.py b/python/valuecell/core/agent/tests/test_connect.py new file mode 100644 index 000000000..be1c2e19a --- /dev/null +++ b/python/valuecell/core/agent/tests/test_connect.py @@ -0,0 +1,326 @@ +""" +Unit tests for valuecell.core.agent.connect.RemoteConnections + +Notes: +- We avoid real network and uvicorn by monkeypatching AgentClient and NotificationListener. +- AgentCard JSONs are generated with all required fields to satisfy a2a.types.AgentCard. +""" + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass +from pathlib import Path +from typing import ClassVar, Dict, Optional + +import pytest + +from a2a.client.client_factory import minimal_agent_card +from a2a.types import AgentCard + +from valuecell.core.agent.connect import RemoteConnections + + +# ---------------------------- +# Test helpers and fakes +# ---------------------------- + + +def make_card_dict(name: str, url: str, push_notifications: bool) -> dict: + """Create a complete AgentCard dict with all required fields filled. + + We base this on a2a.client.client_factory.minimal_agent_card to ensure all + required properties exist, then override name/url and capabilities. + """ + base = minimal_agent_card(url) + card_dict = base.model_dump() + card_dict.update( + { + "name": name, + "url": url, + # Provide a description to be explicit (parse_local_agent_card_dict can also fill it) + "description": f"Test card for {name}", + # Capabilities must include push_notifications per our tests + "capabilities": { + "streaming": True, + "push_notifications": push_notifications, + }, + } + ) + return card_dict + + +@dataclass +class FakeAgentClient: + """A lightweight stand-in for AgentClient to avoid real HTTP calls. + + Behavior: + - ensure_initialized() sets agent_card from the url->AgentCard mapping. + - close() marks the instance as closed. + """ + + # Class-level registry populated by tests: url -> AgentCard + cards_by_url: ClassVar[Dict[str, AgentCard]] = {} + # Class-level counter to validate single instantiation in concurrency tests + create_count: ClassVar[int] = 0 + + agent_url: str + push_notification_url: Optional[str] = None + + def __init__(self, agent_url: str, push_notification_url: str | None = None): + type(self).create_count += 1 + self.agent_url = agent_url + self.push_notification_url = push_notification_url + self.agent_card: Optional[AgentCard] = None + self._closed = False + + async def ensure_initialized(self): + # Simulate I/O a little + await asyncio.sleep(0) + # Map back to the card provided in the JSON for this URL + card = type(self).cards_by_url.get(self.agent_url) + if card is None: + # As a fallback, generate a minimal card to keep tests robust + card = minimal_agent_card(self.agent_url) + self.agent_card = card + + async def close(self): + self._closed = True + + +class DummyNotificationListener: + """Dummy listener that doesn't bind a real port.""" + + def __init__( + self, host: str = "localhost", port: int = 0, notification_callback=None + ): + self.host = host + self.port = port + self.notification_callback = notification_callback + + async def start_async(self): + # Simulate server startup without actually starting uvicorn + await asyncio.sleep(0.01) + + +# ---------------------------- +# Tests +# ---------------------------- + + +@pytest.mark.asyncio +async def test_load_from_dir_and_list(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Prepare two agent cards + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + cards = [ + make_card_dict("AgentAlpha", "http://127.0.0.1:8101", push_notifications=False), + make_card_dict("AgentBeta", "http://127.0.0.1:8102", push_notifications=True), + ] + for c in cards: + with open(dir_path / f"{c['name']}.json", "w", encoding="utf-8") as f: + json.dump(c, f) + + # Wire FakeAgentClient and DummyNotificationListener + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + + # Also prime the client mapping + FakeAgentClient.cards_by_url = { + c["url"]: AgentCard.model_validate(c) for c in cards + } + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + all_agents = rc.list_available_agents() + assert set(all_agents) == {"AgentAlpha", "AgentBeta"} + + +@pytest.mark.asyncio +async def test_start_agent_without_listener( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + # Create a card that does not support push notifications + card = make_card_dict( + "NoPushAgent", "http://127.0.0.1:8201", push_notifications=False + ) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + returned_card = await rc.start_agent("NoPushAgent", with_listener=False) + assert isinstance(returned_card, AgentCard) + assert returned_card.name == "NoPushAgent" + + # Validate client exists and was created exactly once + client = await rc.get_client("NoPushAgent") + assert isinstance(client, FakeAgentClient) + assert client.push_notification_url is None # listener not requested + assert FakeAgentClient.create_count == 1 + + +@pytest.mark.asyncio +async def test_start_agent_with_listener_when_supported( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + # Card supports push notifications + card = make_card_dict("PushAgent", "http://127.0.0.1:8301", push_notifications=True) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + returned_card = await rc.start_agent( + "PushAgent", with_listener=True, listener_host="127.0.0.1" + ) + assert isinstance(returned_card, AgentCard) + assert returned_card.name == "PushAgent" + + # Ensure listener started and URL recorded + ctx = rc._contexts["PushAgent"] + assert ctx.listener_url is not None + assert ctx.listener_url.startswith("http://127.0.0.1:") + + # Current implementation creates client before listener, so push_notification_url stays None + client = await rc.get_client("PushAgent") + assert isinstance(client, FakeAgentClient) + assert client.push_notification_url is None + + +@pytest.mark.asyncio +async def test_start_agent_with_listener_but_not_supported( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + # Card does NOT support push notifications; listener should not be started + card = make_card_dict("NoPush", "http://127.0.0.1:8401", push_notifications=False) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + await rc.start_agent("NoPush", with_listener=True) + client = await rc.get_client("NoPush") + assert isinstance(client, FakeAgentClient) + # Since capabilities.push_notifications=False, listener shouldn't be used + assert client.push_notification_url is None + + +@pytest.mark.asyncio +async def test_concurrent_start_calls_single_initialization( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + card = make_card_dict( + "ConcurrentAgent", "http://127.0.0.1:8501", push_notifications=True + ) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + with open(dir_path / f"{card['name']}.json", "w", encoding="utf-8") as f: + json.dump(card, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = {card["url"]: AgentCard.model_validate(card)} + FakeAgentClient.create_count = 0 + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + # Launch multiple concurrent start calls for the same agent + await asyncio.gather( + rc.start_agent("ConcurrentAgent", with_listener=True), + rc.start_agent("ConcurrentAgent", with_listener=True), + rc.start_agent("ConcurrentAgent", with_listener=True), + ) + + # Only one client should have been constructed + assert FakeAgentClient.create_count == 1 + + +@pytest.mark.asyncio +async def test_stop_agent_and_stop_all(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + card1 = make_card_dict("A1", "http://127.0.0.1:8601", push_notifications=True) + card2 = make_card_dict("A2", "http://127.0.0.1:8602", push_notifications=False) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + for c in (card1, card2): + with open(dir_path / f"{c['name']}.json", "w", encoding="utf-8") as f: + json.dump(c, f) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + monkeypatch.setattr(connect_mod, "NotificationListener", DummyNotificationListener) + FakeAgentClient.cards_by_url = { + card1["url"]: AgentCard.model_validate(card1), + card2["url"]: AgentCard.model_validate(card2), + } + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + await rc.start_agent("A1", with_listener=True) + await rc.start_agent("A2", with_listener=False) + assert set(rc.list_running_agents()) == {"A1", "A2"} + + # Stop a single agent + await rc.stop_agent("A1") + assert rc.list_running_agents() == ["A2"] + + # Stop all + await rc.stop_all() + assert rc.list_running_agents() == [] + + +@pytest.mark.asyncio +async def test_unknown_agent_raises(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Empty directory (no cards) + dir_path = tmp_path / "agent_cards" + dir_path.mkdir(parents=True) + + from valuecell.core.agent import connect as connect_mod + + monkeypatch.setattr(connect_mod, "AgentClient", FakeAgentClient) + + rc = RemoteConnections() + rc.load_from_dir(str(dir_path)) + + with pytest.raises(ValueError): + await rc.start_agent("NotExist") diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index 33a65bb1e..b38d3d6a4 100644 --- a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -223,12 +223,12 @@ def tool_get_agent_description(self, agent_name: str) -> str: Returns: str: A description of the agent's capabilities and supported operations """ - self.agent_connections.list_remote_agents() - if card := self.agent_connections.get_remote_agent_card(agent_name): + if card := self.agent_connections.get_agent_card(agent_name): if isinstance(card, AgentCard): return agentcard_to_prompt(card) if isinstance(card, dict): return str(card) + return agentcard_to_prompt(card) return "The requested agent could not be found or is not available." diff --git a/python/valuecell/examples/core_e2e_demo.py b/python/valuecell/examples/core_e2e_demo.py deleted file mode 100644 index 3e59cb416..000000000 --- a/python/valuecell/examples/core_e2e_demo.py +++ /dev/null @@ -1,175 +0,0 @@ -import asyncio -import logging -from valuecell.core.agent.decorator import serve -from valuecell.core.agent.connect import RemoteConnections - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - - -# Demo agents using the @serve decorator -@serve(push_notifications=True) -class CalculatorAgent: - """A calculator agent that can do basic math""" - - def __init__(self): - logger.info("Initializing CalculatorAgent") - self.agent_name = "CalculatorAgent" - - async def stream(self, query, session_id, task_id): - """Process math queries""" - logger.info(f"Calculator processing: {query}") - - yield {"is_task_complete": False, "content": f"🧮 Calculating: {query}"} - await asyncio.sleep(0.5) - - try: - # Simple math evaluation (in real world, use safe parsing) - if any(op in query for op in ["+", "-", "*", "/", "(", ")"]): - # For demo, just respond with a mock calculation - result = "42" # Mock result - yield {"is_task_complete": False, "content": "💭 Computing result..."} - await asyncio.sleep(0.5) - yield {"is_task_complete": True, "content": f"✅ Result: {result}"} - else: - yield { - "is_task_complete": True, - "content": "❓ I can help with math calculations. Try something like '2 + 3'", - } - except Exception as e: - yield { - "is_task_complete": True, - "content": f"❌ Error in calculation: {str(e)}", - } - - -@serve( - port=9101, - push_notifications=True, - description="Provides weather information", -) -class WeatherAgent: - """A weather information agent""" - - def __init__(self): - logger.info("Initializing WeatherAgent") - self.agent_name = "WeatherAgent" - - async def stream(self, query, session_id, task_id): - """Process weather queries""" - logger.info(f"Weather processing: {query}") - - yield {"is_task_complete": False, "content": f"🌤️ Checking weather for: {query}"} - await asyncio.sleep(0.8) - - if "weather" in query.lower(): - yield { - "is_task_complete": False, - "content": "🌡️ Fetching current conditions...", - } - await asyncio.sleep(0.5) - yield { - "is_task_complete": False, - "content": "📊 Analyzing forecast data...", - } - await asyncio.sleep(0.5) - yield { - "is_task_complete": True, - "content": f"☀️ Weather report: Sunny, 22°C. Perfect day! (for query: {query})", - } - else: - yield { - "is_task_complete": True, - "content": "🌍 I provide weather information. Ask me about the weather in any location!", - } - - -@serve() -class SimpleAgent: - """A simple non-streaming agent""" - - async def stream(self, query, session_id, task_id): - """Simple response""" - yield {"is_task_complete": True, "content": f"Simple response to: {query}"} - - -async def demo_complete_system(): - """Complete demonstration of the decorator system""" - logger.info("🚀 Starting Complete A2A Decorator System Demo") - - # Create connections manager - connections = RemoteConnections() - - try: - # Show available agents from registry - available = connections.list_available_agents() - logger.info(f"📋 Available agents from registry: {available}") - - # Start multiple agents - logger.info("▶️ Starting multiple agents...") - - calc_url = await connections.start_agent("CalculatorAgent") - weather_url = await connections.start_agent("WeatherAgent") - simple_url = await connections.start_agent("SimpleAgent") - - logger.info(f"🧮 Calculator Agent: {calc_url}") - logger.info(f"🌤️ Weather Agent: {weather_url}") - logger.info(f"📝 Simple Agent: {simple_url}") - - # Wait for all agents to fully start - await asyncio.sleep(3) - - # Show running agents - running = connections.list_running_agents() - logger.info(f"🏃 Running agents: {running}") - - # Test Calculator Agent - logger.info("🧪 Testing Calculator Agent...") - client = await connections.get_client("CalculatorAgent") - task, event = await client.send_message("What is 15 + 27?") - logger.info(f"Calculator result: {task.status}") - - # # Test Weather Agent - logger.info("🧪 Testing Weather Agent...") - client = await connections.get_client("WeatherAgent") - task, event = await client.send_message( - "What's the weather like in San Francisco?" - ) - logger.info(f"Weather result: {task.status}") - - # Test Simple Agent - logger.info("🧪 Testing Simple Agent...") - client = await connections.get_client("SimpleAgent") - task, event = await client.send_message("Hello simple agent") - logger.info(f"Simple agent result: {task.status}") - - await asyncio.sleep(5) - # Show agent information - for agent_name in running: - info = connections.get_agent_info(agent_name) - if info: - logger.info( - f"ℹ️ {agent_name}: {info['url']} (running: {info['running']})" - ) - - logger.info("✅ All tests completed successfully!") - - except Exception as e: - logger.error(f"❌ Error in demo: {e}") - import traceback - - traceback.print_exc() - raise - - finally: - # Clean up - logger.info("🧹 Stopping all agents...") - await connections.stop_all() - logger.info("✅ Demo completed and cleaned up") - - -if __name__ == "__main__": - asyncio.run(demo_complete_system()) diff --git a/python/valuecell/examples/core_remote_agent_demo.py b/python/valuecell/examples/core_remote_agent_demo.py deleted file mode 100644 index ed36eee6a..000000000 --- a/python/valuecell/examples/core_remote_agent_demo.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -Example usage of RemoteConnections with remote agents. - -This script demonstrates how to: -1. Load remote agent cards from configuration files -2. Connect to remote agents -3. Get agent information -""" - -import asyncio -import logging - -from valuecell.core.agent.connect import RemoteConnections - -# Setup logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -async def main(): - """Main example function.""" - # Create RemoteConnections instance - connections = RemoteConnections() - - # List all available agents (local + remote) - available_agents = connections.list_available_agents() - logger.info(f"Available agents: {available_agents}") - - # List only remote agents - remote_agents = connections.list_remote_agents() - logger.info(f"Remote agents: {remote_agents}") - - # Get info for each remote agent - for agent_name in remote_agents: - agent_info = connections.get_agent_info(agent_name) - logger.info(f"Agent info for '{agent_name}': {agent_info}") - - # Get raw card data - card_data = connections.get_remote_agent_card(agent_name) - logger.info(f"Card data for '{agent_name}': {card_data}") - - # Try to start/connect to a remote agent (if any) - if remote_agents: - agent_name = remote_agents[0] - logger.info(f"Attempting to start remote agent: {agent_name}") - try: - agent_url = await connections.start_agent(agent_name) - logger.info(f"Successfully started {agent_name} at {agent_url}") - - # Get client connection - client = await connections.get_client(agent_name) - logger.info(f"Got client for {agent_name}: {client}") - - # Check updated agent info - updated_info = connections.get_agent_info(agent_name) - logger.info(f"Updated info for '{agent_name}': {updated_info}") - - async for task, event in await client.send_message( - "analyze apple stock with buffett", - streaming=True, - ): - logger.info(f"Response from {agent_name}:\n{task}\n{event}") - - except Exception as e: - logger.error(f"Failed to start {agent_name}: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/python/valuecell/examples/trading_agents_demo.py b/python/valuecell/examples/trading_agents_demo.py index 3f76c2391..58edc5f4f 100644 --- a/python/valuecell/examples/trading_agents_demo.py +++ b/python/valuecell/examples/trading_agents_demo.py @@ -31,8 +31,8 @@ async def setup(self): """Set up connection""" try: # Connect to remote agent - agent_url = await self.connections.connect_remote_agent(self.agent_name) - logger.info(f"TradingAgents connected successfully: {agent_url}") + # agent_url = await self.connections.connect_remote_agent(self.agent_name) + # logger.info(f"TradingAgents connected successfully: {agent_url}") # Get client connection self.client = await self.connections.get_client(self.agent_name)