From bd54b357558d2cc8ebf11f0036ca4ada96e0a1cf Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 20 Sep 2025 17:33:27 +0800 Subject: [PATCH 01/11] feat: add response constructors and refactor agent communication for tool calls --- python/valuecell/__init__.py | 4 + python/valuecell/core/agent/__init__.py | 3 + python/valuecell/core/agent/client.py | 4 +- python/valuecell/core/agent/decorator.py | 111 ++++++++++++++---- python/valuecell/core/agent/responses.py | 106 +++++++++++++++++ .../valuecell/core/coordinate/orchestrator.py | 103 ++++++++++------ python/valuecell/core/types.py | 48 ++++++-- 7 files changed, 309 insertions(+), 70 deletions(-) create mode 100644 python/valuecell/core/agent/responses.py diff --git a/python/valuecell/__init__.py b/python/valuecell/__init__.py index e49b81bb3..819a0b2b4 100644 --- a/python/valuecell/__init__.py +++ b/python/valuecell/__init__.py @@ -17,3 +17,7 @@ # registers agents on import import valuecell.agents as _ # noqa: F401 + +# Optional convenience re-exports (not added to __all__ to keep root clean) +# Users can import: from valuecell import responses +from . import responses as responses # noqa: E402,F401 diff --git a/python/valuecell/core/agent/__init__.py b/python/valuecell/core/agent/__init__.py index 0902cf057..7becdb9d8 100644 --- a/python/valuecell/core/agent/__init__.py +++ b/python/valuecell/core/agent/__init__.py @@ -20,3 +20,6 @@ "RemoteAgentResponse", "StreamResponse", ] + +# Convenience re-export for response constructors +from . 
import responses as responses # noqa: E402,F401 diff --git a/python/valuecell/core/agent/client.py b/python/valuecell/core/agent/client.py index 0852f3571..46687d5f2 100644 --- a/python/valuecell/core/agent/client.py +++ b/python/valuecell/core/agent/client.py @@ -50,7 +50,7 @@ async def _setup_client(self): async def send_message( self, query: str, - context_id: str = None, + session_id: str = None, metadata: dict = None, streaming: bool = False, ) -> AsyncIterator[RemoteAgentResponse]: @@ -65,7 +65,7 @@ async def send_message( role=Role.user, parts=[Part(root=TextPart(text=query))], message_id=generate_uuid("msg"), - context_id=context_id or generate_uuid("ctx"), + context_id=session_id or generate_uuid("ctx"), metadata=metadata if metadata else None, ) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 59fdb7454..dcdd0c769 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -27,7 +27,13 @@ from a2a.utils import new_agent_text_message, new_task from a2a.utils.errors import ServerError from valuecell.core.agent import registry -from valuecell.core.types import BaseAgent +from valuecell.core.types import ( + BaseAgent, + NotifyResponse, + NotifyResponseEvent, + StreamResponse, + StreamResponseEvent, +) from valuecell.utils import ( get_agent_card_path, get_next_available_port, @@ -153,55 +159,91 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non # Prepare query and ensure a task exists in the system query = context.get_user_input() task = context.current_task - metadata = context.metadata + task_meta = context.metadata + agent_name = self.agent.__class__.__name__ if not task: message = context.message task = new_task(message) - task.metadata = metadata + task.metadata = task_meta await event_queue.enqueue_event(task) # Helper state - updater = TaskUpdater(event_queue, task.id, task.context_id) - artifact_id = 
f"{self.agent.__class__.__name__}-artifact" - chunk_idx = 0 + task_id = task.id + session_id = task.context_id + updater = TaskUpdater(event_queue, task_id, session_id) + artifact_id = f"artifact-{agent_name}-{session_id}-{task_id}" + chunk_idx = -1 # Local helper to add a chunk - async def _add_chunk(content: str, last: bool = False): + async def _add_chunk( + response: StreamResponse | NotifyResponse, is_complete: bool + ): nonlocal chunk_idx - parts = [Part(root=TextPart(text=content))] + + chunk_idx += 1 + if not response.content: + return + + response_event = response.event + parts = [Part(root=TextPart(text=response.content))] + metadata = {"response_event": response_event.value} await updater.add_artifact( parts=parts, artifact_id=artifact_id, append=chunk_idx > 0, - last_chunk=last, + last_chunk=is_complete, + metadata=metadata, ) - if not last: - chunk_idx += 1 # Stream from the user agent and update task incrementally - await updater.update_status(TaskState.working) + await updater.update_status( + TaskState.working, message=f"Task received by {agent_name}" + ) try: query_handler = ( - self.agent.notify if metadata.get("notify") else self.agent.stream + self.agent.notify if task_meta.get("notify") else self.agent.stream ) - async for item in query_handler(query, task.context_id, task.id): - content = item.get("content", "") - is_complete = item.get("is_task_complete", True) - - await _add_chunk(content, last=is_complete) - + async for response in query_handler(query, session_id, task_id): + if not isinstance(response, (StreamResponse, NotifyResponse)): + raise ValueError( + f"Agent {agent_name} yielded invalid response type: {type(response)}" + ) + + response_event = response.event + if is_task_failed(response_event): + raise RuntimeError( + f"Agent {agent_name} reported failure: {response.content}" + ) + + is_complete = is_task_complete(response_event) + if is_tool_call(response_event): + message = None + if response_event == 
StreamResponseEvent.TOOL_CALL_COMPLETED: + message = new_agent_text_message( + response.content, session_id, task_id + ) + await updater.update_status( + TaskState.working, + message=message, + metadata={ + "event": response_event.value, + "tool_call_id": response.metadata.get("tool_call_id"), + "tool_name": response.metadata.get("tool_name"), + }, + ) + continue + + await _add_chunk(response, is_complete=is_complete) if is_complete: await updater.complete() break except Exception as e: - message = ( - f"Error during {self.agent.__class__.__name__} agent execution: {e}" - ) + message = f"Error during {agent_name} agent execution: {e}" logger.error(message) await updater.update_status( TaskState.failed, - message=new_agent_text_message(message, task.context_id, task.id), + message=new_agent_text_message(message, session_id, task_id), final=True, ) @@ -210,6 +252,29 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None raise ServerError(error=UnsupportedOperationError()) +def is_task_complete(response_type: str) -> bool: + return response_type in { + StreamResponseEvent.TASK_DONE, + StreamResponseEvent.TASK_FAILED, + NotifyResponseEvent.TASK_DONE, + NotifyResponseEvent.TASK_FAILED, + } + + +def is_task_failed(response_type: str) -> bool: + return response_type in { + StreamResponseEvent.TASK_FAILED, + NotifyResponseEvent.TASK_FAILED, + } + + +def is_tool_call(response_type: str) -> bool: + return response_type in { + StreamResponseEvent.TOOL_CALL_STARTED, + StreamResponseEvent.TOOL_CALL_COMPLETED, + } + + def _create_agent_executor(agent_instance): return GenericAgentExecutor(agent_instance) diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py new file mode 100644 index 000000000..78a36e739 --- /dev/null +++ b/python/valuecell/core/agent/responses.py @@ -0,0 +1,106 @@ +"""User-facing response constructors under valuecell.core.agent. 
+ +Prefer importing from here if you're already working inside the core.agent +namespace. For a stable top-level import, you can also use +`valuecell.responses` which provides the same API. + +Example: + from valuecell.core.agent.responses import stream, notify + # Or explicit aliases for clarity: + from valuecell.core.agent.responses import streaming, notification + + yield stream.message_chunk("Thinking…") + yield stream.reasoning("Plan: 1) fetch 2) analyze") + yield stream.tool_call_start("call_1", "search") + yield stream.tool_call_result('{"items": 12}', "call_1", "search") + yield stream.done() + + send(notify.message("Task submitted")) + send(notify.done("OK")) +""" + +from __future__ import annotations + +from typing import Optional + +from valuecell.core.types import ( + NotifyResponse, + NotifyResponseEvent, + StreamResponse, + StreamResponseEvent, + ToolCallMeta, +) + + +class _StreamResponseNamespace: + """Factory methods for streaming responses.""" + + def message_chunk(self, content: str) -> StreamResponse: + return StreamResponse(event=StreamResponseEvent.MESSAGE_CHUNK, content=content) + + def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.TOOL_CALL_STARTED, + metadata=ToolCallMeta( + tool_call_id=tool_call_id, tool_name=tool_name + ).model_dump(), + ) + + def tool_call_completed( + self, content: str, tool_call_id: str, tool_name: str + ) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + content=content, + metadata=ToolCallMeta( + tool_call_id=tool_call_id, tool_name=tool_name + ).model_dump(), + ) + + def reasoning(self, content: str) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.REASONING, + content=content, + ) + + def done(self, content: Optional[str] = None) -> StreamResponse: + return StreamResponse( + content=content, + event=StreamResponseEvent.TASK_DONE, + ) + + def failed(self, content: 
Optional[str] = None) -> StreamResponse: + return StreamResponse( + content=content, + event=StreamResponseEvent.TASK_FAILED, + ) + + +streaming = _StreamResponseNamespace() + + +class _NotifyResponseNamespace: + """Factory methods for notify responses.""" + + def message(self, content: str) -> NotifyResponse: + return NotifyResponse( + content=content, + type=NotifyResponseEvent.MESSAGE, + ) + + def done(self, content: Optional[str] = None) -> NotifyResponse: + return NotifyResponse( + content=content, + type=NotifyResponseEvent.TASK_DONE, + ) + + +notification = _NotifyResponseNamespace() + + +__all__ = [ + "streaming", + "notification", + "StreamResponse", + "NotifyResponse", +] diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index bf326d6f1..1d3856595 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -6,6 +6,7 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from a2a.utils import get_message_text from valuecell.core.agent.connect import get_default_remote_connections +from valuecell.core.agent.decorator import is_tool_call from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager from valuecell.core.task.models import TaskPattern @@ -14,6 +15,7 @@ MessageChunkMetadata, MessageChunkStatus, MessageDataKind, + StreamResponseEvent, UserInput, ) @@ -343,9 +345,7 @@ async def _monitor_planning_task( # Planning completed, execute plan plan = await planning_task - async for chunk in self._execute_plan_with_input_support( - plan, user_input.meta.model_dump() - ): + async for chunk in self._execute_plan_with_input_support(plan): yield chunk async def _request_user_input(self, session_id: str, _user_id: str): @@ -394,6 +394,30 @@ def _create_message_chunk( is_final=is_final, ) + def _create_tool_message_chunk( + self, 
+ session_id: str, + user_id: str, + agent_name: str, + tool_call_id: str, + tool_name: str, + content: Optional[str] = None, + ) -> MessageChunk: + """Create a MessageChunk with tool call metadata""" + return MessageChunk( + content=content, + kind=MessageDataKind.TEXT, + meta=MessageChunkMetadata( + session_id=session_id, + user_id=user_id, + agent_name=agent_name, + status=MessageChunkStatus.partial, + tool_call_id=tool_call_id, + tool_call_name=tool_name, + ), + is_final=False, + ) + def _create_error_message_chunk( self, error_msg: str, session_id: str, user_id: str, agent_name: str ) -> MessageChunk: @@ -460,9 +484,7 @@ async def _continue_planning( plan = await planning_task del self._execution_contexts[session_id] - async for chunk in self._execute_plan_with_input_support( - plan, original_user_input.meta.model_dump() - ): + async for chunk in self._execute_plan_with_input_support(plan): yield chunk async def _cancel_execution(self, session_id: str): @@ -506,7 +528,7 @@ async def _cleanup_expired_contexts( # ==================== Plan and Task Execution Methods ==================== async def _execute_plan_with_input_support( - self, plan: ExecutionPlan, metadata: dict + self, plan: ExecutionPlan, metadata: Optional[dict] = None ) -> AsyncGenerator[MessageChunk, None]: """ Execute an execution plan with Human-in-the-Loop support. 
@@ -518,7 +540,7 @@ async def _execute_plan_with_input_support( plan: The execution plan containing tasks to execute metadata: Execution metadata containing session and user info """ - session_id, user_id = metadata["session_id"], metadata["user_id"] + session_id, user_id = plan.session_id, plan.user_id if not plan.tasks: yield self._create_error_message_chunk( @@ -564,7 +586,7 @@ async def _execute_plan_with_input_support( await self._save_remaining_responses(session_id, agent_responses) async def _execute_task_with_input_support( - self, task: Task, query: str, metadata: dict + self, task: Task, query: str, metadata: Optional[dict] = None ) -> AsyncGenerator[MessageChunk, None]: """ Execute a single task with user input interruption support. @@ -579,53 +601,72 @@ async def _execute_task_with_input_support( await self.task_manager.start_task(task.task_id) # Get agent connection + agent_name = task.agent_name agent_card = await self.agent_connections.start_agent( - task.agent_name, + agent_name, with_listener=False, notification_callback=store_task_in_session, ) - client = await self.agent_connections.get_client(task.agent_name) - + client = await self.agent_connections.get_client(agent_name) if not client: - raise RuntimeError(f"Could not connect to agent {task.agent_name}") + raise RuntimeError(f"Could not connect to agent {agent_name}") # Configure metadata for notifications + metadata = metadata or {} if task.pattern != TaskPattern.ONCE: metadata["notify"] = True # Send message to agent - response = await client.send_message( + remote_response = await client.send_message( query, - context_id=task.session_id, + session_id=task.session_id, metadata=metadata, streaming=agent_card.capabilities.streaming, ) # Process streaming responses - async for remote_task, event in response: + async for remote_task, event in remote_response: if event is None and remote_task.status.state == TaskState.submitted: task.remote_task_ids.append(remote_task.id) continue if 
isinstance(event, TaskStatusUpdateEvent): - await self._handle_task_status_update(event, task) - - # TODO: Check for user input requirement + state = event.status.state + logger.info(f"Task {task.task_id} status update: {state}") + if state in {TaskState.submitted, TaskState.completed}: + continue # Handle task failure - if event.status.state == TaskState.failed: + if state == TaskState.failed: err_msg = get_message_text(event.status.message) await self.task_manager.fail_task(task.task_id, err_msg) yield self._create_error_message_chunk( - err_msg, task.session_id, task.user_id, task.agent_name + err_msg, task.session_id, task.user_id, agent_name ) return + # TODO: Check for user input requirement + # if state == TaskState.input_required: + # Handle tool call start + response_event = event.metadata.get("response_event") + if state == TaskState.working and is_tool_call(response_event): + content = None + if response_event == StreamResponseEvent.TOOL_CALL_COMPLETED: + content = get_message_text(event.status.message, "") + yield self._create_tool_message_chunk( + task.session_id, + task.user_id, + agent_name, + tool_call_id=event.metadata.get("tool_call_id", ""), + tool_name=event.metadata.get("tool_name", ""), + content=content, + ) + continue elif isinstance(event, TaskArtifactUpdateEvent): yield self._create_message_chunk( get_message_text(event.artifact, ""), task.session_id, task.user_id, - task.agent_name, + agent_name, is_final=metadata.get("notify", False), ) @@ -635,7 +676,7 @@ async def _execute_task_with_input_support( "", task.session_id, task.user_id, - task.agent_name, + agent_name, is_final=True, status=MessageChunkStatus.success, ) @@ -644,20 +685,6 @@ async def _execute_task_with_input_support( await self.task_manager.fail_task(task.task_id, str(e)) raise e - async def _handle_task_status_update( - self, event: TaskStatusUpdateEvent, task: Task - ): - """Handle task status update events""" - logger.info(f"Task {task.task_id} status update: 
{event.status.state}") - - # Add any additional status-specific handling here - if event.status.state == TaskState.submitted: - # Task was submitted successfully - pass - elif event.status.state == TaskState.completed: - # Task completed successfully - pass - async def _save_remaining_responses(self, session_id: str, agent_responses: dict): """Save any remaining agent responses to the session""" for agent_name, full_response in agent_responses.items(): @@ -725,7 +752,7 @@ async def _execute_task_legacy( response = await client.send_message( query, - context_id=task.session_id, + session_id=task.session_id, metadata=metadata, streaming=agent_card.capabilities.streaming, ) diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index f80a5ee74..5f35ef459 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -70,12 +70,18 @@ class MessageChunkMetadata(BaseModel): session_id: str = Field(..., description="Session ID for this request") user_id: str = Field(..., description="User ID who made this request") agent_name: str = Field(..., description="Agent name handling this message") + tool_call_id: Optional[str] = Field( + None, description="ID of the tool call being made" + ) + tool_call_name: Optional[str] = Field( + None, description="Name of the tool being called" + ) class MessageChunk(BaseModel): """Chunk of a message, useful for streaming responses""" - content: str = Field(..., description="Content of the message chunk") + content: Optional[str] = Field(None, description="Content of the message chunk") is_final: bool = Field( default=False, description="Indicates if this is the final chunk" ) @@ -87,16 +93,40 @@ class MessageChunk(BaseModel): ) +class StreamResponseEvent(str, Enum): + MESSAGE_CHUNK = "message_chunk" + TOOL_CALL_STARTED = "tool_call_started" + TOOL_CALL_COMPLETED = "tool_call_completed" + REASONING = "reasoning" + TASK_DONE = "task_done" + TASK_FAILED = "task_failed" + + +class 
NotifyResponseEvent(str, Enum): + MESSAGE = "message" + TASK_DONE = "task_done" + TASK_FAILED = "task_failed" + + +class ToolCallMeta(BaseModel): + tool_call_id: str = Field(..., description="Unique ID for the tool call") + tool_name: str = Field(..., description="Name of the tool being called") + + class StreamResponse(BaseModel): """Response model for streaming agent responses""" - is_task_complete: bool = Field( - default=False, - description="Indicates whether the task associated with this stream response is complete.", + content: Optional[str] = Field( + None, + description="The content of the stream response, typically a chunk of data or message.", ) - content: str = Field( + event: StreamResponseEvent = Field( ..., - description="The content of the stream response, typically a chunk of data or message.", + description="The type of stream response, indicating its purpose or content nature.", + ) + metadata: Optional[dict] = Field( + None, + description="Optional metadata providing additional context about the response", ) @@ -107,6 +137,10 @@ class NotifyResponse(BaseModel): ..., description="The content of the notification response", ) + type: NotifyResponseEvent = Field( + ..., + description="The type of notification response", + ) # TODO: keep only essential parameters @@ -144,7 +178,7 @@ async def notify( user_id: Target user ID for the notification Yields: - StreamResponse: Notification content and status + NotifyResponse: Notification content and status """ raise NotImplementedError From 23adbf8c26617c08d45ff82ee65584298d97fd3c Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 20 Sep 2025 18:00:32 +0800 Subject: [PATCH 02/11] refactor: update tool call handling and message structures for improved clarity and functionality --- python/valuecell/core/agent/decorator.py | 6 +- python/valuecell/core/agent/responses.py | 15 ++-- .../valuecell/core/coordinate/orchestrator.py | 46 +++++------ 
python/valuecell/core/types.py | 79 +++++++------------ 4 files changed, 53 insertions(+), 93 deletions(-) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index dcdd0c769..9c3e02825 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -217,11 +217,6 @@ async def _add_chunk( is_complete = is_task_complete(response_event) if is_tool_call(response_event): - message = None - if response_event == StreamResponseEvent.TOOL_CALL_COMPLETED: - message = new_agent_text_message( - response.content, session_id, task_id - ) await updater.update_status( TaskState.working, message=message, @@ -229,6 +224,7 @@ async def _add_chunk( "event": response_event.value, "tool_call_id": response.metadata.get("tool_call_id"), "tool_name": response.metadata.get("tool_name"), + "tool_result": response.metadata.get("content"), }, ) continue diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py index 78a36e739..64f768546 100644 --- a/python/valuecell/core/agent/responses.py +++ b/python/valuecell/core/agent/responses.py @@ -28,7 +28,7 @@ NotifyResponseEvent, StreamResponse, StreamResponseEvent, - ToolCallMeta, + ToolCallContent, ) @@ -41,19 +41,18 @@ def message_chunk(self, content: str) -> StreamResponse: def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse: return StreamResponse( event=StreamResponseEvent.TOOL_CALL_STARTED, - metadata=ToolCallMeta( + metadata=ToolCallContent( tool_call_id=tool_call_id, tool_name=tool_name ).model_dump(), ) def tool_call_completed( - self, content: str, tool_call_id: str, tool_name: str + self, tool_result: str, tool_call_id: str, tool_name: str ) -> StreamResponse: return StreamResponse( event=StreamResponseEvent.TOOL_CALL_COMPLETED, - content=content, - metadata=ToolCallMeta( - tool_call_id=tool_call_id, tool_name=tool_name + metadata=ToolCallContent( + tool_call_id=tool_call_id, 
tool_name=tool_name, tool_result=tool_result ).model_dump(), ) @@ -85,13 +84,13 @@ class _NotifyResponseNamespace: def message(self, content: str) -> NotifyResponse: return NotifyResponse( content=content, - type=NotifyResponseEvent.MESSAGE, + event=NotifyResponseEvent.MESSAGE, ) def done(self, content: Optional[str] = None) -> NotifyResponse: return NotifyResponse( content=content, - type=NotifyResponseEvent.TASK_DONE, + event=NotifyResponseEvent.TASK_DONE, ) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 1d3856595..275d13bd5 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -10,14 +10,7 @@ from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager from valuecell.core.task.models import TaskPattern -from valuecell.core.types import ( - MessageChunk, - MessageChunkMetadata, - MessageChunkStatus, - MessageDataKind, - StreamResponseEvent, - UserInput, -) +from valuecell.core.types import ProcessMessage, UserInput from .callback import store_task_in_session from .models import ExecutionPlan @@ -123,7 +116,7 @@ def __init__(self): async def process_user_input( self, user_input: UserInput - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """ Main entry point for processing user requests with Human-in-the-Loop support. 
@@ -236,7 +229,7 @@ async def _ensure_session_exists(self, session_id: str, user_id: str): async def _handle_session_continuation( self, user_input: UserInput - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Handle continuation of an interrupted session""" session_id = user_input.meta.session_id user_id = user_input.meta.user_id @@ -284,7 +277,7 @@ async def _handle_session_continuation( async def _handle_new_request( self, user_input: UserInput - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Handle a new user request""" session_id = user_input.meta.session_id @@ -317,7 +310,7 @@ async def context_aware_handle(request): async def _monitor_planning_task( self, planning_task, user_input: UserInput, callback - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Monitor planning task and handle user input interruptions""" session_id = user_input.meta.session_id user_id = user_input.meta.user_id @@ -380,9 +373,9 @@ def _create_message_chunk( kind: MessageDataKind = MessageDataKind.TEXT, is_final: bool = False, status: MessageChunkStatus = MessageChunkStatus.partial, - ) -> MessageChunk: + ) -> ProcessMessage: """Create a MessageChunk with standardized metadata""" - return MessageChunk( + return Message( content=content, kind=kind, meta=MessageChunkMetadata( @@ -401,10 +394,10 @@ def _create_tool_message_chunk( agent_name: str, tool_call_id: str, tool_name: str, - content: Optional[str] = None, - ) -> MessageChunk: + tool_result: Optional[str] = None, + ) -> ProcessMessage: """Create a MessageChunk with tool call metadata""" - return MessageChunk( + return Message( content=content, kind=MessageDataKind.TEXT, meta=MessageChunkMetadata( @@ -420,7 +413,7 @@ def _create_tool_message_chunk( def _create_error_message_chunk( self, error_msg: str, session_id: str, user_id: str, agent_name: str - ) -> MessageChunk: + ) -> ProcessMessage: """Create an error 
MessageChunk with standardized format""" return self._create_message_chunk( content=f"(Error): {error_msg}", @@ -437,7 +430,7 @@ def _create_user_input_request_chunk( session_id: str, user_id: str, agent_name: str = "__planner__", - ) -> MessageChunk: + ) -> ProcessMessage: """Create a user input request MessageChunk""" return self._create_message_chunk( content=f"USER_INPUT_REQUIRED:{prompt}", @@ -451,7 +444,7 @@ def _create_user_input_request_chunk( async def _continue_planning( self, session_id: str, context: ExecutionContext - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[Message, None]: """Resume planning stage execution""" planning_task = context.get_metadata("planning_task") original_user_input = context.get_metadata("original_user_input") @@ -529,7 +522,7 @@ async def _cleanup_expired_contexts( async def _execute_plan_with_input_support( self, plan: ExecutionPlan, metadata: Optional[dict] = None - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """ Execute an execution plan with Human-in-the-Loop support. @@ -587,7 +580,7 @@ async def _execute_plan_with_input_support( async def _execute_task_with_input_support( self, task: Task, query: str, metadata: Optional[dict] = None - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """ Execute a single task with user input interruption support. 
@@ -648,16 +641,13 @@ async def _execute_task_with_input_support( # Handle tool call start response_event = event.metadata.get("response_event") if state == TaskState.working and is_tool_call(response_event): - content = None - if response_event == StreamResponseEvent.TOOL_CALL_COMPLETED: - content = get_message_text(event.status.message, "") yield self._create_tool_message_chunk( task.session_id, task.user_id, agent_name, tool_call_id=event.metadata.get("tool_call_id", ""), tool_name=event.metadata.get("tool_name", ""), - content=content, + tool_result=event.metadata.get("tool_result", ""), ) continue @@ -697,7 +687,7 @@ async def _save_remaining_responses(self, session_id: str, agent_responses: dict async def _execute_plan_legacy( self, plan: ExecutionPlan, metadata: dict - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """ Execute an execution plan without Human-in-the-Loop support. @@ -727,7 +717,7 @@ async def _execute_plan_legacy( async def _execute_task_legacy( self, task: Task, query: str, metadata: dict - ) -> AsyncGenerator[MessageChunk, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """ Execute a single task without user input interruption support. 
diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index 5f35ef459..e1d264a61 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -47,52 +47,6 @@ def clear_desired_agent(self) -> None: self.desired_agent_name = None -class MessageDataKind(str, Enum): - """Types of messages exchanged with agents""" - - TEXT = "text" - IMAGE = "image" - COMMAND = "command" - - -class MessageChunkStatus(str, Enum): - partial = "partial" - success = "success" - failure = "failure" - cancelled = "cancelled" - - -class MessageChunkMetadata(BaseModel): - status: MessageChunkStatus = Field( - default=MessageChunkStatus.partial, - description="Chunk outcome: use partial for intermediate chunks; success/failure for final.", - ) - session_id: str = Field(..., description="Session ID for this request") - user_id: str = Field(..., description="User ID who made this request") - agent_name: str = Field(..., description="Agent name handling this message") - tool_call_id: Optional[str] = Field( - None, description="ID of the tool call being made" - ) - tool_call_name: Optional[str] = Field( - None, description="Name of the tool being called" - ) - - -class MessageChunk(BaseModel): - """Chunk of a message, useful for streaming responses""" - - content: Optional[str] = Field(None, description="Content of the message chunk") - is_final: bool = Field( - default=False, description="Indicates if this is the final chunk" - ) - kind: MessageDataKind = Field( - ..., description="The type of data contained in the chunk" - ) - meta: MessageChunkMetadata = Field( - ..., description="Metadata associated with the message chunk" - ) - - class StreamResponseEvent(str, Enum): MESSAGE_CHUNK = "message_chunk" TOOL_CALL_STARTED = "tool_call_started" @@ -108,11 +62,6 @@ class NotifyResponseEvent(str, Enum): TASK_FAILED = "task_failed" -class ToolCallMeta(BaseModel): - tool_call_id: str = Field(..., description="Unique ID for the tool call") - tool_name: str = 
Field(..., description="Name of the tool being called") - - class StreamResponse(BaseModel): """Response model for streaming agent responses""" @@ -137,12 +86,38 @@ class NotifyResponse(BaseModel): ..., description="The content of the notification response", ) - type: NotifyResponseEvent = Field( + event: NotifyResponseEvent = Field( ..., description="The type of notification response", ) +class ToolCallContent(BaseModel): + tool_call_id: str = Field(..., description="Unique ID for the tool call") + tool_name: str = Field(..., description="Name of the tool being called") + tool_result: Optional[str] = Field( + None, + description="The content returned from the tool call, if any.", + ) + + +class ProcessMessageData(BaseModel): + conversation_id: str = Field(..., description="Conversation ID for this request") + message_id: Optional[str] = Field(None, description="Message ID for this request") + content: str | ToolCallContent = Field( + ..., description="Content of the message chunk" + ) + + +class ProcessMessage(BaseModel): + """Chunk of a message, useful for streaming responses""" + + event: StreamResponseEvent | NotifyResponseEvent = Field( + ..., description="The event type of the message chunk" + ) + data: ProcessMessageData = Field(..., description="Content of the message chunk") + + # TODO: keep only essential parameters class BaseAgent(ABC): """ From ba48663826f60eacb7ac7a7dbd85e538e5e3fbe6 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 20 Sep 2025 18:29:15 +0800 Subject: [PATCH 03/11] refactor: rename task completion functions and standardize message creation methods --- python/valuecell/core/agent/decorator.py | 4 +- .../valuecell/core/coordinate/orchestrator.py | 328 ++++++------------ 2 files changed, 100 insertions(+), 232 deletions(-) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 9c3e02825..034a5d7dd 100644 --- 
a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -215,7 +215,7 @@ async def _add_chunk( f"Agent {agent_name} reported failure: {response.content}" ) - is_complete = is_task_complete(response_event) + is_complete = is_task_completed(response_event) if is_tool_call(response_event): await updater.update_status( TaskState.working, @@ -248,7 +248,7 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None raise ServerError(error=UnsupportedOperationError()) -def is_task_complete(response_type: str) -> bool: +def is_task_completed(response_type: str) -> bool: return response_type in { StreamResponseEvent.TASK_DONE, StreamResponseEvent.TASK_FAILED, diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 275d13bd5..455c8bcee 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -6,11 +6,18 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from a2a.utils import get_message_text from valuecell.core.agent.connect import get_default_remote_connections -from valuecell.core.agent.decorator import is_tool_call +from valuecell.core.agent.decorator import is_tool_call, is_task_completed from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager from valuecell.core.task.models import TaskPattern -from valuecell.core.types import ProcessMessage, UserInput +from valuecell.core.types import ( + NotifyResponseEvent, + ProcessMessage, + ProcessMessageData, + StreamResponseEvent, + ToolCallContent, + UserInput, +) from .callback import store_task_in_session from .models import ExecutionPlan @@ -140,16 +147,16 @@ async def process_user_input( # Handle session continuation vs new request if session.status == SessionStatus.REQUIRE_USER_INPUT: - async for chunk in 
self._handle_session_continuation(user_input): - yield chunk + async for message in self._handle_session_continuation(user_input): + yield message else: - async for chunk in self._handle_new_request(user_input): - yield chunk + async for message in self._handle_new_request(user_input): + yield message except Exception as e: logger.exception(f"Error processing user input for session {session_id}") - yield self._create_error_message_chunk( - f"Error processing request: {str(e)}", session_id, user_id, "__system__" + yield self._create_error_message( + f"Error processing request: {str(e)}", session_id ) async def provide_user_input(self, session_id: str, response: str): @@ -236,11 +243,9 @@ async def _handle_session_continuation( # Validate execution context exists if session_id not in self._execution_contexts: - yield self._create_error_message_chunk( + yield self._create_error_message( "No execution context found for this session. The session may have expired.", session_id, - user_id, - "__system__", ) return @@ -248,11 +253,9 @@ async def _handle_session_continuation( # Validate context integrity and user consistency if not self._validate_execution_context(context, user_id): - yield self._create_error_message_chunk( + yield self._create_error_message( "Invalid execution context or user mismatch.", session_id, - user_id, - "__system__", ) await self._cancel_execution(session_id) return @@ -268,11 +271,9 @@ async def _handle_session_continuation( yield chunk # TODO: Add support for resuming execution stage if needed else: - yield self._create_error_message_chunk( + yield self._create_error_message( "Resuming execution stage is not yet supported.", session_id, - user_id, - "__system__", ) async def _handle_new_request( @@ -328,9 +329,9 @@ async def _monitor_planning_task( self._execution_contexts[session_id] = context # Update session status and send user input request - await self._request_user_input(session_id, user_id) - yield self._create_user_input_request_chunk( 
- self.get_user_input_prompt(session_id), session_id, context.user_id + await self._request_user_input(session_id) + yield self._create_user_input_request( + self.get_user_input_prompt(session_id), session_id ) return @@ -341,9 +342,8 @@ async def _monitor_planning_task( async for chunk in self._execute_plan_with_input_support(plan): yield chunk - async def _request_user_input(self, session_id: str, _user_id: str): + async def _request_user_input(self, session_id: str): """Set session to require user input and send the request""" - # Note: _user_id parameter kept for potential future use in user validation session = await self.session_manager.get_session(session_id) if session: session.require_user_input() @@ -364,97 +364,77 @@ def _validate_execution_context( return True - def _create_message_chunk( + def _create_message( self, content: str, - session_id: str, - user_id: str, - agent_name: str, - kind: MessageDataKind = MessageDataKind.TEXT, - is_final: bool = False, - status: MessageChunkStatus = MessageChunkStatus.partial, + conversation_id: str, + event: ( + StreamResponseEvent | NotifyResponseEvent + ) = StreamResponseEvent.MESSAGE_CHUNK, + message_id: Optional[str] = None, ) -> ProcessMessage: - """Create a MessageChunk with standardized metadata""" - return Message( - content=content, - kind=kind, - meta=MessageChunkMetadata( - session_id=session_id, - user_id=user_id, - agent_name=agent_name, - status=status, + """Create a ProcessMessage for plain text content using the new schema.""" + return ProcessMessage( + event=event, + data=ProcessMessageData( + conversation_id=conversation_id, + message_id=message_id, + content=content, ), - is_final=is_final, ) - def _create_tool_message_chunk( + def _create_tool_message( self, - session_id: str, - user_id: str, - agent_name: str, + event: StreamResponseEvent | NotifyResponseEvent, + conversation_id: str, tool_call_id: str, tool_name: str, tool_result: Optional[str] = None, ) -> ProcessMessage: - """Create a 
MessageChunk with tool call metadata""" - return Message( - content=content, - kind=MessageDataKind.TEXT, - meta=MessageChunkMetadata( - session_id=session_id, - user_id=user_id, - agent_name=agent_name, - status=MessageChunkStatus.partial, - tool_call_id=tool_call_id, - tool_call_name=tool_name, + """Create a ProcessMessage for tool call events with ToolCallContent.""" + return ProcessMessage( + event=event, + data=ProcessMessageData( + conversation_id=conversation_id, + content=ToolCallContent( + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_result=tool_result, + ), ), - is_final=False, ) - def _create_error_message_chunk( - self, error_msg: str, session_id: str, user_id: str, agent_name: str - ) -> ProcessMessage: - """Create an error MessageChunk with standardized format""" - return self._create_message_chunk( + def _create_error_message(self, error_msg: str, session_id: str) -> ProcessMessage: + """Create an error ProcessMessage with standardized format (TASK_FAILED).""" + return self._create_message( content=f"(Error): {error_msg}", - session_id=session_id, - user_id=user_id, - agent_name=agent_name, - is_final=True, - status=MessageChunkStatus.failure, + conversation_id=session_id, + event=StreamResponseEvent.TASK_FAILED, ) - def _create_user_input_request_chunk( + def _create_user_input_request( self, prompt: str, session_id: str, - user_id: str, - agent_name: str = "__planner__", ) -> ProcessMessage: - """Create a user input request MessageChunk""" - return self._create_message_chunk( + """Create a user input request ProcessMessage. 
The consumer should parse the prefix.""" + return self._create_message( content=f"USER_INPUT_REQUIRED:{prompt}", - session_id=session_id, - user_id=user_id, - agent_name=agent_name, - kind=MessageDataKind.COMMAND, - is_final=True, - status=MessageChunkStatus.partial, + conversation_id=session_id, + event=StreamResponseEvent.MESSAGE_CHUNK, ) async def _continue_planning( self, session_id: str, context: ExecutionContext - ) -> AsyncGenerator[Message, None]: + ) -> AsyncGenerator[ProcessMessage, None]: """Resume planning stage execution""" planning_task = context.get_metadata("planning_task") original_user_input = context.get_metadata("original_user_input") if not all([planning_task, original_user_input]): - yield self._create_error_message_chunk( + yield self._create_error_message( "Invalid planning context - missing required data", session_id, - context.user_id, - "__planner__", ) await self._cancel_execution(session_id) return @@ -465,10 +445,8 @@ async def _continue_planning( # Still need more user input, send request prompt = self.get_user_input_prompt(session_id) # Ensure session is set to require user input again for repeated prompts - await self._request_user_input(session_id, context.user_id) - yield self._create_user_input_request_chunk( - prompt, session_id, context.user_id - ) + await self._request_user_input(session_id) + yield self._create_user_input_request(prompt, session_id) return await asyncio.sleep(ASYNC_SLEEP_INTERVAL) @@ -477,8 +455,8 @@ async def _continue_planning( plan = await planning_task del self._execution_contexts[session_id] - async for chunk in self._execute_plan_with_input_support(plan): - yield chunk + async for message in self._execute_plan_with_input_support(plan): + yield message async def _cancel_execution(self, session_id: str): """Cancel execution and clean up all related resources""" @@ -533,11 +511,11 @@ async def _execute_plan_with_input_support( plan: The execution plan containing tasks to execute metadata: Execution 
metadata containing session and user info """ - session_id, user_id = plan.session_id, plan.user_id + session_id = plan.session_id if not plan.tasks: - yield self._create_error_message_chunk( - "No tasks found for this request.", session_id, user_id, "__system__" + yield self._create_error_message( + "No tasks found for this request.", session_id ) return @@ -550,30 +528,35 @@ async def _execute_plan_with_input_support( await self.task_manager.store.save_task(task) # Execute task with input support - async for chunk in self._execute_task_with_input_support( + async for message in self._execute_task_with_input_support( task, plan.query, metadata ): - # Accumulate agent responses - agent_name = chunk.meta.agent_name - agent_responses[agent_name] += chunk.content - yield chunk - - # Save complete responses to session - if chunk.is_final and agent_responses[agent_name].strip(): - await self.session_manager.add_message( - session_id, - Role.AGENT, - agent_responses[agent_name], - agent_name=agent_name, - ) - agent_responses[agent_name] = "" + # Accumulate based on event + if message.event in { + StreamResponseEvent.MESSAGE_CHUNK, + StreamResponseEvent.REASONING, + NotifyResponseEvent.MESSAGE, + } and isinstance(message.data.content, str): + agent_responses[task.agent_name] += message.data.content + yield message + + if ( + is_task_completed(message.event) + or task.pattern == TaskPattern.RECURRING + ): + if agent_responses[task.agent_name].strip(): + await self.session_manager.add_message( + session_id, + Role.AGENT, + agent_responses[task.agent_name], + agent_name=task.agent_name, + ) + agent_responses[task.agent_name] = "" except Exception as e: error_msg = f"Error executing {task.agent_name}: {str(e)}" logger.exception(f"Task execution failed: {error_msg}") - yield self._create_error_message_chunk( - error_msg, session_id, user_id, task.agent_name - ) + yield self._create_error_message(error_msg, session_id) # Save any remaining agent responses await 
self._save_remaining_responses(session_id, agent_responses) @@ -632,43 +615,34 @@ async def _execute_task_with_input_support( if state == TaskState.failed: err_msg = get_message_text(event.status.message) await self.task_manager.fail_task(task.task_id, err_msg) - yield self._create_error_message_chunk( - err_msg, task.session_id, task.user_id, agent_name - ) + yield self._create_error_message(err_msg, task.session_id) return - # TODO: Check for user input requirement # if state == TaskState.input_required: # Handle tool call start response_event = event.metadata.get("response_event") if state == TaskState.working and is_tool_call(response_event): - yield self._create_tool_message_chunk( + yield self._create_tool_message( + response_event, task.session_id, - task.user_id, - agent_name, tool_call_id=event.metadata.get("tool_call_id", ""), tool_name=event.metadata.get("tool_name", ""), - tool_result=event.metadata.get("tool_result", ""), + tool_result=event.metadata.get("tool_result"), ) continue elif isinstance(event, TaskArtifactUpdateEvent): - yield self._create_message_chunk( + yield self._create_message( get_message_text(event.artifact, ""), task.session_id, - task.user_id, - agent_name, - is_final=metadata.get("notify", False), + event=StreamResponseEvent.MESSAGE_CHUNK, ) # Complete task successfully await self.task_manager.complete_task(task.task_id) - yield self._create_message_chunk( + yield self._create_message( "", task.session_id, - task.user_id, - agent_name, - is_final=True, - status=MessageChunkStatus.success, + event=StreamResponseEvent.TASK_DONE, ) except Exception as e: @@ -683,112 +657,6 @@ async def _save_remaining_responses(self, session_id: str, agent_responses: dict session_id, Role.AGENT, full_response, agent_name=agent_name ) - # ==================== Legacy Task Execution (No HIL Support) ==================== - - async def _execute_plan_legacy( - self, plan: ExecutionPlan, metadata: dict - ) -> AsyncGenerator[ProcessMessage, None]: - """ - 
Execute an execution plan without Human-in-the-Loop support. - - This is a simplified version for backwards compatibility. - """ - session_id, user_id = metadata["session_id"], metadata["user_id"] - - if not plan.tasks: - yield self._create_error_message_chunk( - "No tasks found for this request.", session_id, user_id, "__system__" - ) - return - - # Execute tasks sequentially - for task in plan.tasks: - try: - await self.task_manager.store.save_task(task) - async for chunk in self._execute_task_legacy( - task, plan.query, metadata - ): - yield chunk - except Exception as e: - error_msg = f"Error executing {task.agent_name}: {str(e)}" - yield self._create_error_message_chunk( - error_msg, session_id, user_id, task.agent_name - ) - - async def _execute_task_legacy( - self, task: Task, query: str, metadata: dict - ) -> AsyncGenerator[ProcessMessage, None]: - """ - Execute a single task without user input interruption support. - - This is a simplified version for backwards compatibility. - """ - try: - await self.task_manager.start_task(task.task_id) - - # Get agent connection - agent_card = await self.agent_connections.start_agent( - task.agent_name, - with_listener=False, - notification_callback=store_task_in_session, - ) - client = await self.agent_connections.get_client(task.agent_name) - - if not client: - raise RuntimeError(f"Could not connect to agent {task.agent_name}") - - if task.pattern != TaskPattern.ONCE: - metadata["notify"] = True - - response = await client.send_message( - query, - session_id=task.session_id, - metadata=metadata, - streaming=agent_card.capabilities.streaming, - ) - - # Process streaming responses - async for remote_task, event in response: - if event is None and remote_task.status.state == TaskState.submitted: - task.remote_task_ids.append(remote_task.id) - continue - - if isinstance(event, TaskStatusUpdateEvent): - logger.info(f"Task status update: {event.status.state}") - - if event.status.state == TaskState.failed: - err_msg = 
get_message_text(event.status.message) - await self.task_manager.fail_task(task.task_id, err_msg) - yield self._create_error_message_chunk( - err_msg, task.session_id, task.user_id, task.agent_name - ) - return - continue - - if isinstance(event, TaskArtifactUpdateEvent): - yield self._create_message_chunk( - get_message_text(event.artifact, ""), - task.session_id, - task.user_id, - task.agent_name, - is_final=metadata.get("notify", False), - ) - - # Complete task - await self.task_manager.complete_task(task.task_id) - yield self._create_message_chunk( - "", - task.session_id, - task.user_id, - task.agent_name, - is_final=True, - status=MessageChunkStatus.success, - ) - - except Exception as e: - await self.task_manager.fail_task(task.task_id, str(e)) - raise e - # ==================== Module-level Factory Function ==================== From 1d446ad8f067341b0f956d30119fc8a082623d6f Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sun, 21 Sep 2025 23:31:06 +0800 Subject: [PATCH 04/11] refactor: update type imports and streamline agent response exports --- .../ai-hedge-fund/adapter/__main__.py | 19 +++++++------------ python/valuecell/__init__.py | 4 ---- python/valuecell/core/__init__.py | 9 +++++++++ python/valuecell/core/agent/__init__.py | 10 ---------- 4 files changed, 16 insertions(+), 26 deletions(-) diff --git a/python/third_party/ai-hedge-fund/adapter/__main__.py b/python/third_party/ai-hedge-fund/adapter/__main__.py index 820f8d2bc..cfbb72466 100644 --- a/python/third_party/ai-hedge-fund/adapter/__main__.py +++ b/python/third_party/ai-hedge-fund/adapter/__main__.py @@ -2,7 +2,7 @@ import logging import os from datetime import datetime -from typing import List +from typing import AsyncGenerator, List from agno.agent import Agent from agno.models.openrouter import OpenRouter @@ -10,7 +10,7 @@ from langchain_core.messages import HumanMessage from pydantic import BaseModel, Field, field_validator from 
valuecell.core.agent.decorator import create_wrapped_agent -from valuecell.core.types import BaseAgent +from valuecell.core import BaseAgent, StreamResponse, streaming from src.main import create_workflow from src.utils.analysts import ANALYST_ORDER @@ -69,7 +69,9 @@ def __init__(self): markdown=True, ) - async def stream(self, query, session_id, task_id): + async def stream( + self, query, session_id, task_id + ) -> AsyncGenerator[StreamResponse, None]: logger.info( f"Parsing query: {query}. Task ID: {task_id}, Session ID: {session_id}" ) @@ -123,15 +125,8 @@ async def stream(self, query, session_id, task_id): ): if not isinstance(chunk, str): continue - yield { - "content": chunk, - "is_task_complete": False, - } - - yield { - "content": "", - "is_task_complete": True, - } + yield streaming.message_chunk(chunk) + yield streaming.done() def run_hedge_fund_stream( diff --git a/python/valuecell/__init__.py b/python/valuecell/__init__.py index 819a0b2b4..e49b81bb3 100644 --- a/python/valuecell/__init__.py +++ b/python/valuecell/__init__.py @@ -17,7 +17,3 @@ # registers agents on import import valuecell.agents as _ # noqa: F401 - -# Optional convenience re-exports (not added to __all__ to keep root clean) -# Users can import: from valuecell import responses -from . 
import responses as responses # noqa: E402,F401 diff --git a/python/valuecell/core/__init__.py b/python/valuecell/core/__init__.py index ca0a90f49..7efffec1f 100644 --- a/python/valuecell/core/__init__.py +++ b/python/valuecell/core/__init__.py @@ -30,6 +30,9 @@ RemoteAgentResponse, ) +from .agent.decorator import serve, create_wrapped_agent +from .agent.responses import streaming, notification + __all__ = [ # Session exports "Message", @@ -54,4 +57,10 @@ "BaseAgent", "StreamResponse", "RemoteAgentResponse", + # Agent utilities + "serve", + "create_wrapped_agent", + # Response utilities + "streaming", + "notification", ] diff --git a/python/valuecell/core/agent/__init__.py b/python/valuecell/core/agent/__init__.py index 7becdb9d8..2005b5024 100644 --- a/python/valuecell/core/agent/__init__.py +++ b/python/valuecell/core/agent/__init__.py @@ -6,20 +6,10 @@ from .decorator import serve from .registry import AgentRegistry -# Import types from the unified types module -from ..types import BaseAgent, RemoteAgentResponse, StreamResponse - - __all__ = [ # Core agent exports "AgentClient", "RemoteConnections", "serve", "AgentRegistry", - "BaseAgent", - "RemoteAgentResponse", - "StreamResponse", ] - -# Convenience re-export for response constructors -from . 
import responses as responses # noqa: E402,F401 From 16bd3a9ad8ed5b4581fb6693f37125bb4e9906b3 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 00:05:29 +0800 Subject: [PATCH 05/11] refactor: update planner model ID retrieval and improve agent card handling --- python/.env.example | 1 + python/valuecell/core/agent/decorator.py | 5 ++++- python/valuecell/core/coordinate/models.py | 2 +- python/valuecell/core/coordinate/orchestrator.py | 8 +++++--- python/valuecell/core/coordinate/planner.py | 9 ++++++--- .../valuecell/core/coordinate/tests/test_orchestrator.py | 2 +- 6 files changed, 18 insertions(+), 9 deletions(-) diff --git a/python/.env.example b/python/.env.example index 2972c2341..d7fba4fc3 100644 --- a/python/.env.example +++ b/python/.env.example @@ -34,6 +34,7 @@ AGENT_DEBUG_MODE=false SEC_EMAIL=your.name@example.com # Model IDs for OpenRouter +PLANNER_MODEL_ID=openai/gpt-4o-mini SEC_PARSER_MODEL_ID=openai/gpt-4o-mini SEC_ANALYSIS_MODEL_ID=deepseek/deepseek-chat-v3-0324 AI_HEDGE_FUND_PARSER_MODEL_ID=openai/gpt-4o-mini diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 034a5d7dd..c3eb91e56 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -197,7 +197,10 @@ async def _add_chunk( # Stream from the user agent and update task incrementally await updater.update_status( - TaskState.working, message=f"Task received by {agent_name}" + TaskState.working, + message=new_agent_text_message( + f"Task received by {agent_name}", session_id, task_id + ), ) try: query_handler = ( diff --git a/python/valuecell/core/coordinate/models.py b/python/valuecell/core/coordinate/models.py index 3bd9f8749..f91669e94 100644 --- a/python/valuecell/core/coordinate/models.py +++ b/python/valuecell/core/coordinate/models.py @@ -18,7 +18,7 @@ class ExecutionPlan(BaseModel): ..., description="Session ID this plan belongs to" ) 
user_id: str = Field(..., description="User ID who requested this plan") - query: str = Field(..., description="Original user query that generated this plan") + orig_query: str = Field(..., description="Original user query that generated this plan") tasks: List[Task] = Field(default_factory=list, description="Tasks to execute") created_at: str = Field(..., description="Plan creation timestamp") diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 455c8bcee..677d4cd8d 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -529,7 +529,7 @@ async def _execute_plan_with_input_support( # Execute task with input support async for message in self._execute_task_with_input_support( - task, plan.query, metadata + task, metadata ): # Accumulate based on event if message.event in { @@ -562,7 +562,7 @@ async def _execute_plan_with_input_support( await self._save_remaining_responses(session_id, agent_responses) async def _execute_task_with_input_support( - self, task: Task, query: str, metadata: Optional[dict] = None + self, task: Task, metadata: Optional[dict] = None ) -> AsyncGenerator[ProcessMessage, None]: """ Execute a single task with user input interruption support. 
@@ -594,7 +594,7 @@ async def _execute_task_with_input_support( # Send message to agent remote_response = await client.send_message( - query, + task.query, session_id=task.session_id, metadata=metadata, streaming=agent_card.capabilities.streaming, @@ -619,6 +619,8 @@ async def _execute_task_with_input_support( return # if state == TaskState.input_required: # Handle tool call start + if not event.metadata: + continue response_event = event.metadata.get("response_event") if state == TaskState.working and is_tool_call(response_event): yield self._create_tool_message( diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index 07b89b142..fc3aa06ee 100644 --- a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -77,7 +77,7 @@ async def create_plan( plan_id=generate_uuid("plan"), session_id=user_input.meta.session_id, user_id=user_input.meta.user_id, - query=user_input.query, # Store the original query + orig_query=user_input.query, # Store the original query created_at=datetime.now().isoformat(), ) @@ -100,7 +100,7 @@ async def _analyze_input_and_create_tasks( """ # Create planning agent with appropriate tools and instructions agent = Agent( - model=OpenRouter(id="openai/gpt-4o-mini"), + model=OpenRouter(id=os.getenv("PLANNER_MODEL_ID", "openai/gpt-4o-mini")), tools=[ UserControlFlowTools(), self.tool_get_agent_description, @@ -217,7 +217,10 @@ def tool_get_agent_description(self, agent_name: str) -> str: """ self.agent_connections.list_remote_agents() if card := self.agent_connections.get_remote_agent_card(agent_name): - return agentcard_to_prompt(card) + if isinstance(card, AgentCard): + return agentcard_to_prompt(card) + if isinstance(card, dict): + return str(card) return "The requested agent could not be found or is not available." 
diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index d9a3003c1..f6bc0fbc5 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -82,7 +82,7 @@ def sample_plan( plan_id="plan-1", session_id=session_id, user_id=user_id, - query=sample_query, + orig_query=sample_query, tasks=[sample_task], created_at="2025-09-16T10:00:00", ) From 1959011362b5b570bc8a6f26c8e6d77820cbd67b Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 09:23:51 +0800 Subject: [PATCH 06/11] refactor: streamline response handling in agent streaming service --- python/valuecell/server/services/agent_stream_service.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 0cf33726c..2c67efbec 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -48,12 +48,7 @@ async def stream_query_agent( async for response_chunk in self.orchestrator.process_user_input( user_input ): - if ( - response_chunk - and response_chunk.content - and response_chunk.content.strip() - ): - yield response_chunk.content + yield response_chunk.model_dump() except Exception as e: logger.error(f"Error in stream_query_agent: {str(e)}") From 83d0d930595744a7a9bc1926a2abbf7d4a654822 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 09:25:20 +0800 Subject: [PATCH 07/11] refactor: enhance agent description with cautionary stock symbol limitations --- python/configs/agent_cards/hedge_fund_agent.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/configs/agent_cards/hedge_fund_agent.json 
b/python/configs/agent_cards/hedge_fund_agent.json index 644a1599c..d3fcb8172 100644 --- a/python/configs/agent_cards/hedge_fund_agent.json +++ b/python/configs/agent_cards/hedge_fund_agent.json @@ -1,7 +1,7 @@ { "name": "AIHedgeFundAgent", "url": "http://localhost:10001/", - "description": "AI Hedge Fund Agent provides multi-strategy analysis and investment insights. It can act like various famous analysts and specialists (valuation, technical, sentiment) to analyze stocks, portfolios, and market trends. Use its skills to get tailored perspectives ranging from deep fundamental valuation to technical trading signals.", + "description": "AI Hedge Fund Agent provides multi-strategy analysis and investment insights. It can act like various famous analysts and specialists (valuation, technical, sentiment) to analyze stocks, portfolios, and market trends. **CAUTION**: Only stock symbols in {AAPL, GOOGL, MSFT, NVDA, TSLA} are supported and the agent will refuse to answer for other symbols. The input should contain one or more stock symbol(s) from this list. 
", "skills": [ { "id": "aswath_damodaran_agent", From 0d6721b9a6070e96c4773d7df46e9eeec1afedd9 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:00:22 +0800 Subject: [PATCH 08/11] refactor: ensure message_id is always required in ProcessMessageData --- python/valuecell/core/coordinate/orchestrator.py | 4 +++- python/valuecell/core/types.py | 2 +- python/valuecell/utils/uuid.py | 4 ++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 677d4cd8d..a711a0e9c 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -18,6 +18,7 @@ ToolCallContent, UserInput, ) +from valuecell.utils.uuid import generate_message_id from .callback import store_task_in_session from .models import ExecutionPlan @@ -378,7 +379,7 @@ def _create_message( event=event, data=ProcessMessageData( conversation_id=conversation_id, - message_id=message_id, + message_id=message_id or generate_message_id(), content=content, ), ) @@ -396,6 +397,7 @@ def _create_tool_message( event=event, data=ProcessMessageData( conversation_id=conversation_id, + message_id=generate_message_id(), content=ToolCallContent( tool_call_id=tool_call_id, tool_name=tool_name, diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index e1d264a61..f5463637d 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -103,7 +103,7 @@ class ToolCallContent(BaseModel): class ProcessMessageData(BaseModel): conversation_id: str = Field(..., description="Conversation ID for this request") - message_id: Optional[str] = Field(None, description="Message ID for this request") + message_id: str = Field(..., description="Message ID for this request") content: str | ToolCallContent = Field( ..., description="Content of the message chunk" ) diff --git 
a/python/valuecell/utils/uuid.py b/python/valuecell/utils/uuid.py index a904c06b2..501c95f8b 100644 --- a/python/valuecell/utils/uuid.py +++ b/python/valuecell/utils/uuid.py @@ -6,3 +6,7 @@ def generate_uuid(prefix: str = None) -> str: return str(uuid4().hex) return f"{prefix}-{uuid4().hex}" + + +def generate_message_id() -> str: + return generate_uuid("msg") From c13ba3744e99b23aef0d8a8477fcfa797ab4fa2f Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:12:22 +0800 Subject: [PATCH 09/11] refactor: enhance agent model initialization and improve JSON response handling in planner --- python/valuecell/core/coordinate/planner.py | 11 +++++++++-- python/valuecell/core/coordinate/planner_prompts.py | 5 ++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index fc3aa06ee..21cb98ffc 100644 --- a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -100,7 +100,10 @@ async def _analyze_input_and_create_tasks( """ # Create planning agent with appropriate tools and instructions agent = Agent( - model=OpenRouter(id=os.getenv("PLANNER_MODEL_ID", "openai/gpt-4o-mini")), + model=OpenRouter( + id=os.getenv("PLANNER_MODEL_ID", "openai/gpt-4o-mini"), + max_tokens=None, + ), tools=[ UserControlFlowTools(), self.tool_get_agent_description, @@ -147,7 +150,11 @@ async def _analyze_input_and_create_tasks( # Parse planning result and create tasks try: - plan_raw = PlannerResponse.model_validate_json(run_response.content) + plan_content = run_response.content + if plan_content.startswith("```json\n") and plan_content.endswith("\n```"): + # Strip markdown code block if present + plan_content = "\n".join(plan_content.split("\n")[1:-1]) + plan_raw = PlannerResponse.model_validate_json(plan_content) except Exception as e: raise ValueError( f"Planner produced invalid JSON for 
PlannerResponse: {e}. " diff --git a/python/valuecell/core/coordinate/planner_prompts.py b/python/valuecell/core/coordinate/planner_prompts.py index 534ca236f..6180fccc8 100644 --- a/python/valuecell/core/coordinate/planner_prompts.py +++ b/python/valuecell/core/coordinate/planner_prompts.py @@ -33,10 +33,9 @@ def create_prompt_with_datetime(base_prompt: str) -> str: - Ask only one clarification question at a time - Wait for user response before asking additional questions - Generate clear, specific prompts suitable for AI model execution -- Output must be valid JSON following the Response Format -- Output will be parsed programmatically, so ensure strict adherence to the format and do not include any extra text +- Output must be valid JSON string following the Response Format -**Response Format:** +**Response Format (return exactly this structure as JSON string; no extra keys, no comments, no backticks, no markdown format):** { "tasks": [ { From dae0cdcf5d61c31fee951708309157d8162555b2 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:14:57 +0800 Subject: [PATCH 10/11] fix: orchestrator tests --- python/valuecell/core/coordinate/tests/test_orchestrator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index f6bc0fbc5..e78370a54 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -309,8 +309,8 @@ async def test_planner_error( out.append(chunk) assert len(out) == 1 - assert "(Error)" in out[0].content - assert "Planning failed" in out[0].content + assert "(Error)" in out[0].data.content + assert "Planning failed" in out[0].data.content @pytest.mark.asyncio @@ -328,7 +328,7 @@ async def test_agent_connection_error( async for chunk in 
orchestrator.process_user_input(sample_user_input): out.append(chunk) - assert any("(Error)" in c.content for c in out) + assert any("(Error)" in c.data.content for c in out) @pytest.mark.asyncio From 6bf8a67a113505d12fbc2f29f3b4a1dfe3fff24e Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:15:24 +0800 Subject: [PATCH 11/11] fix: format --- python/valuecell/core/coordinate/models.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/valuecell/core/coordinate/models.py b/python/valuecell/core/coordinate/models.py index f91669e94..ff3732ef5 100644 --- a/python/valuecell/core/coordinate/models.py +++ b/python/valuecell/core/coordinate/models.py @@ -18,7 +18,9 @@ class ExecutionPlan(BaseModel): ..., description="Session ID this plan belongs to" ) user_id: str = Field(..., description="User ID who requested this plan") - orig_query: str = Field(..., description="Original user query that generated this plan") + orig_query: str = Field( + ..., description="Original user query that generated this plan" + ) tasks: List[Task] = Field(default_factory=list, description="Tasks to execute") created_at: str = Field(..., description="Plan creation timestamp")