diff --git a/python/valuecell/core/agent/README.md b/python/valuecell/core/agent/README.md deleted file mode 100644 index 82f2da823..000000000 --- a/python/valuecell/core/agent/README.md +++ /dev/null @@ -1,439 +0,0 @@ -# ValueCell Agent System - -The ValueCell Agent System is a distributed intelligent agent framework based on the Agent-to-Agent (A2A) protocol, providing a clean decorator interface and powerful connection management capabilities. - -## Core Features - -- 🎯 **Simple Decorator**: Easily create Agents using the `@serve` decorator -- 🔄 **Streaming Response**: Support for real-time streaming data processing -- 🌐 **Distributed Architecture**: Support for both local and remote Agent connections -- 📡 **Push Notifications**: Optional push notification functionality -- 🔧 **Flexible Configuration**: Support for automatic port allocation and custom configuration - -## Quick Start - -### 1. Create a Simple Agent - -```python -from valuecell.core.agent.decorator import serve - -@serve(push_notifications=True) -class CalculatorAgent: - """An agent that can perform basic math calculations""" - - def __init__(self): - self.agent_name = "CalculatorAgent" - - async def stream(self, query, session_id, task_id): - """Process math queries""" - yield {"is_task_complete": False, "content": f"🧮 Calculating: {query}"} - - # Execute calculation logic - try: - if any(op in query for op in ["+", "-", "*", "/", "(", ")"]): - result = eval(query) # Note: Use safe parsing in production - yield {"is_task_complete": True, "content": f"✅ Result: {result}"} - else: - yield { - "is_task_complete": True, - "content": "❓ Please enter a math expression, e.g., '2 + 3'" - } - except Exception as e: - yield { - "is_task_complete": True, - "content": f"❌ Calculation error: {str(e)}" - } -``` - -### 2. Use RemoteConnections to manage Agents - -```python -import asyncio -from valuecell.core.agent.connect import RemoteConnections - -async def main(): - # Create connection manager - connections = RemoteConnections() - - # List all available Agents - available = connections.list_available_agents() - print(f"Available Agents: {available}") - - # Start Agent - calc_url = await connections.start_agent("CalculatorAgent") - print(f"Calculator Agent started at: {calc_url}") - - # Get client and send message - client = await connections.get_client("CalculatorAgent") - task, event = await client.send_message("What is 15 + 27?") - print(f"Calculation result: {task.status}") - - # Clean up resources - await connections.stop_all() - -if __name__ == "__main__": - asyncio.run(main()) -``` - -## Core Components - -### 1. @serve Decorator - -The `@serve` decorator is the core tool for creating Agents, providing the following parameters: - -```python -@serve( - host="localhost", # Service host - port=9100, # Service port (optional, auto-allocated) - streaming=True, # Whether to support streaming response - push_notifications=False, # Whether to enable push notifications - description="Description", # Agent description - version="1.0.0", # Agent version - skills=[] # Agent skills list -) -``` - -### 2. RemoteConnections Class - -`RemoteConnections` is the core class for Agent connection management, providing the following functionality: - -#### Basic Operations - -```python -connections = RemoteConnections() - -# List all available Agents (local + remote) -available_agents = connections.list_available_agents() - -# List running Agents -running_agents = connections.list_running_agents() - -# Start Agent -agent_url = await connections.start_agent("AgentName") - -# Get Agent client -client = await connections.get_client("AgentName") - -# Stop specific Agent -await connections.stop_agent("AgentName") - -# Stop all Agents -await connections.stop_all() -``` - -#### Remote and Local Are Unified - -```python -# All available agents (local + configured URL-only) -available_agents = connections.list_available_agents() - -# Get Agent information (if implemented) -agent_info = connections.get_agent_info("AgentName") - -# Get Agent card (returns None if unavailable; set fetch_if_missing=True to fetch remotely) -card = connections.get_agent_card("AgentName", fetch_if_missing=False) -``` - -### 3. AgentClient Class - -`AgentClient` provides the interface for communicating with Agents: - -```python -client = AgentClient("http://localhost:9100/") - -# Send message (non-streaming) -task, event = await client.send_message("Hello Agent") - -# Send message (streaming) -async for response in await client.send_message("Stream query", streaming=True): - print(f"Streaming response: {response}") - -# Get Agent card information -card = await client.get_agent_card() - -# Close connection -await client.close() -``` - -## Complete Examples - -### Example 1: Multi-Agent System - -```python -import asyncio -import logging -from valuecell.core.agent.decorator import serve -from valuecell.core.agent.connect import RemoteConnections - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -@serve(push_notifications=True) -class CalculatorAgent: - async def stream(self, query, session_id, task_id): - yield {"is_task_complete": False, "content": f"🧮 Calculating: {query}"} - await asyncio.sleep(0.5) - yield {"is_task_complete": True, "content": "✅ Calculation complete"} - -@serve(port=9101, push_notifications=True) -class WeatherAgent: - async def stream(self, query, session_id, task_id): - yield {"is_task_complete": False, "content": f"🌤️ Checking weather: {query}"} - await asyncio.sleep(0.8) - yield {"is_task_complete": True, "content": "☀️ Today's weather: Sunny, 22°C"} - -async def demo(): - connections = RemoteConnections() - - try: - # Start multiple Agents - calc_url = await connections.start_agent("CalculatorAgent") - weather_url = await connections.start_agent("WeatherAgent") - - logger.info(f"Calculator Agent: {calc_url}") - logger.info(f"Weather Agent: {weather_url}") - - # Wait for Agents to start - await asyncio.sleep(2) - - # Test Calculator Agent - calc_client = await connections.get_client("CalculatorAgent") - task, _ = await calc_client.send_message("2 + 3") - logger.info(f"Calculator result: {task.status}") - - # Test Weather Agent - weather_client = await connections.get_client("WeatherAgent") - task, _ = await weather_client.send_message("How's the weather in Beijing?") - logger.info(f"Weather result: {task.status}") - - await asyncio.sleep(5) - - finally: - await connections.stop_all() - -if __name__ == "__main__": - asyncio.run(demo()) -``` - -### Example 2: Remote Agent Connection - -```python -import asyncio -import logging -from valuecell.core.agent.connect import RemoteConnections - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -async def remote_demo(): - connections = RemoteConnections() - - # List all available Agents (including remote ones) - available_agents = connections.list_available_agents() - logger.info(f"Available Agents: {available_agents}") - - # Connect to any available Agent (including URL-only) - if available_agents: - agent_name = available_agents[0] - try: - agent_url = await connections.start_agent(agent_name) - logger.info(f"Successfully connected to remote Agent {agent_name}: {agent_url}") - - # Get client and send message - client = await connections.get_client(agent_name) - - # Stream process message - async for response in await client.send_message( - "Analyze Apple stock", - streaming=True - ): - logger.info(f"Remote response: {response}") - - except Exception as e: - logger.error(f"Failed to connect to Agent {agent_name}: {e}") - -if __name__ == "__main__": - asyncio.run(remote_demo()) -``` - -## Agent Development Guide - -### 1. Agent Interface Implementation - -All Agents must implement the `stream` method: - -```python -async def stream(self, query, session_id, task_id): - """ - Process user queries and return streaming responses - - Args: - query: User query content - session_id: Session ID - task_id: Task ID - - Yields: - dict: Dictionary containing 'content' and 'is_task_complete' - """ - pass -``` - -### 2. Response Format - -Each response should be a dictionary containing the following fields: - -```python -{ - "content": "Response content", # Required: Text content of the response - "is_task_complete": False # Required: Whether the task is complete -} -``` - -### 3. Error Handling - -It's recommended to add appropriate error handling in Agents: - -```python -async def stream(self, query, session_id, task_id): - try: - # Processing logic - yield {"is_task_complete": False, "content": "Processing..."} - # ... business logic ... - yield {"is_task_complete": True, "content": "Complete"} - except Exception as e: - yield { - "is_task_complete": True, - "content": f"❌ Processing error: {str(e)}" - } -``` - -### 4. Asynchronous Operations - -Agents can perform asynchronous operations internally: - -```python -async def stream(self, query, session_id, task_id): - yield {"is_task_complete": False, "content": "Starting process..."} - - # Asynchronous wait - await asyncio.sleep(1) - - yield {"is_task_complete": False, "content": "Intermediate step..."} - - # More asynchronous operations - result = await some_async_function() - - yield {"is_task_complete": True, "content": f"Complete: {result}"} -``` - -## Configuration - -### Remote Agent Configuration - -Create JSON configuration files in the `python/configs/agent_cards/` directory: - -```json -{ - "name": "Hedge Fund Agent", - "url": "http://localhost:8080/", - "description": "Professional hedge fund analysis Agent", - "version": "1.0.0", - "capabilities": { - "streaming": true, - "push_notifications": true - } -} -``` - -### Environment Variables - -Configuration via environment variables: - -```bash -export VALUECELL_AGENT_HOST=localhost -export VALUECELL_AGENT_PORT_RANGE_START=9100 -export VALUECELL_AGENT_PORT_RANGE_END=9200 -``` - -## Testing - -Run core functionality tests: - -```bash -cd python -python -m pytest valuecell/core/agent/tests/ -v -``` - -Run complete end-to-end tests: - -```bash -python valuecell/examples/core_e2e_demo.py -``` - -Test remote Agent connections: - -```bash -python valuecell/examples/core_remote_agent_demo.py -``` - -## Logging - -The system uses Python's standard logging library. Recommended configuration: - -```python -import logging - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) - -logger = logging.getLogger(__name__) -``` - -## Performance Considerations - -1. **Port Management**: The system automatically allocates available ports to avoid conflicts -2. **Connection Reuse**: RemoteConnections manages connection pools to avoid duplicate connections -3. **Asynchronous Processing**: Full asynchronous architecture supporting high concurrency -4. **Streaming Response**: Reduces memory usage and improves response speed -5. **Error Recovery**: Built-in error handling and recovery mechanisms - -## Troubleshooting - -### Common Issues - -1. **Port Conflicts** - - ```bash - Solution: Use automatic port allocation or specify different ports - ``` - -2. **Agent Not Registered** - - ```python - # Ensure Agent class is imported - from your_module import YourAgent - ``` - -3. **Connection Failure** - - ```python - # Check if Agent is running - running = connections.list_running_agents() - print(running) - ``` - -4. **Remote Agent Connection Issues** - - ```bash - Check configuration file format and network connectivity - ``` - -### Debug Mode - -Enable verbose logging: - -```python -logging.getLogger("valuecell.core.agent").setLevel(logging.DEBUG) -``` diff --git a/python/valuecell/core/agent/card.py b/python/valuecell/core/agent/card.py index b967e357e..da1458cbe 100644 --- a/python/valuecell/core/agent/card.py +++ b/python/valuecell/core/agent/card.py @@ -9,6 +9,14 @@ def parse_local_agent_card_dict(agent_card_dict: dict) -> Optional[AgentCard]: + """Parse a dictionary into an AgentCard, filling in missing required fields. + + Args: + agent_card_dict: Dictionary containing agent card data + + Returns: + AgentCard instance if parsing succeeds, None if input is not a dict + """ if not isinstance(agent_card_dict, dict): return None # Defined by us, remove fields that are not part of AgentCard @@ -40,14 +48,17 @@ def parse_local_agent_card_dict(agent_card_dict: dict) -> Optional[AgentCard]: def find_local_agent_card_by_agent_name( agent_name: str, base_dir: Optional[str | Path] = None ) -> Optional[AgentCard]: - """ - Reads JSON files from agent_cards directory and returns the first one where name matches. + """Find an agent card by name from local JSON configuration files. + + Searches through JSON files in the agent_cards directory and returns the first + matching agent card where the name field matches the provided agent_name. Args: - name: The agent name to search for + agent_name: The name of the agent to search for + base_dir: Optional base directory to search in. If None, uses default path Returns: - Dict: The agent configuration dictionary if found, None otherwise + AgentCard instance if found and enabled, None otherwise """ agent_cards_path = Path(base_dir) if base_dir else Path(get_agent_card_path()) diff --git a/python/valuecell/core/agent/client.py b/python/valuecell/core/agent/client.py index b04aeb61c..5ec5b0dec 100644 --- a/python/valuecell/core/agent/client.py +++ b/python/valuecell/core/agent/client.py @@ -9,7 +9,19 @@ class AgentClient: + """Client for communicating with remote agents via A2A protocol. + + Handles HTTP communication with remote agents, including message sending + and agent card resolution. Supports both streaming and non-streaming modes. + """ + def __init__(self, agent_url: str, push_notification_url: str = None): + """Initialize the agent client. + + Args: + agent_url: URL of the remote agent + push_notification_url: Optional URL for push notifications + """ self.agent_url = agent_url self.push_notification_url = push_notification_url self.agent_card = None @@ -18,11 +30,13 @@ def __init__(self, agent_url: str, push_notification_url: str = None): self._initialized = False async def ensure_initialized(self): + """Ensure the client is initialized with agent card and HTTP client.""" if not self._initialized: await self._setup_client() self._initialized = True async def _setup_client(self): + """Set up the HTTP client and resolve the agent card.""" self._httpx_client = httpx.AsyncClient(timeout=30) config = ClientConfig( @@ -55,10 +69,21 @@ async def send_message( metadata: dict = None, streaming: bool = False, ) -> AsyncIterator[RemoteAgentResponse]: - """Send message to Agent. + """Send a message to the remote agent and return an async iterator. - If `streaming` is True, return an async iterator producing (task, event) pairs. - If `streaming` is False, return the first (task, event) pair (and close the generator). + This method always returns an async iterator producing (remote_task, + event) pairs. When `streaming` is True the iterator yields streaming + events as they arrive. When `streaming` is False the iterator yields a + single (task, event) pair and then completes. + + Args: + query: The user query to send to the agent. + conversation_id: Optional conversation id to correlate messages. + metadata: Optional metadata to send alongside the message. + streaming: Whether to request streaming responses from the agent. + + Returns: + An async iterator yielding `RemoteAgentResponse` items (task,event). """ await self.ensure_initialized() @@ -88,11 +113,17 @@ async def wrapper() -> AsyncIterator[RemoteAgentResponse]: return wrapper() async def get_agent_card(self): + """Get the agent card from the remote agent. + + Returns: + The resolved agent card + """ await self.ensure_initialized() card_resolver = A2ACardResolver(self._httpx_client, self.agent_url) return await card_resolver.get_agent_card() async def close(self): + """Close the HTTP client and clean up resources.""" if self._httpx_client: await self._httpx_client.aclose() self._httpx_client = None diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index fab35f949..c8e4fc8a4 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -17,7 +17,10 @@ @dataclass class AgentContext: - """Unified context for remote agents.""" + """Unified context for remote agents. + + Stores connection state, URLs, and configuration for a remote agent. + """ name: str # Connection/runtime state @@ -200,9 +203,20 @@ async def _start_listener( self, host: str = "localhost", port: Optional[int] = None, - notification_callback: callable = None, + notification_callback: NotificationCallbackType = None, ) -> tuple[asyncio.Task, str]: - """Start a NotificationListener and return (task, url).""" + """Start a NotificationListener and return (task, url). + + Args: + host: Host to bind the listener to. + port: Optional port to bind; if None a free port will be selected. + notification_callback: Callback invoked when notifications arrive; + should conform to `NotificationCallbackType`. + + Returns: + Tuple of (asyncio.Task, listener_url) where listener_url is the + http URL where notifications should be posted. + """ if port is None: port = get_next_available_port(5000) listener = NotificationListener( diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 41d28e13b..b346a54dd 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -30,6 +30,15 @@ def _serve(agent_card: AgentCard): + """Create a decorator that wraps an agent class with server capabilities. + + Args: + agent_card: The agent card containing configuration + + Returns: + A decorator function that adds serve() method to agent classes + """ + def decorator(cls: Type) -> Type: # Determine the agent name consistently agent_name = cls.__name__ @@ -99,10 +108,30 @@ async def serve(self): class GenericAgentExecutor(AgentExecutor): + """Generic executor for BaseAgent implementations. + + Handles the execution lifecycle including task creation, streaming responses, + and error handling for agents that implement the BaseAgent interface. + """ + def __init__(self, agent: BaseAgent): + """Initialize the executor with an agent instance. + + Args: + agent: The agent instance to execute + """ self.agent = agent async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + """Execute the agent with the given context and event queue. + + Handles task creation if needed, streams responses from the agent, + and updates task status throughout execution. + + Args: + context: The request context containing user input and metadata + event_queue: Queue for sending events back to the client + """ # Prepare query and ensure a task exists in the system query = context.get_user_input() task = context.current_task @@ -184,15 +213,46 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non await updater.complete() async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + """Cancel the current agent execution. + + Args: + context: The request context + event_queue: Queue for sending events + + Raises: + ServerError: Always raises as cancel is not supported + """ # Default cancel operation raise ServerError(error=UnsupportedOperationError()) def _create_agent_executor(agent_instance): + """Create a GenericAgentExecutor for the given agent instance. + + Args: + agent_instance: The agent instance to wrap + + Returns: + GenericAgentExecutor instance + """ return GenericAgentExecutor(agent_instance) def create_wrapped_agent(agent_class: Type[BaseAgent]): + """Create a wrapped agent instance with server capabilities. + + Loads the agent card from local configuration and wraps the agent class + with server functionality. + + Args: + agent_class: The agent class to wrap + + Returns: + Wrapped agent instance ready to serve + + Raises: + ValueError: If no agent configuration is found + """ # Get agent configuration from agent cards agent_card = find_local_agent_card_by_agent_name(agent_class.__name__) if not agent_card: diff --git a/python/valuecell/core/agent/listener.py b/python/valuecell/core/agent/listener.py index 955826a8b..c3b5fe1f6 100644 --- a/python/valuecell/core/agent/listener.py +++ b/python/valuecell/core/agent/listener.py @@ -13,23 +13,45 @@ class NotificationListener: + """HTTP server for receiving push notifications from agents. + + Listens on a specified host and port for incoming notification requests, + validates them, and forwards them to a callback function. + """ + def __init__( self, host: str = "localhost", port: int = 5000, notification_callback: Optional[Callable] = None, ): + """Initialize the notification listener. + + Args: + host: Host to bind the server to + port: Port to listen on + notification_callback: Function to call when notifications are received + """ self.host = host self.port = port self.notification_callback = notification_callback self.app = self._create_app() def _create_app(self): + """Create the Starlette application with notification routes.""" app = Starlette() app.add_route("/notify", self.handle_notification, methods=["POST"]) return app async def handle_notification(self, request: Request): + """Handle incoming notification requests. + + Args: + request: The incoming HTTP request + + Returns: + JSONResponse with status or error + """ try: task_dict = await request.json() logger.info( @@ -49,10 +71,12 @@ async def handle_notification(self, request: Request): return JSONResponse({"error": str(e)}, status_code=500) def start(self): + """Start the notification listener server (blocking).""" logger.info(f"Starting listener on {self.host}:{self.port}") uvicorn.run(self.app, host=self.host, port=self.port) async def start_async(self): + """Start the notification listener server asynchronously.""" logger.info(f"Starting async listener on {self.host}:{self.port}") config = uvicorn.Config( self.app, host=self.host, port=self.port, log_level="info" @@ -62,6 +86,7 @@ async def start_async(self): def main(): + """Main entry point for running the notification listener.""" listener = NotificationListener() listener.start() diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py index 4a515818b..831ce70ef 100644 --- a/python/valuecell/core/agent/responses.py +++ b/python/valuecell/core/agent/responses.py @@ -14,15 +14,36 @@ class _StreamResponseNamespace: - """Factory methods for streaming responses.""" + """Factory methods for streaming responses. + + Provides convenient methods to create StreamResponse instances for + different types of streaming events like message chunks, tool calls, etc. + """ def message_chunk(self, content: str) -> StreamResponse: + """Create a message chunk response. + + Args: + content: The message content chunk + + Returns: + StreamResponse with MESSAGE_CHUNK event + """ return StreamResponse( event=StreamResponseEvent.MESSAGE_CHUNK, content=content, ) def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse: + """Create a tool call started response. + + Args: + tool_call_id: Unique identifier for the tool call + tool_name: Name of the tool being called + + Returns: + StreamResponse with TOOL_CALL_STARTED event + """ return StreamResponse( event=StreamResponseEvent.TOOL_CALL_STARTED, metadata=ToolCallPayload( @@ -37,6 +58,16 @@ def tool_call_completed( tool_call_id: str, tool_name: str, ) -> StreamResponse: + """Create a tool call completed response. + + Args: + tool_result: The result of the tool execution + tool_call_id: Unique identifier for the tool call + tool_name: Name of the tool that was called + + Returns: + StreamResponse with TOOL_CALL_COMPLETED event + """ return StreamResponse( event=StreamResponseEvent.TOOL_CALL_COMPLETED, metadata=ToolCallPayload( @@ -47,6 +78,20 @@ def tool_call_completed( ) def component_generator(self, content: str, component_type: str) -> StreamResponse: + """Create a component generator response. + + Args: + content: The component content + component_type: Type of the component being generated + + Returns: + StreamResponse with COMPONENT_GENERATOR event. + + Note: + This factory returns a `StreamResponse` (not a `NotifyResponse`) so + the same component generator payload can be streamed and handled by + the existing streaming pipeline. This is intentional. + """ return StreamResponse( event=CommonResponseEvent.COMPONENT_GENERATOR, content=content, @@ -54,12 +99,28 @@ def component_generator(self, content: str, component_type: str) -> StreamRespon ) def done(self, content: Optional[str] = None) -> StreamResponse: + """Create a task completed response. + + Args: + content: Optional completion message + + Returns: + StreamResponse with TASK_COMPLETED event + """ return StreamResponse( content=content, event=TaskStatusEvent.TASK_COMPLETED, ) def failed(self, content: Optional[str] = None) -> StreamResponse: + """Create a task failed response. + + Args: + content: Optional error message + + Returns: + StreamResponse with TASK_FAILED event + """ return StreamResponse( content=content, event=TaskStatusEvent.TASK_FAILED, @@ -70,15 +131,36 @@ def failed(self, content: Optional[str] = None) -> StreamResponse: class _NotifyResponseNamespace: - """Factory methods for notify responses.""" + """Factory methods for notify responses. + + Provides convenient methods to create NotifyResponse instances for + different types of notification events. + """ def message(self, content: str) -> NotifyResponse: + """Create a notification message response. + + Args: + content: The notification content + + Returns: + NotifyResponse with MESSAGE event + """ return NotifyResponse( content=content, event=NotifyResponseEvent.MESSAGE, ) def component_generator(self, content: str, component_type: str) -> StreamResponse: + """Create a component generator response for notifications. + + Args: + content: The component content + component_type: Type of the component being generated + + Returns: + StreamResponse with COMPONENT_GENERATOR event + """ return StreamResponse( event=CommonResponseEvent.COMPONENT_GENERATOR, content=content, @@ -86,12 +168,28 @@ def component_generator(self, content: str, component_type: str) -> StreamRespon ) def done(self, content: Optional[str] = None) -> NotifyResponse: + """Create a task completed notification response. + + Args: + content: Optional completion message + + Returns: + NotifyResponse with TASK_COMPLETED event + """ return NotifyResponse( content=content, event=TaskStatusEvent.TASK_COMPLETED, ) def failed(self, content: Optional[str] = None) -> NotifyResponse: + """Create a task failed notification response. + + Args: + content: Optional error message + + Returns: + NotifyResponse with TASK_FAILED event + """ return NotifyResponse( content=content, event=TaskStatusEvent.TASK_FAILED, @@ -110,18 +208,42 @@ class EventPredicates: @staticmethod def is_task_completed(response_type) -> bool: + """Check if the response type indicates task completion. + + Args: + response_type: The response event type to check + + Returns: + True if the event indicates task completion + """ return response_type in { TaskStatusEvent.TASK_COMPLETED, } @staticmethod def is_task_failed(response_type) -> bool: + """Check if the response type indicates task failure. + + Args: + response_type: The response event type to check + + Returns: + True if the event indicates task failure + """ return response_type in { TaskStatusEvent.TASK_FAILED, } @staticmethod def is_tool_call(response_type) -> bool: + """Check if the response type indicates a tool call event. + + Args: + response_type: The response event type to check + + Returns: + True if the event is related to tool calls + """ return response_type in { StreamResponseEvent.TOOL_CALL_STARTED, StreamResponseEvent.TOOL_CALL_COMPLETED, @@ -129,6 +251,14 @@ def is_tool_call(response_type) -> bool: @staticmethod def is_reasoning(response_type) -> bool: + """Check if the response type indicates a reasoning event. + + Args: + response_type: The response event type to check + + Returns: + True if the event is related to reasoning + """ return response_type in { StreamResponseEvent.REASONING_STARTED, StreamResponseEvent.REASONING, @@ -137,6 +267,14 @@ def is_reasoning(response_type) -> bool: @staticmethod def is_message(response_type) -> bool: + """Check if the response type indicates a message event. + + Args: + response_type: The response event type to check + + Returns: + True if the event is a message-related event + """ return response_type in { StreamResponseEvent.MESSAGE_CHUNK, NotifyResponseEvent.MESSAGE, diff --git a/python/valuecell/core/conversation/conversation_store.py b/python/valuecell/core/conversation/conversation_store.py index 658e230ff..cc54aa22c 100644 --- a/python/valuecell/core/conversation/conversation_store.py +++ b/python/valuecell/core/conversation/conversation_store.py @@ -7,7 +7,9 @@ class ConversationStore(ABC): """Conversation storage abstract base class - handles conversation metadata only. - Items are stored separately using ItemStore implementations. + Implementations should provide async methods to save, load, delete and + list conversation metadata. Conversation items themselves are managed + separately by ItemStore implementations. """ @abstractmethod @@ -34,7 +36,10 @@ async def conversation_exists(self, conversation_id: str) -> bool: class InMemoryConversationStore(ConversationStore): - """In-memory conversation storage implementation""" + """In-memory ConversationStore implementation used for testing and simple scenarios. + + Stores conversations in a dict keyed by conversation_id. + """ def __init__(self): self._conversations: Dict[str, Conversation] = {} diff --git a/python/valuecell/core/conversation/item_store.py b/python/valuecell/core/conversation/item_store.py index da68d4fcd..e5cc3db62 100644 --- a/python/valuecell/core/conversation/item_store.py +++ b/python/valuecell/core/conversation/item_store.py @@ -10,6 +10,12 @@ class ItemStore(ABC): + """Abstract storage interface for conversation items. + + Implementations must provide async methods for saving and querying + ConversationItem instances. + """ + @abstractmethod async def save_item(self, item: ConversationItem) -> None: ... @@ -39,6 +45,11 @@ async def delete_conversation_items(self, conversation_id: str) -> None: ... class InMemoryItemStore(ItemStore): + """In-memory store for conversation items. + + Useful for tests and lightweight usage where persistence is not required. + """ + def __init__(self): # conversation_id -> list[ConversationItem] self._items: Dict[str, List[ConversationItem]] = {} @@ -83,7 +94,12 @@ async def delete_conversation_items(self, conversation_id: str) -> None: class SQLiteItemStore(ItemStore): - """SQLite-backed item store using aiosqlite for true async I/O.""" + """SQLite-backed item store using aiosqlite for true async I/O. + + Lazily initializes the database schema on first use. Uses aiosqlite to + perform non-blocking DB operations and converts rows to ConversationItem + instances. + """ def __init__(self, db_path: str): self.db_path = db_path diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py index 009922237..d1b0eac51 100644 --- a/python/valuecell/core/conversation/manager.py +++ b/python/valuecell/core/conversation/manager.py @@ -15,7 +15,12 @@ class ConversationManager: - """Conversation manager - handles both conversation metadata and items through separate stores""" + """High-level manager coordinating conversation metadata and items. + + Conversation metadata is delegated to a ConversationStore while message + items are delegated to an ItemStore. This class exposes convenience + methods for creating conversations, adding items, and querying state. + """ def __init__( self, diff --git a/python/valuecell/core/conversation/models.py b/python/valuecell/core/conversation/models.py index 98f4721a7..33ff1dcce 100644 --- a/python/valuecell/core/conversation/models.py +++ b/python/valuecell/core/conversation/models.py @@ -6,7 +6,7 @@ class ConversationStatus(str, Enum): - """Conversation status enumeration""" + """Conversation status enumeration for tracking lifecycle state.""" ACTIVE = "active" INACTIVE = "inactive" @@ -14,7 +14,11 @@ class ConversationStatus(str, Enum): class Conversation(BaseModel): - """Conversation data model - lightweight metadata only, items stored separately""" + """Conversation data model - lightweight metadata only, items stored separately. + + Conversation objects hold metadata about a conversation; message items + are stored in a separate ItemStore implementation. + """ conversation_id: str = Field(..., description="Unique conversation identifier") user_id: str = Field(..., description="User ID") diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 8b0726cf5..06eaf980b 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -33,7 +33,13 @@ class ExecutionContext: - """Manages the state of an interrupted execution for resumption""" + """Manage the state of an interrupted execution for later resumption. + + ExecutionContext stores lightweight metadata about an in-flight plan or + task execution that has been paused waiting for user input. The context + records the stage (e.g. "planning"), the conversation/thread identifiers, + the original requesting user, and a timestamp used for expiration. + """ def __init__(self, stage: str, conversation_id: str, thread_id: str, user_id: str): self.stage = stage @@ -46,7 +52,7 @@ def __init__(self, stage: str, conversation_id: str, thread_id: str, user_id: st def is_expired( self, max_age_seconds: int = DEFAULT_CONTEXT_TIMEOUT_SECONDS ) -> bool: - """Check if this context has expired""" + """Return True when the context is older than the configured TTL.""" current_time = asyncio.get_event_loop().time() return current_time - self.created_at > max_age_seconds @@ -55,7 +61,7 @@ def validate_user(self, user_id: str) -> bool: return self.user_id == user_id def add_metadata(self, **kwargs): - """Add metadata to the context""" + """Attach arbitrary key/value metadata to this execution context.""" self.metadata.update(kwargs) def get_metadata(self, key: str, default=None): @@ -64,13 +70,18 @@ def get_metadata(self, key: str, default=None): class UserInputManager: - """Manages pending user input requests and their lifecycle""" + """Manage pending Human-in-the-Loop user input requests. + + This simple manager stores `UserInputRequest` objects keyed by + `conversation_id`. Callers can add requests, query for prompts and provide + responses which will wake any awaiting tasks. + """ def __init__(self): self._pending_requests: Dict[str, UserInputRequest] = {} def add_request(self, conversation_id: str, request: UserInputRequest): - """Add a pending user input request""" + """Register a pending user input request for a conversation.""" self._pending_requests[conversation_id] = request def has_pending_request(self, conversation_id: str) -> bool: @@ -78,12 +89,16 @@ def has_pending_request(self, conversation_id: str) -> bool: return conversation_id in self._pending_requests def get_request_prompt(self, conversation_id: str) -> Optional[str]: - """Get the prompt for a pending request""" + """Return the prompt text for a pending request, or None if none found.""" request = self._pending_requests.get(conversation_id) return request.prompt if request else None def provide_response(self, conversation_id: str, response: str) -> bool: - """Provide a response to a pending request""" + """Supply the user's response to a pending request and complete it. + + Returns True when the response was accepted and the pending request + removed; False when no pending request existed for the conversation. + """ if conversation_id not in self._pending_requests: return False @@ -134,18 +149,22 @@ async def process_user_input( self, user_input: UserInput ) -> AsyncGenerator[BaseResponse, None]: """ - Main entry point for processing user requests with Human-in-the-Loop support. + Main entry point for processing user input with optional + Human-in-the-Loop interactions. - Handles three types of scenarios: - 1. New user requests - starts planning and execution - 2. Continuation of interrupted conversations - resumes from saved state - 3. User input responses - provides input to waiting requests + The orchestrator yields streaming `BaseResponse` objects that callers + (for example, an HTTP SSE endpoint or WebSocket) can forward to the + client. This method handles: + - Starting new plans when no execution context exists + - Resuming paused executions when conversation state requires input + - Directly providing responses to existing pending prompts Args: - user_input: The user's input containing query and metadata + user_input: The user's input, including conversation metadata. Yields: - MessageChunk: Streaming response chunks from agents + BaseResponse instances representing streaming chunks, status, + or terminal messages for the request. """ conversation_id = user_input.meta.conversation_id user_id = user_input.meta.user_id @@ -188,12 +207,16 @@ async def process_user_input( yield self._response_factory.done(conversation_id) async def provide_user_input(self, conversation_id: str, response: str): - """ - Provide user input response for a specific conversation. + """Submit a user's response to a pending input request. + + When a planner has requested clarification (Human-in-the-Loop), the + orchestrator stores a `UserInputRequest`. Calling this method provides + the response, updates the conversation state to active, and wakes any + awaiting planner logic. Args: - conversation_id: The conversation ID waiting for input - response: The user's response to the input request + conversation_id: Conversation where a pending input request exists. + response: The textual response supplied by the user. """ if self.user_input_manager.provide_response(conversation_id, response): # Update conversation status to active @@ -205,15 +228,24 @@ async def provide_user_input(self, conversation_id: str, response: str): await self.conversation_manager.update_conversation(conversation) def has_pending_user_input(self, conversation_id: str) -> bool: - """Check if a conversation has pending user input request""" + """Return True if the conversation currently awaits user input.""" return self.user_input_manager.has_pending_request(conversation_id) def get_user_input_prompt(self, conversation_id: str) -> Optional[str]: - """Get the user input prompt for a specific conversation""" + """Return the prompt text for a pending user-input request, or None. + + This is useful for displaying the outstanding prompt to the user or + embedding it into UI flows. + """ return self.user_input_manager.get_request_prompt(conversation_id) async def close_conversation(self, conversation_id: str): - """Close an existing conversation and clean up resources""" + """Close a conversation and clean up resources. + + This cancels any running tasks for the conversation, clears execution + contexts and pending user-input requests, and resets conversation + status to active when appropriate. + """ # Cancel any running tasks for this conversation await self.task_manager.cancel_conversation_tasks(conversation_id) @@ -226,21 +258,41 @@ async def get_conversation_history( event: Optional[ConversationItemEvent] = None, component_type: Optional[str] = None, ) -> list[BaseResponse]: - """Get conversation message history""" + """Return the persisted conversation history as a list of responses. + + Args: + conversation_id: The conversation to retrieve history for. + event: Optional filter to include only items with this event type. + component_type: Optional filter to include only items with this component type. + + Returns: + A list of `BaseResponse` instances reconstructed from persisted + ConversationItems. + """ items = await self.conversation_manager.get_conversation_items( conversation_id, event=event, component_type=component_type ) return [self._response_factory.from_conversation_item(it) for it in items] async def cleanup(self): - """Cleanup resources and expired contexts""" + """Perform graceful cleanup of orchestrator-managed resources. + + This will remove expired execution contexts and stop all remote agent + connections/listeners managed by the orchestrator. + """ await self._cleanup_expired_contexts() await self.agent_connections.stop_all() # ==================== Private Helper Methods ==================== async def _handle_user_input_request(self, request: UserInputRequest): - """Handle user input request from planner""" + """Register an incoming `UserInputRequest` produced by the planner. + + The planner may emit UserInputRequest objects when it requires + clarification. This helper extracts the `conversation_id` from the + request and registers it with the `UserInputManager` so callers can + later provide the response. + """ # Extract conversation_id from request context conversation_id = getattr(request, "conversation_id", None) if conversation_id: @@ -249,7 +301,15 @@ async def _handle_user_input_request(self, request: UserInputRequest): async def _handle_conversation_continuation( self, user_input: UserInput ) -> AsyncGenerator[BaseResponse, None]: - """Handle continuation of an interrupted conversation""" + """Resume an interrupted execution after the user provided requested input. + + This method validates the existing `ExecutionContext`, records the new + thread id for this resumed interaction, and either continues planning + or indicates that resuming execution is not supported for other stages. + + It yields the generated streaming responses (thread start and subsequent + planner/execution messages) back to the caller. + """ conversation_id = user_input.meta.conversation_id user_id = user_input.meta.user_id @@ -303,7 +363,11 @@ async def _handle_conversation_continuation( async def _handle_new_request( self, user_input: UserInput ) -> AsyncGenerator[BaseResponse, None]: - """Handle a new user request""" + """Start planning and execution for a new user request. + + This creates a planner task (executed asynchronously) and yields + streaming responses produced during planning and subsequent execution. + """ conversation_id = user_input.meta.conversation_id thread_id = generate_thread_id() response = self._response_factory.thread_started( @@ -328,7 +392,13 @@ async def _handle_new_request( yield response def _create_context_aware_callback(self, conversation_id: str): - """Create a callback that adds conversation context to user input requests""" + """Return an async callback that tags UserInputRequest objects with the + conversation_id and forwards them to the orchestrator's handler. + + The planner receives this callback and can call it whenever it needs + to request additional information from the end-user; the callback + ensures the request is associated with the correct conversation. + """ async def context_aware_handle(request): request.conversation_id = conversation_id @@ -343,7 +413,14 @@ async def _monitor_planning_task( user_input: UserInput, callback, ) -> AsyncGenerator[BaseResponse, None]: - """Monitor planning task and handle user input interruptions""" + """Monitor an in-progress planning task and handle interruptions. + + While the planner is running this loop watches for pending user input + requests. If the planner pauses for clarification, the method records + the planning context and yields a `plan_require_user_input` response + to the caller. When planning completes, the produced `ExecutionPlan` + is executed. + """ conversation_id = user_input.meta.conversation_id user_id = user_input.meta.user_id @@ -389,7 +466,11 @@ async def _request_user_input(self, conversation_id: str): def _validate_execution_context( self, context: ExecutionContext, user_id: str ) -> bool: - """Validate execution context integrity""" + """Return True if the execution context appears intact and valid. + + Checks include presence of a stage, matching user id and TTL-based + expiration. + """ if not hasattr(context, "stage") or not context.stage: return False @@ -404,7 +485,13 @@ def _validate_execution_context( async def _continue_planning( self, conversation_id: str, thread_id: str, context: ExecutionContext ) -> AsyncGenerator[BaseResponse, None]: - """Resume planning stage execution""" + """Resume a previously-paused planning task and continue execution. + + If required pieces of the planning context are missing this method + fails the plan and cancels the execution. Otherwise it waits for the + planner to finish, handling repeated user-input prompts if needed, + and then proceeds to execute the resulting plan. + """ planning_task = context.get_metadata("planning_task") original_user_input = context.get_metadata("original_user_input") @@ -441,7 +528,13 @@ async def _continue_planning( yield response async def _cancel_execution(self, conversation_id: str): - """Cancel execution and clean up all related resources""" + """Cancel and clean up any execution resources associated with a + conversation. + + This cancels the planner task (if present), removes the execution + context and clears any pending user input. It also resets the + conversation's status back to active. + """ # Clean up execution context if conversation_id in self._execution_contexts: context = self._execution_contexts[conversation_id] @@ -465,7 +558,11 @@ async def _cancel_execution(self, conversation_id: str): async def _cleanup_expired_contexts( self, max_age_seconds: int = DEFAULT_CONTEXT_TIMEOUT_SECONDS ): - """Clean up execution contexts that have been idle for too long""" + """Sweep and remove execution contexts older than `max_age_seconds`. + + For each expired context the method cancels execution and logs a + warning so the operator can investigate frequent expirations. + """ expired_conversations = [ conversation_id for conversation_id, context in self._execution_contexts.items() @@ -490,8 +587,11 @@ async def _execute_plan_with_input_support( during task execution. Args: - plan: The execution plan containing tasks to execute - metadata: Execution metadata containing conversation and user info + plan: The execution plan containing tasks to execute. + metadata: Optional execution metadata containing conversation and user info. + + Yields: + Streaming `BaseResponse` objects produced by each task execution. """ conversation_id = plan.conversation_id @@ -632,6 +732,7 @@ async def _persist_from_buffer(self, response: BaseResponse): await self._persist_items(items) async def _persist_items(self, items: list[SaveItem]): + """Persist a list of SaveItems to the conversation manager.""" for it in items: await self.conversation_manager.add_item( role=it.role, diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index 2d6a0f8ce..85c7d0144 100644 --- a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -1,3 +1,15 @@ +"""Planner: create execution plans from user input. + +This module implements the ExecutionPlanner which uses an LLM-based +planning agent to convert a user request into a structured +`ExecutionPlan` consisting of `Task` objects. The planner supports +Human-in-the-Loop flows by emitting `UserInputRequest` objects (backed by +an asyncio.Event) when the planner requires clarification. + +The planner is intentionally thin: it delegates reasoning to an AI agent +and performs JSON parsing/validation of the planner's output. +""" + import asyncio import logging import os @@ -31,17 +43,31 @@ class UserInputRequest: """ def __init__(self, prompt: str): + """Create a new request object for planner-driven user input. + + Args: + prompt: Human-readable prompt describing the information needed. + """ self.prompt = prompt self.response: Optional[str] = None self.event = asyncio.Event() async def wait_for_response(self) -> str: - """Wait for user response asynchronously""" + """Block until a response is provided and return it. + + This is an awaitable helper designed to be used by planner code that + wants to pause execution until the external caller supplies the + requested value via `provide_response`. + """ await self.event.wait() return self.response def provide_response(self, response: str): - """Provide the user's response and signal completion""" + """Supply the user's response and wake any waiter. + + Args: + response: The text provided by the user to satisfy the prompt. + """ self.response = response self.event.set() @@ -67,11 +93,19 @@ async def create_plan( """ Create an execution plan from user input. + This method orchestrates the planning agent run and returns a + validated `ExecutionPlan` instance. The optional `user_input_callback` + is called whenever the planner requests clarification; the callback + should accept a `UserInputRequest` and arrange for the user's answer to + be provided (typically by calling `UserInputRequest.provide_response`). + Args: - user_input: The user's request to be planned + user_input: The user's request to be planned. + user_input_callback: Optional async callback invoked with + `UserInputRequest` instances when clarification is required. Returns: - ExecutionPlan: A structured plan with tasks for execution + ExecutionPlan: A structured plan with tasks for execution. """ plan = ExecutionPlan( plan_id=generate_uuid("plan"), @@ -93,10 +127,19 @@ async def _analyze_input_and_create_tasks( self, user_input: UserInput, user_input_callback: Optional[Callable] = None ) -> List[Task]: """ - Analyze user input and create tasks for appropriate agents. + Analyze user input and produce a list of `Task` objects. + + The planner delegates reasoning to an LLM agent which must output a + JSON document conforming to `PlannerResponse`. If the planner pauses to + request user input, the provided `user_input_callback` will be + invoked for each requested field. - This method uses an AI agent to understand the user's request and determine - what agents should be involved and what tasks they should perform. + Args: + user_input: The original user input to analyze. + user_input_callback: Optional async callback used for Human-in-the-Loop. + + Returns: + A list of `Task` objects derived from the planner response. """ # Create planning agent with appropriate tools and instructions agent = Agent( @@ -198,7 +241,7 @@ def _create_task( pattern: Execution pattern (once or recurring) Returns: - Task: Configured task ready for execution + Task: Configured task ready for execution. """ return Task( task_id=generate_uuid("task"), @@ -221,7 +264,7 @@ def tool_get_agent_description(self, agent_name: str) -> str: agent_name: The name of the agent whose capabilities are to be retrieved Returns: - str: A description of the agent's capabilities and supported operations + str: A description of the agent's capabilities and supported operations. """ if card := self.agent_connections.get_agent_card(agent_name): if isinstance(card, AgentCard): @@ -234,14 +277,13 @@ def tool_get_agent_description(self, agent_name: str) -> str: def agentcard_to_prompt(card: AgentCard): - """ - Convert AgentCard JSON structure to LLM-friendly prompt string. + """Convert an AgentCard to an LLM-friendly prompt string. Args: - agentcard (AgentCard): The agentcard JSON structure + card: The AgentCard object or JSON structure describing an agent. Returns: - str: Formatted prompt string for LLM processing + A formatted string suitable for inclusion in the planner's instructions. """ # Start with basic agent information diff --git a/python/valuecell/core/coordinate/planner_prompts.py b/python/valuecell/core/coordinate/planner_prompts.py index 5b53edcc7..ac1968884 100644 --- a/python/valuecell/core/coordinate/planner_prompts.py +++ b/python/valuecell/core/coordinate/planner_prompts.py @@ -1,16 +1,38 @@ +"""Planner prompt helpers and constants. + +This module provides utilities for constructing the planner's instruction +prompt, including injecting the current date/time into prompts. The +large `PLANNER_INSTRUCTIONS` constant contains the guidance used by the +ExecutionPlanner when calling the LLM-based planning agent. +""" + from datetime import datetime from textwrap import dedent def create_prompt_with_datetime(base_prompt: str) -> str: + """ + Inject the current date/time into a base prompt string. + + The planner benefits from a stable timestamp in its instructions so it can + make time-sensitive decisions (for example, when interpreting requests + mentioning "today", "this week", etc.). This helper formats the current + local date/time and appends it to the provided prompt text. + + Args: + base_prompt: The base instructions that should receive the timestamp. + + Returns: + A dedented, multi-line prompt string that includes the current date/time. + """ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") return dedent( f""" - {base_prompt} + {base_prompt} - **Other Important Context** - - Current date and time: {now} - """ + **Other Important Context** + - Current date and time: {now} + """ ) @@ -22,11 +44,11 @@ def create_prompt_with_datetime(base_prompt: str) -> str: 1. **Understand capabilities**: Call `get_agent_card` with the target agent name 2. **Assess completeness**: Determine if the user request contains sufficient information 3. **Clarify if needed**: Call `get_user_input` only when essential information is missing - - Don't ask user for information that can be inferred or researched (e.g., current date, time ranges, stock symbols, ciks) - - Don't ask for non-essential details or information already provided - - Proceed directly if the request is reasonably complete - - Make your best guess before asking for clarification - - If response is still ambiguous after clarification, make your best guess and proceed + - Don't ask user for information that can be inferred or researched (e.g., current date, time ranges, stock symbols, ciks) + - Don't ask for non-essential details or information already provided + - Proceed directly if the request is reasonably complete + - Make your best guess before asking for clarification + - If response is still ambiguous after clarification, make your best guess and proceed 4. **Generate plan**: Create a structured execution plan with clear, actionable tasks ## Task Creation Guidelines @@ -40,8 +62,8 @@ def create_prompt_with_datetime(base_prompt: str) -> str: ### Task Patterns - **ONCE**: Single execution with immediate results (default) - **RECURRING**: Periodic execution for ongoing monitoring/updates - - Use only when user explicitly requests regular updates - - Always confirm intent before creating recurring tasks: "Do you want regular updates on this?" + - Use only when user explicitly requests regular updates + - Always confirm intent before creating recurring tasks: "Do you want regular updates on this?" ## Response Requirements diff --git a/python/valuecell/core/coordinate/response.py b/python/valuecell/core/coordinate/response.py index 2ad39a86c..a7191eb7c 100644 --- a/python/valuecell/core/coordinate/response.py +++ b/python/valuecell/core/coordinate/response.py @@ -34,9 +34,19 @@ class ResponseFactory: def from_conversation_item(self, item: ConversationItem): """Reconstruct a BaseResponse from a persisted ConversationItem. - - Maps the stored event to the appropriate Response subtype - - Parses payload JSON back into the right payload model when possible - - Preserves the original item_id so callers can correlate history items + This method maps the stored event enum to the appropriate response + subtype, attempts to parse the stored payload JSON into the + corresponding payload model, and preserves the original `item_id` so + callers can correlate the reconstructed response with the persisted + conversation item. + + Args: + item: The persisted ConversationItem to convert. + + Returns: + An instance of a `BaseResponse` subtype (e.g., MessageResponse, + ReasoningResponse, ThreadStartedResponse) corresponding to the + stored event. """ # Coerce enums that may have been persisted as strings @@ -134,6 +144,14 @@ def make_data(payload=None): ) def conversation_started(self, conversation_id: str) -> ConversationStartedResponse: + """Build a `ConversationStartedResponse` for a given conversation id. + + Args: + conversation_id: The id of the conversation that started. + + Returns: + ConversationStartedResponse with system role and the conversation id. + """ return ConversationStartedResponse( data=UnifiedResponseData(conversation_id=conversation_id, role=Role.SYSTEM) ) @@ -141,6 +159,17 @@ def conversation_started(self, conversation_id: str) -> ConversationStartedRespo def thread_started( self, conversation_id: str, thread_id: str, user_query: str ) -> ThreadStartedResponse: + """Create a `ThreadStartedResponse` for a new conversational thread. + + Args: + conversation_id: Conversation the thread belongs to. + thread_id: Newly generated thread identifier. + user_query: The user's original query that started this thread. + + Returns: + ThreadStartedResponse populated with a synthetic ask task id and + the user's query as payload. + """ return ThreadStartedResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -152,6 +181,15 @@ def thread_started( ) def system_failed(self, conversation_id: str, content: str) -> SystemFailedResponse: + """Return a system-level failure response. + + Args: + conversation_id: Conversation where the failure occurred. + content: Human-readable failure message. + + Returns: + SystemFailedResponse with the provided content. + """ return SystemFailedResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -163,6 +201,15 @@ def system_failed(self, conversation_id: str, content: str) -> SystemFailedRespo def done( self, conversation_id: str, thread_id: Optional[str] = None ) -> DoneResponse: + """Return a terminal DoneResponse for the conversation/thread. + + Args: + conversation_id: The conversation id. + thread_id: Optional thread id this done message corresponds to. + + Returns: + A DoneResponse indicating the end of a response stream. + """ return DoneResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -174,6 +221,16 @@ def done( def plan_require_user_input( self, conversation_id: str, thread_id: str, content: str ) -> PlanRequireUserInputResponse: + """Build a PlanRequireUserInputResponse prompting the user for info. + + Args: + conversation_id: Conversation id awaiting user input. + thread_id: Thread id for the pending prompt. + content: Prompt text to present to the user. + + Returns: + PlanRequireUserInputResponse populated with the prompt. + """ return PlanRequireUserInputResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -186,6 +243,16 @@ def plan_require_user_input( def plan_failed( self, conversation_id: str, thread_id: str, content: str ) -> PlanFailedResponse: + """Return a PlanFailedResponse describing why planning failed. + + Args: + conversation_id: Conversation the failed plan belongs to. + thread_id: Thread id associated with the plan. + content: Human-readable reason for failure. + + Returns: + PlanFailedResponse with the provided reason. + """ return PlanFailedResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -202,6 +269,17 @@ def task_failed( task_id: str, content: str, ) -> TaskFailedResponse: + """Create a TaskFailedResponse for a failed task execution. + + Args: + conversation_id: Conversation the task belongs to. + thread_id: Thread id the task was running in. + task_id: Identifier of the failed task. + content: Failure message or error details. + + Returns: + TaskFailedResponse populated with failure details. + """ return TaskFailedResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -218,6 +296,16 @@ def task_started( thread_id: str, task_id: str, ) -> TaskStartedResponse: + """Return a TaskStartedResponse indicating a task has begun execution. + + Args: + conversation_id: Conversation id for the task. + thread_id: Thread id where the task runs. + task_id: The task identifier. + + Returns: + TaskStartedResponse with agent role. + """ return TaskStartedResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -233,6 +321,16 @@ def task_completed( thread_id: str, task_id: str, ) -> TaskCompletedResponse: + """Create a TaskCompletedResponse signalling successful completion. + + Args: + conversation_id: Conversation id for the task. + thread_id: Thread id where the task ran. + task_id: The completed task identifier. + + Returns: + TaskCompletedResponse with agent role. + """ return TaskCompletedResponse( data=UnifiedResponseData( conversation_id=conversation_id, @@ -255,6 +353,20 @@ def tool_call( tool_name: str, tool_result: Optional[str] = None, ) -> ToolCallResponse: + """Build a ToolCallResponse representing a tool invocation/result. + + Args: + conversation_id: Conversation id. + thread_id: Thread id. + task_id: Task id associated with the tool call. + event: The tool call event enum (started/completed). + tool_call_id: Identifier for this tool call. + tool_name: Name of the tool invoked. + tool_result: Optional textual result returned by the tool. + + Returns: + ToolCallResponse containing a ToolCallPayload. + """ return ToolCallResponse( event=event, data=UnifiedResponseData( @@ -279,6 +391,20 @@ def message_response_general( content: str, item_id: Optional[str] = None, ) -> MessageResponse: + """Create a generic message response used for both stream and notify. + + Args: + event: Either StreamResponseEvent.MESSAGE_CHUNK or + NotifyResponseEvent.MESSAGE. + conversation_id: Conversation id. + thread_id: Thread id. + task_id: Task id. + content: Textual content of the message. + item_id: Optional stable paragraph/item id; generated if omitted. + + Returns: + MessageResponse containing the provided content and meta. + """ return MessageResponse( event=event, data=UnifiedResponseData( @@ -305,6 +431,18 @@ def reasoning( ], content: Optional[str] = None, ) -> ReasoningResponse: + """Build a reasoning response used to convey model chain-of-thought. + + Args: + conversation_id: Conversation id. + thread_id: Thread id. + task_id: Task id. + event: One of the reasoning-related stream events. + content: Optional textual reasoning content. + + Returns: + ReasoningResponse with optional payload. + """ return ReasoningResponse( event=event, data=UnifiedResponseData( @@ -324,6 +462,18 @@ def component_generator( content: str, component_type: str, ) -> ComponentGeneratorResponse: + """Create a ComponentGeneratorResponse for UI component generation. + + Args: + conversation_id: Conversation id. + thread_id: Thread id. + task_id: Task id. + content: Serialized component content (e.g., markup or json). + component_type: Free-form type string for the generated component. + + Returns: + ComponentGeneratorResponse wrapping the payload. + """ return ComponentGeneratorResponse( data=UnifiedResponseData( conversation_id=conversation_id, diff --git a/python/valuecell/core/coordinate/response_buffer.py b/python/valuecell/core/coordinate/response_buffer.py index dcb1a8911..94cbc6ccf 100644 --- a/python/valuecell/core/coordinate/response_buffer.py +++ b/python/valuecell/core/coordinate/response_buffer.py @@ -32,6 +32,13 @@ class SaveItem: class BufferEntry: + """Represents an in-memory paragraph buffer for streamed chunks. + + A BufferEntry collects sequential message chunks belonging to the same + logical paragraph. It maintains a stable `item_id` so streamed chunks can + be correlated with the final persisted ConversationItem. + """ + def __init__(self, item_id: Optional[str] = None, role: Optional[Role] = None): self.parts: List[str] = [] self.last_updated: float = time.monotonic() @@ -42,12 +49,16 @@ def __init__(self, item_id: Optional[str] = None, role: Optional[Role] = None): self.role: Optional[Role] = role def append(self, text: str): + """Append a chunk of text to this buffer and update the timestamp.""" if text: self.parts.append(text) self.last_updated = time.monotonic() def snapshot_payload(self) -> Optional[BaseResponseDataPayload]: - """Return current aggregate content without clearing the buffer.""" + """Return the current aggregate content as a payload without clearing. + + Returns None when there is no content buffered. + """ if not self.parts: return None content = "".join(self.parts) @@ -55,14 +66,16 @@ def snapshot_payload(self) -> Optional[BaseResponseDataPayload]: class ResponseBuffer: - """Buffers streaming responses and emits SaveMessage at suitable boundaries. - - Simplified rules (no debounce, no size-based rotation): - - Immediate write-through: tool_call_completed, component_generator, message, plan_require_user_input - - Buffered: message_chunk, reasoning - - Maintain a stable paragraph item_id per (conversation, thread, task, event) - - On every chunk, update the aggregate and return a SaveItem for upsert - - Buffer key = (conversation_id, thread_id, task_id, event) + """Buffer streaming responses and produce persistence-ready SaveItem objects. + + The ResponseBuffer implements a simple buffering strategy: + - Some events are "immediate" and should be persisted as-is (tool results, + component generator events, notify messages, system-level events). + - Other events (message chunks, reasoning) are buffered and aggregated + into paragraph-level items which are upserted as streaming progress + is received. This preserves a stable paragraph `item_id` across chunks. + + The buffer key is a tuple (conversation_id, thread_id, task_id, event). """ def __init__(self): @@ -81,12 +94,12 @@ def __init__(self): } def annotate(self, resp: BaseResponse) -> BaseResponse: - """Ensure buffered events carry a stable paragraph item_id on the response. + """Stamp buffered responses with a stable paragraph `item_id`. - For buffered events (message_chunk, reasoning), we assign a stable - paragraph id per (conversation, thread, task, event) key and stamp it - into resp.data.item_id so the frontend can correlate chunks and the - final persisted SaveItem. Immediate and boundary events are left as-is. + For events that are buffered (e.g. message chunks, reasoning), assign a + stable paragraph `item_id` to resp.data.item_id so the frontend and + storage layer can correlate incremental chunks with the final saved + conversation item. """ data: UnifiedResponseData = resp.data ev = resp.event @@ -108,6 +121,16 @@ def annotate(self, resp: BaseResponse) -> BaseResponse: return resp def ingest(self, resp: BaseResponse) -> List[SaveItem]: + """Ingest a response and return a list of SaveItem objects to persist. + + Depending on the event type this will either: + - Flush and emit an immediate item (for immediate events), or + - Accumulate buffered chunks and emit an upsert SaveItem with the + current aggregated payload for the paragraph entry. + + Returns: + A list of SaveItem objects that should be persisted by the caller. + """ data: UnifiedResponseData = resp.data ev = resp.event diff --git a/python/valuecell/core/coordinate/response_router.py b/python/valuecell/core/coordinate/response_router.py index 8a2f01cda..1f50f6467 100644 --- a/python/valuecell/core/coordinate/response_router.py +++ b/python/valuecell/core/coordinate/response_router.py @@ -17,17 +17,39 @@ class SideEffectKind(Enum): + """Kinds of side-effects that routing logic can request. + + Side effects are actions that the orchestrator should take in response to + routed events (for example, failing a task when the agent reports an + unrecoverable error). + """ + FAIL_TASK = "fail_task" @dataclass class SideEffect: + """Represents a side-effect produced by event routing. + + Attributes: + kind: The kind of side effect to apply (see SideEffectKind). + reason: Optional human-readable reason for the side-effect. + """ + kind: SideEffectKind reason: Optional[str] = None @dataclass class RouteResult: + """Result of routing a single incoming event. + + Contains zero or more `BaseResponse` objects to emit to the orchestrator, + a `done` flag that signals task-level completion (stop processing), and an + optional list of `SideEffect` objects describing actions the orchestrator + should apply (for example, failing a task). + """ + responses: List[BaseResponse] done: bool = False side_effects: List[SideEffect] = None @@ -47,10 +69,12 @@ async def handle_status_update( state = event.status.state logger.info(f"Task {task.task_id} status update: {state}") + # No messaging for submitted/completed states by default if state in {TaskState.submitted, TaskState.completed}: return RouteResult(responses) if state == TaskState.failed: + # Produce a task_failed response and request the task be marked failed err_msg = get_message_text(event.status.message) responses.append( response_factory.task_failed( diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index e029cae9c..a5eeac865 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -35,6 +35,8 @@ class Config: class SystemResponseEvent(str, Enum): + """Events related to system-level responses and status updates.""" + CONVERSATION_STARTED = "conversation_started" THREAD_STARTED = "thread_started" PLAN_REQUIRE_USER_INPUT = "plan_require_user_input" @@ -44,6 +46,8 @@ class SystemResponseEvent(str, Enum): class TaskStatusEvent(str, Enum): + """Events related to task lifecycle status changes.""" + TASK_STARTED = "task_started" TASK_COMPLETED = "task_completed" TASK_FAILED = "task_failed" @@ -51,10 +55,14 @@ class TaskStatusEvent(str, Enum): class CommonResponseEvent(str, Enum): + """Common response events shared across different response types.""" + COMPONENT_GENERATOR = "component_generator" class StreamResponseEvent(str, Enum): + """Events specific to streaming agent responses.""" + MESSAGE_CHUNK = "message_chunk" TOOL_CALL_STARTED = "tool_call_started" TOOL_CALL_COMPLETED = "tool_call_completed" @@ -64,11 +72,18 @@ class StreamResponseEvent(str, Enum): class NotifyResponseEvent(str, Enum): + """Events specific to notification agent responses.""" + MESSAGE = "message" class StreamResponse(BaseModel): - """Response model for streaming agent responses""" + """Response model for streaming agent responses. + + Used by agents that stream progress, tool calls, reasoning, or + component-generation updates. `event` determines how the response + should be interpreted. + """ content: Optional[str] = Field( None, @@ -98,6 +113,11 @@ class NotifyResponse(BaseModel): class ToolCallPayload(BaseModel): + """Payload describing a tool call made by an agent. + + Contains identifiers and optional result content produced by the tool. + """ + tool_call_id: str = Field(..., description="Unique ID for the tool call") tool_name: str = Field(..., description="Name of the tool being called") tool_result: Optional[str] = Field( @@ -107,10 +127,17 @@ class ToolCallPayload(BaseModel): class BaseResponseDataPayload(BaseModel, ABC): + """Base class for response data payloads.""" + content: Optional[str] = Field(None, description="The message content") class ComponentGeneratorResponseDataPayload(BaseResponseDataPayload): + """Payload for responses that generate UI components. + + `component_type` describes the kind of component produced. + """ + component_type: str = Field(..., description="The component type") @@ -131,7 +158,7 @@ class ComponentGeneratorResponseDataPayload(BaseResponseDataPayload): class Role(str, Enum): - """Message role enumeration""" + """Message role enumeration.""" USER = "user" AGENT = "agent" @@ -139,7 +166,11 @@ class Role(str, Enum): class ConversationItem(BaseModel): - """Message item structure for conversation history""" + """Message item structure for conversation history. + + Represents a single message/event within a conversation and stores + identifiers, role, event type and payload. + """ item_id: str = Field(..., description="Unique message identifier") role: Role = Field(..., description="Role of the message sender") @@ -174,7 +205,11 @@ class UnifiedResponseData(BaseModel): class BaseResponse(BaseModel, ABC): - """Top-level response envelope used for all events.""" + """Top-level response envelope used for all events. + + Subclasses narrow the `event` literal and `data` payload for specific + response kinds (message, task updates, system events, etc.). + """ event: ConversationItemEvent = Field( ..., description="The event type of the response" @@ -185,6 +220,8 @@ class BaseResponse(BaseModel, ABC): class ConversationStartedResponse(BaseResponse): + """Response indicating a conversation has started.""" + event: Literal[SystemResponseEvent.CONVERSATION_STARTED] = Field( SystemResponseEvent.CONVERSATION_STARTED, description="The event type of the response", @@ -192,6 +229,8 @@ class ConversationStartedResponse(BaseResponse): class ThreadStartedResponse(BaseResponse): + """Response indicating a thread has started.""" + event: Literal[SystemResponseEvent.THREAD_STARTED] = Field( SystemResponseEvent.THREAD_STARTED, description="The event type of the response", @@ -199,6 +238,8 @@ class ThreadStartedResponse(BaseResponse): class PlanRequireUserInputResponse(BaseResponse): + """Response indicating the execution plan requires user input.""" + event: Literal[SystemResponseEvent.PLAN_REQUIRE_USER_INPUT] = Field( SystemResponseEvent.PLAN_REQUIRE_USER_INPUT, description="The event type of the response", @@ -207,6 +248,8 @@ class PlanRequireUserInputResponse(BaseResponse): class MessageResponse(BaseResponse): + """Response containing a message payload (streamed or notified).""" + event: Literal[ StreamResponseEvent.MESSAGE_CHUNK, NotifyResponseEvent.MESSAGE, @@ -215,6 +258,8 @@ class MessageResponse(BaseResponse): class ComponentGeneratorResponse(BaseResponse): + """Response that carries component generation data for UI rendering.""" + event: Literal[CommonResponseEvent.COMPONENT_GENERATOR] = Field( CommonResponseEvent.COMPONENT_GENERATOR, description="The event type of the response", @@ -223,6 +268,8 @@ class ComponentGeneratorResponse(BaseResponse): class ToolCallResponse(BaseResponse): + """Response representing tool call lifecycle events.""" + event: Literal[ StreamResponseEvent.TOOL_CALL_STARTED, StreamResponseEvent.TOOL_CALL_COMPLETED ] = Field( @@ -233,6 +280,8 @@ class ToolCallResponse(BaseResponse): class ReasoningResponse(BaseResponse): + """Response containing intermediate reasoning events from the agent.""" + event: Literal[ StreamResponseEvent.REASONING_STARTED, StreamResponseEvent.REASONING, @@ -242,6 +291,8 @@ class ReasoningResponse(BaseResponse): class DoneResponse(BaseResponse): + """Response indicating a thread or conversation is done.""" + event: Literal[SystemResponseEvent.DONE] = Field( SystemResponseEvent.DONE, description="The event type of the response" ) @@ -249,6 +300,8 @@ class DoneResponse(BaseResponse): class PlanFailedResponse(BaseResponse): + """Response indicating a plan execution failure.""" + event: Literal[SystemResponseEvent.PLAN_FAILED] = Field( SystemResponseEvent.PLAN_FAILED, description="The event type of the response" ) @@ -256,6 +309,8 @@ class PlanFailedResponse(BaseResponse): class TaskStartedResponse(BaseResponse): + """Response indicating a task has been started.""" + event: Literal[TaskStatusEvent.TASK_STARTED] = Field( TaskStatusEvent.TASK_STARTED, description="The event type of the response" ) @@ -263,6 +318,8 @@ class TaskStartedResponse(BaseResponse): class TaskFailedResponse(BaseResponse): + """Response indicating a task has failed.""" + event: Literal[TaskStatusEvent.TASK_FAILED] = Field( TaskStatusEvent.TASK_FAILED, description="The event type of the response" ) @@ -270,6 +327,8 @@ class TaskFailedResponse(BaseResponse): class TaskCompletedResponse(BaseResponse): + """Response indicating a task has completed successfully.""" + event: Literal[TaskStatusEvent.TASK_COMPLETED] = Field( TaskStatusEvent.TASK_COMPLETED, description="The event type of the response" ) @@ -277,6 +336,8 @@ class TaskCompletedResponse(BaseResponse): class SystemFailedResponse(BaseResponse): + """Response indicating a system-level failure for the conversation.""" + event: Literal[SystemResponseEvent.SYSTEM_FAILED] = Field( SystemResponseEvent.SYSTEM_FAILED, description="The event type of the response" )