diff --git a/python/.env.example b/python/.env.example index 2972c2341..d7fba4fc3 100644 --- a/python/.env.example +++ b/python/.env.example @@ -34,6 +34,7 @@ AGENT_DEBUG_MODE=false SEC_EMAIL=your.name@example.com # Model IDs for OpenRouter +PLANNER_MODEL_ID=openai/gpt-4o-mini SEC_PARSER_MODEL_ID=openai/gpt-4o-mini SEC_ANALYSIS_MODEL_ID=deepseek/deepseek-chat-v3-0324 AI_HEDGE_FUND_PARSER_MODEL_ID=openai/gpt-4o-mini diff --git a/python/configs/agent_cards/hedge_fund_agent.json b/python/configs/agent_cards/hedge_fund_agent.json index 644a1599c..d3fcb8172 100644 --- a/python/configs/agent_cards/hedge_fund_agent.json +++ b/python/configs/agent_cards/hedge_fund_agent.json @@ -1,7 +1,7 @@ { "name": "AIHedgeFundAgent", "url": "http://localhost:10001/", - "description": "AI Hedge Fund Agent provides multi-strategy analysis and investment insights. It can act like various famous analysts and specialists (valuation, technical, sentiment) to analyze stocks, portfolios, and market trends. Use its skills to get tailored perspectives ranging from deep fundamental valuation to technical trading signals.", + "description": "AI Hedge Fund Agent provides multi-strategy analysis and investment insights. It can act like various famous analysts and specialists (valuation, technical, sentiment) to analyze stocks, portfolios, and market trends. **CAUTION**: Only stock symbols in {AAPL, GOOGL, MSFT, NVDA, TSLA} is/are supported and the agent will refuse to answer for other symbols. The input should contain one or more stock symbol(s) from this list. ", "skills": [ { "id": "aswath_damodaran_agent", diff --git a/python/third_party/ai-hedge-fund/adapter/__main__.py b/python/third_party/ai-hedge-fund/adapter/__main__.py index 820f8d2bc..cfbb72466 100644 --- a/python/third_party/ai-hedge-fund/adapter/__main__.py +++ b/python/third_party/ai-hedge-fund/adapter/__main__.py @@ -2,7 +2,7 @@ import logging import os from datetime import datetime -from typing import List +from typing import AsyncGenerator, List from agno.agent import Agent from agno.models.openrouter import OpenRouter @@ -10,7 +10,7 @@ from langchain_core.messages import HumanMessage from pydantic import BaseModel, Field, field_validator from valuecell.core.agent.decorator import create_wrapped_agent -from valuecell.core.types import BaseAgent +from valuecell.core import BaseAgent, StreamResponse, streaming from src.main import create_workflow from src.utils.analysts import ANALYST_ORDER @@ -69,7 +69,9 @@ def __init__(self): markdown=True, ) - async def stream(self, query, session_id, task_id): + async def stream( + self, query, session_id, task_id + ) -> AsyncGenerator[StreamResponse, None]: logger.info( f"Parsing query: {query}. Task ID: {task_id}, Session ID: {session_id}" ) @@ -123,15 +125,8 @@ async def stream(self, query, session_id, task_id): ): if not isinstance(chunk, str): continue - yield { - "content": chunk, - "is_task_complete": False, - } - - yield { - "content": "", - "is_task_complete": True, - } + yield streaming.message_chunk(chunk) + yield streaming.done() def run_hedge_fund_stream( diff --git a/python/valuecell/core/__init__.py b/python/valuecell/core/__init__.py index ca0a90f49..7efffec1f 100644 --- a/python/valuecell/core/__init__.py +++ b/python/valuecell/core/__init__.py @@ -30,6 +30,9 @@ RemoteAgentResponse, ) +from .agent.decorator import serve, create_wrapped_agent +from .agent.responses import streaming, notification + __all__ = [ # Session exports "Message", @@ -54,4 +57,10 @@ "BaseAgent", "StreamResponse", "RemoteAgentResponse", + # Agent utilities + "serve", + "create_wrapped_agent", + # Response utilities + "streaming", + "notification", ] diff --git a/python/valuecell/core/agent/__init__.py b/python/valuecell/core/agent/__init__.py index 0902cf057..2005b5024 100644 --- a/python/valuecell/core/agent/__init__.py +++ b/python/valuecell/core/agent/__init__.py @@ -6,17 +6,10 @@ from .decorator import serve from .registry import AgentRegistry -# Import types from the unified types module -from ..types import BaseAgent, RemoteAgentResponse, StreamResponse - - __all__ = [ # Core agent exports "AgentClient", "RemoteConnections", "serve", "AgentRegistry", - "BaseAgent", - "RemoteAgentResponse", - "StreamResponse", ] diff --git a/python/valuecell/core/agent/client.py b/python/valuecell/core/agent/client.py index 0852f3571..46687d5f2 100644 --- a/python/valuecell/core/agent/client.py +++ b/python/valuecell/core/agent/client.py @@ -50,7 +50,7 @@ async def _setup_client(self): async def send_message( self, query: str, - context_id: str = None, + session_id: str = None, metadata: dict = None, streaming: bool = False, ) -> AsyncIterator[RemoteAgentResponse]: @@ -65,7 +65,7 @@ async def send_message( role=Role.user, parts=[Part(root=TextPart(text=query))], message_id=generate_uuid("msg"), - context_id=context_id or generate_uuid("ctx"), + context_id=session_id or generate_uuid("ctx"), metadata=metadata if metadata else None, ) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 59fdb7454..c3eb91e56 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -27,7 +27,13 @@ from a2a.utils import new_agent_text_message, new_task from a2a.utils.errors import ServerError from valuecell.core.agent import registry -from valuecell.core.types import BaseAgent +from valuecell.core.types import ( + BaseAgent, + NotifyResponse, + NotifyResponseEvent, + StreamResponse, + StreamResponseEvent, +) from valuecell.utils import ( get_agent_card_path, get_next_available_port, @@ -153,55 +159,90 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non # Prepare query and ensure a task exists in the system query = context.get_user_input() task = context.current_task - metadata = context.metadata + task_meta = context.metadata + agent_name = self.agent.__class__.__name__ if not task: message = context.message task = new_task(message) - task.metadata = metadata + task.metadata = task_meta await event_queue.enqueue_event(task) # Helper state - updater = TaskUpdater(event_queue, task.id, task.context_id) - artifact_id = f"{self.agent.__class__.__name__}-artifact" - chunk_idx = 0 + task_id = task.id + session_id = task.context_id + updater = TaskUpdater(event_queue, task_id, session_id) + artifact_id = f"artifact-{agent_name}-{session_id}-{task_id}" + chunk_idx = -1 # Local helper to add a chunk - async def _add_chunk(content: str, last: bool = False): + async def _add_chunk( + response: StreamResponse | NotifyResponse, is_complete: bool + ): nonlocal chunk_idx - parts = [Part(root=TextPart(text=content))] + + chunk_idx += 1 + if not response.content: + return + + response_event = response.event + parts = [Part(root=TextPart(text=response.content))] + metadata = {"response_event": response_event.value} await updater.add_artifact( parts=parts, artifact_id=artifact_id, append=chunk_idx > 0, - last_chunk=last, + last_chunk=is_complete, + metadata=metadata, ) - if not last: - chunk_idx += 1 # Stream from the user agent and update task incrementally - await updater.update_status(TaskState.working) + await updater.update_status( + TaskState.working, + message=new_agent_text_message( + f"Task received by {agent_name}", session_id, task_id + ), + ) try: query_handler = ( - self.agent.notify if metadata.get("notify") else self.agent.stream + self.agent.notify if task_meta.get("notify") else self.agent.stream ) - async for item in query_handler(query, task.context_id, task.id): - content = item.get("content", "") - is_complete = item.get("is_task_complete", True) - - await _add_chunk(content, last=is_complete) - + async for response in query_handler(query, session_id, task_id): + if not isinstance(response, (StreamResponse, NotifyResponse)): + raise ValueError( + f"Agent {agent_name} yielded invalid response type: {type(response)}" + ) + + response_event = response.event + if is_task_failed(response_event): + raise RuntimeError( + f"Agent {agent_name} reported failure: {response.content}" + ) + + is_complete = is_task_completed(response_event) + if is_tool_call(response_event): + await updater.update_status( + TaskState.working, + message=message, + metadata={ + "event": response_event.value, + "tool_call_id": response.metadata.get("tool_call_id"), + "tool_name": response.metadata.get("tool_name"), + "tool_result": response.metadata.get("content"), + }, + ) + continue + + await _add_chunk(response, is_complete=is_complete) if is_complete: await updater.complete() break except Exception as e: - message = ( - f"Error during {self.agent.__class__.__name__} agent execution: {e}" - ) + message = f"Error during {agent_name} agent execution: {e}" logger.error(message) await updater.update_status( TaskState.failed, - message=new_agent_text_message(message, task.context_id, task.id), + message=new_agent_text_message(message, session_id, task_id), final=True, ) @@ -210,6 +251,29 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None raise ServerError(error=UnsupportedOperationError()) +def is_task_completed(response_type: str) -> bool: + return response_type in { + StreamResponseEvent.TASK_DONE, + StreamResponseEvent.TASK_FAILED, + NotifyResponseEvent.TASK_DONE, + NotifyResponseEvent.TASK_FAILED, + } + + +def is_task_failed(response_type: str) -> bool: + return response_type in { + StreamResponseEvent.TASK_FAILED, + NotifyResponseEvent.TASK_FAILED, + } + + +def is_tool_call(response_type: str) -> bool: + return response_type in { + StreamResponseEvent.TOOL_CALL_STARTED, + StreamResponseEvent.TOOL_CALL_COMPLETED, + } + + def _create_agent_executor(agent_instance): return GenericAgentExecutor(agent_instance) diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py new file mode 100644 index 000000000..64f768546 --- /dev/null +++ b/python/valuecell/core/agent/responses.py @@ -0,0 +1,105 @@ +"""User-facing response constructors under valuecell.core.agent. + +Prefer importing from here if you're already working inside the core.agent +namespace. For a stable top-level import, you can also use +`valuecell.responses` which provides the same API. + +Example: + from valuecell.core.agent.responses import stream, notify + # Or explicit aliases for clarity: + from valuecell.core.agent.responses import streaming, notification + + yield stream.message_chunk("Thinking…") + yield stream.reasoning("Plan: 1) fetch 2) analyze") + yield stream.tool_call_start("call_1", "search") + yield stream.tool_call_result('{"items": 12}', "call_1", "search") + yield stream.done() + + send(notify.message("Task submitted")) + send(notify.done("OK")) +""" + +from __future__ import annotations + +from typing import Optional + +from valuecell.core.types import ( + NotifyResponse, + NotifyResponseEvent, + StreamResponse, + StreamResponseEvent, + ToolCallContent, +) + + +class _StreamResponseNamespace: + """Factory methods for streaming responses.""" + + def message_chunk(self, content: str) -> StreamResponse: + return StreamResponse(event=StreamResponseEvent.MESSAGE_CHUNK, content=content) + + def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.TOOL_CALL_STARTED, + metadata=ToolCallContent( + tool_call_id=tool_call_id, tool_name=tool_name + ).model_dump(), + ) + + def tool_call_completed( + self, tool_result: str, tool_call_id: str, tool_name: str + ) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + metadata=ToolCallContent( + tool_call_id=tool_call_id, tool_name=tool_name, tool_result=tool_result + ).model_dump(), + ) + + def reasoning(self, content: str) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.REASONING, + content=content, + ) + + def done(self, content: Optional[str] = None) -> StreamResponse: + return StreamResponse( + content=content, + event=StreamResponseEvent.TASK_DONE, + ) + + def failed(self, content: Optional[str] = None) -> StreamResponse: + return StreamResponse( + content=content, + event=StreamResponseEvent.TASK_FAILED, + ) + + +streaming = _StreamResponseNamespace() + + +class _NotifyResponseNamespace: + """Factory methods for notify responses.""" + + def message(self, content: str) -> NotifyResponse: + return NotifyResponse( + content=content, + event=NotifyResponseEvent.MESSAGE, + ) + + def done(self, content: Optional[str] = None) -> NotifyResponse: + return NotifyResponse( + content=content, + event=NotifyResponseEvent.TASK_DONE, + ) + + +notification = _NotifyResponseNamespace() + + +__all__ = [ + "streaming", + "notification", + "StreamResponse", + "NotifyResponse", +] diff --git a/python/valuecell/core/coordinate/models.py b/python/valuecell/core/coordinate/models.py index 3bd9f8749..ff3732ef5 100644 --- a/python/valuecell/core/coordinate/models.py +++ b/python/valuecell/core/coordinate/models.py @@ -18,7 +18,9 @@ class ExecutionPlan(BaseModel): ..., description="Session ID this plan belongs to" ) user_id: str = Field(..., description="User ID who requested this plan") - query: str = Field(..., description="Original user query that generated this plan") + orig_query: str = Field( + ..., description="Original user query that generated this plan" + ) tasks: List[Task] = Field(default_factory=list, description="Tasks to execute") created_at: str = Field(..., description="Plan creation timestamp") diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index bf326d6f1..a711a0e9c 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -6,16 +6,19 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from a2a.utils import get_message_text from valuecell.core.agent.connect import get_default_remote_connections +from valuecell.core.agent.decorator import is_tool_call, is_task_completed from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager from valuecell.core.task.models import TaskPattern from valuecell.core.types import ( - MessageChunk, - MessageChunkMetadata, - MessageChunkStatus, - MessageDataKind, + NotifyResponseEvent, + ProcessMessage, + ProcessMessageData, + StreamResponseEvent, + ToolCallContent, UserInput, ) +from valuecell.utils.uuid import generate_message_id from .callback import store_task_in_session from .models import ExecutionPlan @@ -121,7 +124,7 @@ def __init__(self): async def process_user_input( self, user_input: UserInput - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """ Main entry point for processing user requests with Human-in-the-Loop support. @@ -145,16 +148,16 @@ async def process_user_input( # Handle session continuation vs new request if session.status == SessionStatus.REQUIRE_USER_INPUT: - async for chunk in self._handle_session_continuation(user_input): - yield chunk + async for message in self._handle_session_continuation(user_input): + yield message else: - async for chunk in self._handle_new_request(user_input): - yield chunk + async for message in self._handle_new_request(user_input): + yield message except Exception as e: logger.exception(f"Error processing user input for session {session_id}") - yield self._create_error_message_chunk( - f"Error processing request: {str(e)}", session_id, user_id, "__system__" + yield self._create_error_message( + f"Error processing request: {str(e)}", session_id ) async def provide_user_input(self, session_id: str, response: str): @@ -234,18 +237,16 @@ async def _ensure_session_exists(self, session_id: str, user_id: str): async def _handle_session_continuation( self, user_input: UserInput - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Handle continuation of an interrupted session""" session_id = user_input.meta.session_id user_id = user_input.meta.user_id # Validate execution context exists if session_id not in self._execution_contexts: - yield self._create_error_message_chunk( + yield self._create_error_message( "No execution context found for this session. The session may have expired.", session_id, - user_id, - "__system__", ) return @@ -253,11 +254,9 @@ async def _handle_session_continuation( # Validate context integrity and user consistency if not self._validate_execution_context(context, user_id): - yield self._create_error_message_chunk( + yield self._create_error_message( "Invalid execution context or user mismatch.", session_id, - user_id, - "__system__", ) await self._cancel_execution(session_id) return @@ -273,16 +272,14 @@ async def _handle_session_continuation( yield chunk # TODO: Add support for resuming execution stage if needed else: - yield self._create_error_message_chunk( + yield self._create_error_message( "Resuming execution stage is not yet supported.", session_id, - user_id, - "__system__", ) async def _handle_new_request( self, user_input: UserInput - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Handle a new user request""" session_id = user_input.meta.session_id @@ -315,7 +312,7 @@ async def context_aware_handle(request): async def _monitor_planning_task( self, planning_task, user_input: UserInput, callback - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Monitor planning task and handle user input interruptions""" session_id = user_input.meta.session_id user_id = user_input.meta.user_id @@ -333,9 +330,9 @@ async def _monitor_planning_task( self._execution_contexts[session_id] = context # Update session status and send user input request - await self._request_user_input(session_id, user_id) - yield self._create_user_input_request_chunk( - self.get_user_input_prompt(session_id), session_id, context.user_id + await self._request_user_input(session_id) + yield self._create_user_input_request( + self.get_user_input_prompt(session_id), session_id ) return @@ -343,14 +340,11 @@ async def _monitor_planning_task( # Planning completed, execute plan plan = await planning_task - async for chunk in self._execute_plan_with_input_support( - plan, user_input.meta.model_dump() - ): + async for chunk in self._execute_plan_with_input_support(plan): yield chunk - async def _request_user_input(self, session_id: str, _user_id: str): + async def _request_user_input(self, session_id: str): """Set session to require user input and send the request""" - # Note: _user_id parameter kept for potential future use in user validation session = await self.session_manager.get_session(session_id) if session: session.require_user_input() @@ -371,73 +365,78 @@ def _validate_execution_context( return True - def _create_message_chunk( + def _create_message( self, content: str, - session_id: str, - user_id: str, - agent_name: str, - kind: MessageDataKind = MessageDataKind.TEXT, - is_final: bool = False, - status: MessageChunkStatus = MessageChunkStatus.partial, - ) -> MessageChunk: - """Create a MessageChunk with standardized metadata""" - return MessageChunk( - content=content, - kind=kind, - meta=MessageChunkMetadata( - session_id=session_id, - user_id=user_id, - agent_name=agent_name, - status=status, + conversation_id: str, + event: ( + StreamResponseEvent | NotifyResponseEvent + ) = StreamResponseEvent.MESSAGE_CHUNK, + message_id: Optional[str] = None, + ) -> ProcessMessage: + """Create a ProcessMessage for plain text content using the new schema.""" + return ProcessMessage( + event=event, + data=ProcessMessageData( + conversation_id=conversation_id, + message_id=message_id or generate_message_id(), + content=content, + ), + ) + + def _create_tool_message( + self, + event: StreamResponseEvent | NotifyResponseEvent, + conversation_id: str, + tool_call_id: str, + tool_name: str, + tool_result: Optional[str] = None, + ) -> ProcessMessage: + """Create a ProcessMessage for tool call events with ToolCallContent.""" + return ProcessMessage( + event=event, + data=ProcessMessageData( + conversation_id=conversation_id, + message_id=generate_message_id(), + content=ToolCallContent( + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_result=tool_result, + ), ), - is_final=is_final, ) - def _create_error_message_chunk( - self, error_msg: str, session_id: str, user_id: str, agent_name: str - ) -> MessageChunk: - """Create an error MessageChunk with standardized format""" - return self._create_message_chunk( + def _create_error_message(self, error_msg: str, session_id: str) -> ProcessMessage: + """Create an error ProcessMessage with standardized format (TASK_FAILED).""" + return self._create_message( content=f"(Error): {error_msg}", - session_id=session_id, - user_id=user_id, - agent_name=agent_name, - is_final=True, - status=MessageChunkStatus.failure, + conversation_id=session_id, + event=StreamResponseEvent.TASK_FAILED, ) - def _create_user_input_request_chunk( + def _create_user_input_request( self, prompt: str, session_id: str, - user_id: str, - agent_name: str = "__planner__", - ) -> MessageChunk: - """Create a user input request MessageChunk""" - return self._create_message_chunk( + ) -> ProcessMessage: + """Create a user input request ProcessMessage. The consumer should parse the prefix.""" + return self._create_message( content=f"USER_INPUT_REQUIRED:{prompt}", - session_id=session_id, - user_id=user_id, - agent_name=agent_name, - kind=MessageDataKind.COMMAND, - is_final=True, - status=MessageChunkStatus.partial, + conversation_id=session_id, + event=StreamResponseEvent.MESSAGE_CHUNK, ) async def _continue_planning( self, session_id: str, context: ExecutionContext - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Resume planning stage execution""" planning_task = context.get_metadata("planning_task") original_user_input = context.get_metadata("original_user_input") if not all([planning_task, original_user_input]): - yield self._create_error_message_chunk( + yield self._create_error_message( "Invalid planning context - missing required data", session_id, - context.user_id, - "__planner__", ) await self._cancel_execution(session_id) return @@ -448,10 +447,8 @@ async def _continue_planning( # Still need more user input, send request prompt = self.get_user_input_prompt(session_id) # Ensure session is set to require user input again for repeated prompts - await self._request_user_input(session_id, context.user_id) - yield self._create_user_input_request_chunk( - prompt, session_id, context.user_id - ) + await self._request_user_input(session_id) + yield self._create_user_input_request(prompt, session_id) return await asyncio.sleep(ASYNC_SLEEP_INTERVAL) @@ -460,10 +457,8 @@ async def _continue_planning( plan = await planning_task del self._execution_contexts[session_id] - async for chunk in self._execute_plan_with_input_support( - plan, original_user_input.meta.model_dump() - ): - yield chunk + async for message in self._execute_plan_with_input_support(plan): + yield message async def _cancel_execution(self, session_id: str): """Cancel execution and clean up all related resources""" @@ -506,8 +501,8 @@ async def _cleanup_expired_contexts( # ==================== Plan and Task Execution Methods ==================== async def _execute_plan_with_input_support( - self, plan: ExecutionPlan, metadata: dict - ) -> AsyncGenerator[MessageChunk, None]: + self, plan: ExecutionPlan, metadata: Optional[dict] = None + ) -> AsyncGenerator[ProcessMessage, None]: """ Execute an execution plan with Human-in-the-Loop support. @@ -518,11 +513,11 @@ async def _execute_plan_with_input_support( plan: The execution plan containing tasks to execute metadata: Execution metadata containing session and user info """ - session_id, user_id = metadata["session_id"], metadata["user_id"] + session_id = plan.session_id if not plan.tasks: - yield self._create_error_message_chunk( - "No tasks found for this request.", session_id, user_id, "__system__" + yield self._create_error_message( + "No tasks found for this request.", session_id ) return @@ -535,37 +530,42 @@ async def _execute_plan_with_input_support( await self.task_manager.store.save_task(task) # Execute task with input support - async for chunk in self._execute_task_with_input_support( - task, plan.query, metadata + async for message in self._execute_task_with_input_support( + task, metadata ): - # Accumulate agent responses - agent_name = chunk.meta.agent_name - agent_responses[agent_name] += chunk.content - yield chunk - - # Save complete responses to session - if chunk.is_final and agent_responses[agent_name].strip(): - await self.session_manager.add_message( - session_id, - Role.AGENT, - agent_responses[agent_name], - agent_name=agent_name, - ) - agent_responses[agent_name] = "" + # Accumulate based on event + if message.event in { + StreamResponseEvent.MESSAGE_CHUNK, + StreamResponseEvent.REASONING, + NotifyResponseEvent.MESSAGE, + } and isinstance(message.data.content, str): + agent_responses[task.agent_name] += message.data.content + yield message + + if ( + is_task_completed(message.event) + or task.pattern == TaskPattern.RECURRING + ): + if agent_responses[task.agent_name].strip(): + await self.session_manager.add_message( + session_id, + Role.AGENT, + agent_responses[task.agent_name], + agent_name=task.agent_name, + ) + agent_responses[task.agent_name] = "" except Exception as e: error_msg = f"Error executing {task.agent_name}: {str(e)}" logger.exception(f"Task execution failed: {error_msg}") - yield self._create_error_message_chunk( - error_msg, session_id, user_id, task.agent_name - ) + yield self._create_error_message(error_msg, session_id) # Save any remaining agent responses await self._save_remaining_responses(session_id, agent_responses) async def _execute_task_with_input_support( - self, task: Task, query: str, metadata: dict - ) -> AsyncGenerator[MessageChunk, None]: + self, task: Task, metadata: Optional[dict] = None + ) -> AsyncGenerator[ProcessMessage, None]: """ Execute a single task with user input interruption support. @@ -579,85 +579,80 @@ async def _execute_task_with_input_support( await self.task_manager.start_task(task.task_id) # Get agent connection + agent_name = task.agent_name agent_card = await self.agent_connections.start_agent( - task.agent_name, + agent_name, with_listener=False, notification_callback=store_task_in_session, ) - client = await self.agent_connections.get_client(task.agent_name) - + client = await self.agent_connections.get_client(agent_name) if not client: - raise RuntimeError(f"Could not connect to agent {task.agent_name}") + raise RuntimeError(f"Could not connect to agent {agent_name}") # Configure metadata for notifications + metadata = metadata or {} if task.pattern != TaskPattern.ONCE: metadata["notify"] = True # Send message to agent - response = await client.send_message( - query, - context_id=task.session_id, + remote_response = await client.send_message( + task.query, + session_id=task.session_id, metadata=metadata, streaming=agent_card.capabilities.streaming, ) # Process streaming responses - async for remote_task, event in response: + async for remote_task, event in remote_response: if event is None and remote_task.status.state == TaskState.submitted: task.remote_task_ids.append(remote_task.id) continue if isinstance(event, TaskStatusUpdateEvent): - await self._handle_task_status_update(event, task) - - # TODO: Check for user input requirement + state = event.status.state + logger.info(f"Task {task.task_id} status update: {state}") + if state in {TaskState.submitted, TaskState.completed}: + continue # Handle task failure - if event.status.state == TaskState.failed: + if state == TaskState.failed: err_msg = get_message_text(event.status.message) await self.task_manager.fail_task(task.task_id, err_msg) - yield self._create_error_message_chunk( - err_msg, task.session_id, task.user_id, task.agent_name - ) + yield self._create_error_message(err_msg, task.session_id) return + # if state == TaskState.input_required: + # Handle tool call start + if not event.metadata: + continue + response_event = event.metadata.get("response_event") + if state == TaskState.working and is_tool_call(response_event): + yield self._create_tool_message( + response_event, + task.session_id, + tool_call_id=event.metadata.get("tool_call_id", ""), + tool_name=event.metadata.get("tool_name", ""), + tool_result=event.metadata.get("tool_result"), + ) + continue elif isinstance(event, TaskArtifactUpdateEvent): - yield self._create_message_chunk( + yield self._create_message( get_message_text(event.artifact, ""), task.session_id, - task.user_id, - task.agent_name, - is_final=metadata.get("notify", False), + event=StreamResponseEvent.MESSAGE_CHUNK, ) # Complete task successfully await self.task_manager.complete_task(task.task_id) - yield self._create_message_chunk( + yield self._create_message( "", task.session_id, - task.user_id, - task.agent_name, - is_final=True, - status=MessageChunkStatus.success, + event=StreamResponseEvent.TASK_DONE, ) except Exception as e: await self.task_manager.fail_task(task.task_id, str(e)) raise e - async def _handle_task_status_update( - self, event: TaskStatusUpdateEvent, task: Task - ): - """Handle task status update events""" - logger.info(f"Task {task.task_id} status update: {event.status.state}") - - # Add any additional status-specific handling here - if event.status.state == TaskState.submitted: - # Task was submitted successfully - pass - elif event.status.state == TaskState.completed: - # Task completed successfully - pass - async def _save_remaining_responses(self, session_id: str, agent_responses: dict): """Save any remaining agent responses to the session""" for agent_name, full_response in agent_responses.items(): @@ -666,112 +661,6 @@ async def _save_remaining_responses(self, session_id: str, agent_responses: dict session_id, Role.AGENT, full_response, agent_name=agent_name ) - # ==================== Legacy Task Execution (No HIL Support) ==================== - - async def _execute_plan_legacy( - self, plan: ExecutionPlan, metadata: dict - ) -> AsyncGenerator[MessageChunk, None]: - """ - Execute an execution plan without Human-in-the-Loop support. - - This is a simplified version for backwards compatibility. - """ - session_id, user_id = metadata["session_id"], metadata["user_id"] - - if not plan.tasks: - yield self._create_error_message_chunk( - "No tasks found for this request.", session_id, user_id, "__system__" - ) - return - - # Execute tasks sequentially - for task in plan.tasks: - try: - await self.task_manager.store.save_task(task) - async for chunk in self._execute_task_legacy( - task, plan.query, metadata - ): - yield chunk - except Exception as e: - error_msg = f"Error executing {task.agent_name}: {str(e)}" - yield self._create_error_message_chunk( - error_msg, session_id, user_id, task.agent_name - ) - - async def _execute_task_legacy( - self, task: Task, query: str, metadata: dict - ) -> AsyncGenerator[MessageChunk, None]: - """ - Execute a single task without user input interruption support. - - This is a simplified version for backwards compatibility. - """ - try: - await self.task_manager.start_task(task.task_id) - - # Get agent connection - agent_card = await self.agent_connections.start_agent( - task.agent_name, - with_listener=False, - notification_callback=store_task_in_session, - ) - client = await self.agent_connections.get_client(task.agent_name) - - if not client: - raise RuntimeError(f"Could not connect to agent {task.agent_name}") - - if task.pattern != TaskPattern.ONCE: - metadata["notify"] = True - - response = await client.send_message( - query, - context_id=task.session_id, - metadata=metadata, - streaming=agent_card.capabilities.streaming, - ) - - # Process streaming responses - async for remote_task, event in response: - if event is None and remote_task.status.state == TaskState.submitted: - task.remote_task_ids.append(remote_task.id) - continue - - if isinstance(event, TaskStatusUpdateEvent): - logger.info(f"Task status update: {event.status.state}") - - if event.status.state == TaskState.failed: - err_msg = get_message_text(event.status.message) - await self.task_manager.fail_task(task.task_id, err_msg) - yield self._create_error_message_chunk( - err_msg, task.session_id, task.user_id, task.agent_name - ) - return - continue - - if isinstance(event, TaskArtifactUpdateEvent): - yield self._create_message_chunk( - get_message_text(event.artifact, ""), - task.session_id, - task.user_id, - task.agent_name, - is_final=metadata.get("notify", False), - ) - - # Complete task - await self.task_manager.complete_task(task.task_id) - yield self._create_message_chunk( - "", - task.session_id, - task.user_id, - task.agent_name, - is_final=True, - status=MessageChunkStatus.success, - ) - - except Exception as e: - await self.task_manager.fail_task(task.task_id, str(e)) - raise e - # ==================== Module-level Factory Function ==================== diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index 07b89b142..21cb98ffc 100644 --- a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -77,7 +77,7 @@ async def create_plan( plan_id=generate_uuid("plan"), session_id=user_input.meta.session_id, user_id=user_input.meta.user_id, - query=user_input.query, # Store the original query + orig_query=user_input.query, # Store the original query created_at=datetime.now().isoformat(), ) @@ -100,7 +100,10 @@ async def _analyze_input_and_create_tasks( """ # Create planning agent with appropriate tools and instructions agent = Agent( - model=OpenRouter(id="openai/gpt-4o-mini"), + model=OpenRouter( + id=os.getenv("PLANNER_MODEL_ID", "openai/gpt-4o-mini"), + max_tokens=None, + ), tools=[ UserControlFlowTools(), self.tool_get_agent_description, @@ -147,7 +150,11 @@ async def _analyze_input_and_create_tasks( # Parse planning result and create tasks try: - plan_raw = PlannerResponse.model_validate_json(run_response.content) + plan_content = run_response.content + if plan_content.startswith("```json\n") and plan_content.endswith("\n```"): + # Strip markdown code block if present + plan_content = "\n".join(plan_content.split("\n")[1:-1]) + plan_raw = PlannerResponse.model_validate_json(plan_content) except Exception as e: raise ValueError( f"Planner produced invalid JSON for PlannerResponse: {e}. " @@ -217,7 +224,10 @@ def tool_get_agent_description(self, agent_name: str) -> str: """ self.agent_connections.list_remote_agents() if card := self.agent_connections.get_remote_agent_card(agent_name): - return agentcard_to_prompt(card) + if isinstance(card, AgentCard): + return agentcard_to_prompt(card) + if isinstance(card, dict): + return str(card) return "The requested agent could not be found or is not available." diff --git a/python/valuecell/core/coordinate/planner_prompts.py b/python/valuecell/core/coordinate/planner_prompts.py index 534ca236f..6180fccc8 100644 --- a/python/valuecell/core/coordinate/planner_prompts.py +++ b/python/valuecell/core/coordinate/planner_prompts.py @@ -33,10 +33,9 @@ def create_prompt_with_datetime(base_prompt: str) -> str: - Ask only one clarification question at a time - Wait for user response before asking additional questions - Generate clear, specific prompts suitable for AI model execution -- Output must be valid JSON following the Response Format -- Output will be parsed programmatically, so ensure strict adherence to the format and do not include any extra text +- Output must be valid JSON string following the Response Format -**Response Format:** +**Response Format (return exactly this structure as JSON string; no extra keys, no comments, no backticks, no markdown format):** { "tasks": [ { diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index d9a3003c1..e78370a54 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -82,7 +82,7 @@ def sample_plan( plan_id="plan-1", session_id=session_id, user_id=user_id, - query=sample_query, + orig_query=sample_query, tasks=[sample_task], created_at="2025-09-16T10:00:00", ) @@ -309,8 +309,8 @@ async def test_planner_error( out.append(chunk) assert len(out) == 1 - assert "(Error)" in out[0].content - assert "Planning failed" in out[0].content + assert "(Error)" in out[0].data.content + assert "Planning failed" in out[0].data.content @pytest.mark.asyncio @@ -328,7 +328,7 @@ async def test_agent_connection_error( async for chunk in orchestrator.process_user_input(sample_user_input): out.append(chunk) - assert any("(Error)" in c.content for c in out) + assert any("(Error)" in c.data.content for c in out) @pytest.mark.asyncio diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index f80a5ee74..f5463637d 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -47,56 +47,35 @@ def clear_desired_agent(self) -> None: self.desired_agent_name = None -class MessageDataKind(str, Enum): - """Types of messages exchanged with agents""" +class StreamResponseEvent(str, Enum): + MESSAGE_CHUNK = "message_chunk" + TOOL_CALL_STARTED = "tool_call_started" + TOOL_CALL_COMPLETED = "tool_call_completed" + REASONING = "reasoning" + TASK_DONE = "task_done" + TASK_FAILED = "task_failed" - TEXT = "text" - IMAGE = "image" - COMMAND = "command" - -class MessageChunkStatus(str, Enum): - partial = "partial" - success = "success" - failure = "failure" - cancelled = "cancelled" - - -class MessageChunkMetadata(BaseModel): - status: MessageChunkStatus = Field( - default=MessageChunkStatus.partial, - description="Chunk outcome: use partial for intermediate chunks; success/failure for final.", - ) - session_id: str = Field(..., description="Session ID for this request") - user_id: str = Field(..., description="User ID who made this request") - agent_name: str = Field(..., description="Agent name handling this message") - - -class MessageChunk(BaseModel): - """Chunk of a message, useful for streaming responses""" - - content: str = Field(..., description="Content of the message chunk") - is_final: bool = Field( - default=False, description="Indicates if this is the final chunk" - ) - kind: MessageDataKind = Field( - ..., description="The type of data contained in the chunk" - ) - meta: MessageChunkMetadata = Field( - ..., description="Metadata associated with the message chunk" - ) +class NotifyResponseEvent(str, Enum): + MESSAGE = "message" + TASK_DONE = "task_done" + TASK_FAILED = "task_failed" class StreamResponse(BaseModel): """Response model for streaming agent responses""" - is_task_complete: bool = Field( - default=False, - description="Indicates whether the task associated with this stream response is complete.", + content: Optional[str] = Field( + None, + description="The content of the stream response, typically a chunk of data or message.", ) - content: str = Field( + event: StreamResponseEvent = Field( ..., - description="The content of the stream response, typically a chunk of data or message.", + description="The type of stream response, indicating its purpose or content nature.", + ) + metadata: Optional[dict] = Field( + None, + description="Optional metadata providing additional context about the response", ) @@ -107,6 +86,36 @@ class NotifyResponse(BaseModel): ..., description="The content of the notification response", ) + event: NotifyResponseEvent = Field( + ..., + description="The type of notification response", + ) + + +class ToolCallContent(BaseModel): + tool_call_id: str = Field(..., description="Unique ID for the tool call") + tool_name: str = Field(..., description="Name of the tool being called") + tool_result: Optional[str] = Field( + None, + description="The content returned from the tool call, if any.", + ) + + +class ProcessMessageData(BaseModel): + conversation_id: str = Field(..., description="Conversation ID for this request") + message_id: str = Field(..., description="Message ID for this request") + content: str | ToolCallContent = Field( + ..., description="Content of the message chunk" + ) + + +class ProcessMessage(BaseModel): + """Chunk of a message, useful for streaming responses""" + + event: StreamResponseEvent | NotifyResponseEvent = Field( + ..., description="The event type of the message chunk" + ) + data: ProcessMessageData = Field(..., description="Content of the message chunk") # TODO: keep only essential parameters @@ -144,7 +153,7 @@ async def notify( user_id: Target user ID for the notification Yields: - StreamResponse: Notification content and status + NotifyResponse: Notification content and status """ raise NotImplementedError diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 0cf33726c..2c67efbec 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -48,12 +48,7 @@ async def stream_query_agent( async for response_chunk in self.orchestrator.process_user_input( user_input ): - if ( - response_chunk - and response_chunk.content - and response_chunk.content.strip() - ): - yield response_chunk.content + yield response_chunk.model_dump() except Exception as e: logger.error(f"Error in stream_query_agent: {str(e)}") diff --git a/python/valuecell/utils/uuid.py b/python/valuecell/utils/uuid.py index a904c06b2..501c95f8b 100644 --- a/python/valuecell/utils/uuid.py +++ b/python/valuecell/utils/uuid.py @@ -6,3 +6,7 @@ def generate_uuid(prefix: str = None) -> str: return str(uuid4().hex) return f"{prefix}-{uuid4().hex}" + + +def generate_message_id() -> str: + return generate_uuid("msg")