From ca669cfcce303031c5cfabdae72b0c947426bc5c Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 22 Sep 2025 18:52:09 +0800 Subject: [PATCH 01/12] feature: introduce thread_id and subtask_id and corresponding models --- python/valuecell/core/agent/decorator.py | 17 +- python/valuecell/core/agent/responses.py | 65 ++--- .../valuecell/core/coordinate/orchestrator.py | 272 +++++++++--------- python/valuecell/core/coordinate/planner.py | 1 + python/valuecell/core/coordinate/response.py | 182 ++++++++++++ python/valuecell/core/types.py | 165 +++++++++-- .../server/services/agent_stream_service.py | 2 +- python/valuecell/utils/uuid.py | 4 + 8 files changed, 514 insertions(+), 194 deletions(-) create mode 100644 python/valuecell/core/coordinate/response.py diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index c3eb91e56..a0a2e58b6 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -33,6 +33,8 @@ NotifyResponseEvent, StreamResponse, StreamResponseEvent, + SystemResponseEvent, + _TaskResponseEvent, ) from valuecell.utils import ( get_agent_card_path, @@ -184,9 +186,11 @@ async def _add_chunk( if not response.content: return - response_event = response.event parts = [Part(root=TextPart(text=response.content))] - metadata = {"response_event": response_event.value} + metadata = { + "response_event": response.event.value, + "subtask_id": response.subtask_id, + } await updater.add_artifact( parts=parts, artifact_id=artifact_id, @@ -228,6 +232,7 @@ async def _add_chunk( "tool_call_id": response.metadata.get("tool_call_id"), "tool_name": response.metadata.get("tool_name"), "tool_result": response.metadata.get("content"), + "subtask_id": response.subtask_id, }, ) continue @@ -253,17 +258,13 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None def is_task_completed(response_type: str) -> bool: return response_type in { - StreamResponseEvent.TASK_DONE, - StreamResponseEvent.TASK_FAILED, - NotifyResponseEvent.TASK_DONE, - NotifyResponseEvent.TASK_FAILED, + _TaskResponseEvent.TASK_COMPLETED, } def is_task_failed(response_type: str) -> bool: return response_type in { - StreamResponseEvent.TASK_FAILED, - NotifyResponseEvent.TASK_FAILED, + SystemResponseEvent.TASK_FAILED, } diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py index c44e94992..89f22b82a 100644 --- a/python/valuecell/core/agent/responses.py +++ b/python/valuecell/core/agent/responses.py @@ -1,24 +1,3 @@ -"""User-facing response constructors under valuecell.core.agent. - -Prefer importing from here if you're already working inside the core.agent -namespace. For a stable top-level import, you can also use -`valuecell.responses` which provides the same API. 
- -Example: - from valuecell.core.agent.responses import stream, notify - # Or explicit aliases for clarity: - from valuecell.core.agent.responses import streaming, notification - - yield stream.message_chunk("Thinking…") - yield stream.reasoning("Plan: 1) fetch 2) analyze") - yield stream.tool_call_start("call_1", "search") - yield stream.tool_call_result('{"items": 12}', "call_1", "search") - yield stream.done() - - send(notify.message("Task submitted")) - send(notify.done("OK")) -""" - from __future__ import annotations from typing import Optional @@ -28,50 +7,70 @@ NotifyResponseEvent, StreamResponse, StreamResponseEvent, + SystemResponseEvent, ToolCallContent, + _TaskResponseEvent, ) class _StreamResponseNamespace: """Factory methods for streaming responses.""" - def message_chunk(self, content: str) -> StreamResponse: - return StreamResponse(event=StreamResponseEvent.MESSAGE_CHUNK, content=content) + def message_chunk( + self, content: str, subtask_id: str | None = None + ) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.MESSAGE_CHUNK, + content=content, + subtask_id=subtask_id, + ) - def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse: + def tool_call_started( + self, tool_call_id: str, tool_name: str, subtask_id: str | None = None + ) -> StreamResponse: return StreamResponse( event=StreamResponseEvent.TOOL_CALL_STARTED, metadata=ToolCallContent( - tool_call_id=tool_call_id, tool_name=tool_name + tool_call_id=tool_call_id, + tool_name=tool_name, ).model_dump(), + subtask_id=subtask_id, ) def tool_call_completed( - self, tool_result: str, tool_call_id: str, tool_name: str + self, + tool_result: str, + tool_call_id: str, + tool_name: str, + subtask_id: str | None = None, ) -> StreamResponse: return StreamResponse( event=StreamResponseEvent.TOOL_CALL_COMPLETED, metadata=ToolCallContent( - tool_call_id=tool_call_id, tool_name=tool_name, tool_result=tool_result + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_result=tool_result, ).model_dump(), + subtask_id=subtask_id, ) - def reasoning(self, content: str) -> StreamResponse: + def reasoning(self, content: str, subtask_id: str | None = None) -> StreamResponse: return StreamResponse( event=StreamResponseEvent.REASONING, content=content, + subtask_id=subtask_id, ) def done(self, content: Optional[str] = None) -> StreamResponse: return StreamResponse( content=content, - event=StreamResponseEvent.TASK_DONE, + event=_TaskResponseEvent.TASK_COMPLETED, ) def failed(self, content: Optional[str] = None) -> StreamResponse: return StreamResponse( content=content, - event=StreamResponseEvent.TASK_FAILED, + event=SystemResponseEvent.TASK_FAILED, ) @@ -90,13 +89,13 @@ def message(self, content: str) -> NotifyResponse: def done(self, content: Optional[str] = None) -> NotifyResponse: return NotifyResponse( content=content, - event=NotifyResponseEvent.TASK_DONE, + event=_TaskResponseEvent.TASK_COMPLETED, ) def failed(self, content: Optional[str] = None) -> NotifyResponse: return NotifyResponse( content=content, - event=NotifyResponse.TASK_FAILED, + event=SystemResponseEvent.TASK_FAILED, ) @@ -106,6 +105,4 @@ def failed(self, content: Optional[str] = None) -> NotifyResponse: __all__ = [ "streaming", "notification", - "StreamResponse", - "NotifyResponse", ] diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index a711a0e9c..dd7b2ff56 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ 
b/python/valuecell/core/coordinate/orchestrator.py @@ -6,19 +6,18 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from a2a.utils import get_message_text from valuecell.core.agent.connect import get_default_remote_connections -from valuecell.core.agent.decorator import is_tool_call, is_task_completed +from valuecell.core.agent.decorator import is_task_completed, is_tool_call +from valuecell.core.coordinate.response import ResponseFactory from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager from valuecell.core.task.models import TaskPattern from valuecell.core.types import ( + BaseResponse, NotifyResponseEvent, - ProcessMessage, - ProcessMessageData, StreamResponseEvent, - ToolCallContent, UserInput, ) -from valuecell.utils.uuid import generate_message_id +from valuecell.utils.uuid import generate_thread_id from .callback import store_task_in_session from .models import ExecutionPlan @@ -34,9 +33,10 @@ class ExecutionContext: """Manages the state of an interrupted execution for resumption""" - def __init__(self, stage: str, session_id: str, user_id: str): + def __init__(self, stage: str, session_id: str, thread_id: str, user_id: str): self.stage = stage self.session_id = session_id + self.thread_id = thread_id self.user_id = user_id self.created_at = asyncio.get_event_loop().time() self.metadata: Dict = {} @@ -120,11 +120,13 @@ def __init__(self): # Initialize planner self.planner = ExecutionPlanner(self.agent_connections) + self._response_factory = ResponseFactory() + # ==================== Public API Methods ==================== async def process_user_input( self, user_input: UserInput - ) -> AsyncGenerator[ProcessMessage, None]: + ) -> AsyncGenerator[BaseResponse, None]: """ Main entry point for processing user requests with Human-in-the-Loop support. 
@@ -144,19 +146,27 @@ async def process_user_input( try: # Ensure session exists - session = await self._ensure_session_exists(session_id, user_id) + session = await self.session_manager.get_session(session_id) + if not session: + await self.session_manager.create_session( + user_id, session_id=session_id + ) + session = await self.session_manager.get_session(session_id) + yield self._response_factory.conversation_started( + conversation_id=session_id + ) # Handle session continuation vs new request if session.status == SessionStatus.REQUIRE_USER_INPUT: - async for message in self._handle_session_continuation(user_input): - yield message + async for response in self._handle_session_continuation(user_input): + yield response else: - async for message in self._handle_new_request(user_input): - yield message + async for response in self._handle_new_request(user_input): + yield response except Exception as e: logger.exception(f"Error processing user input for session {session_id}") - yield self._create_error_message( + yield self._response_factory.system_failed( f"Error processing request: {str(e)}", session_id ) @@ -218,8 +228,6 @@ async def cleanup(self): # ==================== Private Helper Methods ==================== - # ==================== Private Helper Methods ==================== - async def _handle_user_input_request(self, request: UserInputRequest): """Handle user input request from planner""" # Extract session_id from request context @@ -227,26 +235,18 @@ async def _handle_user_input_request(self, request: UserInputRequest): if session_id: self.user_input_manager.add_request(session_id, request) - async def _ensure_session_exists(self, session_id: str, user_id: str): - """Ensure a session exists, creating it if necessary""" - session = await self.session_manager.get_session(session_id) - if not session: - await self.session_manager.create_session(user_id, session_id=session_id) - session = await self.session_manager.get_session(session_id) - return session - async def _handle_session_continuation( self, user_input: UserInput - ) -> AsyncGenerator[ProcessMessage, None]: + ) -> AsyncGenerator[BaseResponse, None]: """Handle continuation of an interrupted session""" session_id = user_input.meta.session_id user_id = user_input.meta.user_id # Validate execution context exists if session_id not in self._execution_contexts: - yield self._create_error_message( - "No execution context found for this session. The session may have expired.", + yield self._response_factory.system_failed( session_id, + "No execution context found for this session. 
The session may have expired.", ) return @@ -254,9 +254,9 @@ async def _handle_session_continuation( # Validate context integrity and user consistency if not self._validate_execution_context(context, user_id): - yield self._create_error_message( - "Invalid execution context or user mismatch.", + yield self._response_factory.system_failed( session_id, + "Invalid execution context or user mismatch.", ) await self._cancel_execution(session_id) return @@ -272,16 +272,17 @@ async def _handle_session_continuation( yield chunk # TODO: Add support for resuming execution stage if needed else: - yield self._create_error_message( - "Resuming execution stage is not yet supported.", + yield self._response_factory.system_failed( session_id, + "Resuming execution stage is not yet supported.", ) async def _handle_new_request( self, user_input: UserInput - ) -> AsyncGenerator[ProcessMessage, None]: + ) -> AsyncGenerator[BaseResponse, None]: """Handle a new user request""" session_id = user_input.meta.session_id + thread_id = generate_thread_id() # Add user message to session await self.session_manager.add_message( @@ -297,7 +298,7 @@ async def _handle_new_request( # Monitor planning progress async for chunk in self._monitor_planning_task( - planning_task, user_input, context_aware_callback + planning_task, thread_id, user_input, context_aware_callback ): yield chunk @@ -311,8 +312,12 @@ async def context_aware_handle(request): return context_aware_handle async def _monitor_planning_task( - self, planning_task, user_input: UserInput, callback - ) -> AsyncGenerator[ProcessMessage, None]: + self, + planning_task: asyncio.Task, + thread_id: str, + user_input: UserInput, + callback, + ) -> AsyncGenerator[BaseResponse, None]: """Monitor planning task and handle user input interruptions""" session_id = user_input.meta.session_id user_id = user_input.meta.user_id @@ -321,7 +326,7 @@ async def _monitor_planning_task( while not planning_task.done(): if self.has_pending_user_input(session_id): # Save planning context - context = ExecutionContext("planning", session_id, user_id) + context = ExecutionContext("planning", session_id, thread_id, user_id) context.add_metadata( original_user_input=user_input, planning_task=planning_task, @@ -331,8 +336,8 @@ async def _monitor_planning_task( # Update session status and send user input request await self._request_user_input(session_id) - yield self._create_user_input_request( - self.get_user_input_prompt(session_id), session_id + yield self._response_factory.plan_require_user_input( + session_id, thread_id, self.get_user_input_prompt(session_id) ) return @@ -340,7 +345,7 @@ async def _monitor_planning_task( # Planning completed, execute plan plan = await planning_task - async for chunk in self._execute_plan_with_input_support(plan): + async for chunk in self._execute_plan_with_input_support(plan, thread_id): yield chunk async def _request_user_input(self, session_id: str): @@ -365,78 +370,20 @@ def _validate_execution_context( return True - def _create_message( - self, - content: str, - conversation_id: str, - event: ( - StreamResponseEvent | NotifyResponseEvent - ) = StreamResponseEvent.MESSAGE_CHUNK, - message_id: Optional[str] = None, - ) -> ProcessMessage: - """Create a ProcessMessage for plain text content using the new schema.""" - return ProcessMessage( - event=event, - data=ProcessMessageData( - conversation_id=conversation_id, - message_id=message_id or generate_message_id(), - content=content, - ), - ) - - def _create_tool_message( - self, - event: 
StreamResponseEvent | NotifyResponseEvent, - conversation_id: str, - tool_call_id: str, - tool_name: str, - tool_result: Optional[str] = None, - ) -> ProcessMessage: - """Create a ProcessMessage for tool call events with ToolCallContent.""" - return ProcessMessage( - event=event, - data=ProcessMessageData( - conversation_id=conversation_id, - message_id=generate_message_id(), - content=ToolCallContent( - tool_call_id=tool_call_id, - tool_name=tool_name, - tool_result=tool_result, - ), - ), - ) - - def _create_error_message(self, error_msg: str, session_id: str) -> ProcessMessage: - """Create an error ProcessMessage with standardized format (TASK_FAILED).""" - return self._create_message( - content=f"(Error): {error_msg}", - conversation_id=session_id, - event=StreamResponseEvent.TASK_FAILED, - ) - - def _create_user_input_request( - self, - prompt: str, - session_id: str, - ) -> ProcessMessage: - """Create a user input request ProcessMessage. The consumer should parse the prefix.""" - return self._create_message( - content=f"USER_INPUT_REQUIRED:{prompt}", - conversation_id=session_id, - event=StreamResponseEvent.MESSAGE_CHUNK, - ) - async def _continue_planning( self, session_id: str, context: ExecutionContext - ) -> AsyncGenerator[ProcessMessage, None]: + ) -> AsyncGenerator[BaseResponse, None]: """Resume planning stage execution""" planning_task = context.get_metadata("planning_task") original_user_input = context.get_metadata("original_user_input") + thread_id = generate_thread_id() + context.thread_id = thread_id if not all([planning_task, original_user_input]): - yield self._create_error_message( - "Invalid planning context - missing required data", + yield self._response_factory.plan_failed( session_id, + thread_id, + "Invalid planning context - missing required data", ) await self._cancel_execution(session_id) return @@ -448,7 +395,9 @@ async def _continue_planning( prompt = self.get_user_input_prompt(session_id) # Ensure session is set to require user input again for repeated prompts await self._request_user_input(session_id) - yield self._create_user_input_request(prompt, session_id) + yield self._response_factory.plan_require_user_input( + session_id, thread_id, prompt + ) return await asyncio.sleep(ASYNC_SLEEP_INTERVAL) @@ -457,7 +406,7 @@ async def _continue_planning( plan = await planning_task del self._execution_contexts[session_id] - async for message in self._execute_plan_with_input_support(plan): + async for message in self._execute_plan_with_input_support(plan, thread_id): yield message async def _cancel_execution(self, session_id: str): @@ -501,8 +450,8 @@ async def _cleanup_expired_contexts( # ==================== Plan and Task Execution Methods ==================== async def _execute_plan_with_input_support( - self, plan: ExecutionPlan, metadata: Optional[dict] = None - ) -> AsyncGenerator[ProcessMessage, None]: + self, plan: ExecutionPlan, thread_id: str, metadata: Optional[dict] = None + ) -> AsyncGenerator[BaseResponse, None]: """ Execute an execution plan with Human-in-the-Loop support. @@ -516,8 +465,8 @@ async def _execute_plan_with_input_support( session_id = plan.session_id if not plan.tasks: - yield self._create_error_message( - "No tasks found for this request.", session_id + yield self._response_factory.plan_failed( + session_id, thread_id, "No tasks found for this request." 
) return @@ -530,20 +479,20 @@ async def _execute_plan_with_input_support( await self.task_manager.store.save_task(task) # Execute task with input support - async for message in self._execute_task_with_input_support( - task, metadata + async for response in self._execute_task_with_input_support( + task, thread_id, metadata ): # Accumulate based on event - if message.event in { + if response.event in { StreamResponseEvent.MESSAGE_CHUNK, StreamResponseEvent.REASONING, NotifyResponseEvent.MESSAGE, - } and isinstance(message.data.content, str): - agent_responses[task.agent_name] += message.data.content - yield message + } and isinstance(response.data.data.content, str): + agent_responses[task.agent_name] += response.data.data.content + yield response if ( - is_task_completed(message.event) + is_task_completed(response.event) or task.pattern == TaskPattern.RECURRING ): if agent_responses[task.agent_name].strip(): @@ -558,14 +507,21 @@ async def _execute_plan_with_input_support( except Exception as e: error_msg = f"Error executing {task.agent_name}: {str(e)}" logger.exception(f"Task execution failed: {error_msg}") - yield self._create_error_message(error_msg, session_id) + yield self._response_factory.task_failed( + session_id, + thread_id, + task.task_id, + _generate_task_default_subtask_id(task.task_id), + error_msg, + ) # Save any remaining agent responses await self._save_remaining_responses(session_id, agent_responses) + yield self._response_factory.done(session_id, thread_id) async def _execute_task_with_input_support( - self, task: Task, metadata: Optional[dict] = None - ) -> AsyncGenerator[ProcessMessage, None]: + self, task: Task, thread_id: str, metadata: Optional[dict] = None + ) -> AsyncGenerator[BaseResponse, None]: """ Execute a single task with user input interruption support. 
@@ -617,7 +573,13 @@ async def _execute_task_with_input_support( if state == TaskState.failed: err_msg = get_message_text(event.status.message) await self.task_manager.fail_task(task.task_id, err_msg) - yield self._create_error_message(err_msg, task.session_id) + yield self._response_factory.task_failed( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=_generate_task_default_subtask_id(task.task_id), + content=err_msg, + ) return # if state == TaskState.input_required: # Handle tool call start @@ -625,28 +587,67 @@ async def _execute_task_with_input_support( continue response_event = event.metadata.get("response_event") if state == TaskState.working and is_tool_call(response_event): - yield self._create_tool_message( - response_event, - task.session_id, - tool_call_id=event.metadata.get("tool_call_id", ""), - tool_name=event.metadata.get("tool_name", ""), - tool_result=event.metadata.get("tool_result"), + subtask_id = ( + event.metadata.get("subtask_id") if event.metadata else None ) + if not subtask_id: + subtask_id = _generate_task_default_subtask_id(task.task_id) + tool_call_id = event.metadata.get("tool_call_id", "") + tool_name = event.metadata.get("tool_name", "") + if response_event == StreamResponseEvent.TOOL_CALL_STARTED: + yield self._response_factory.tool_call_started( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + tool_call_id=tool_call_id, + tool_name=tool_name, + ) + continue + + if response_event == StreamResponseEvent.TOOL_CALL_COMPLETED: + tool_call_result = get_message_text( + event.metadata.get("tool_result", "") + ) + yield self._response_factory.tool_call_result( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_call_result=tool_call_result, + ) + continue + continue elif isinstance(event, TaskArtifactUpdateEvent): - yield self._create_message( - get_message_text(event.artifact, ""), - task.session_id, - event=StreamResponseEvent.MESSAGE_CHUNK, + artifact = event.artifact + subtask_id = ( + artifact.metadata.get("subtask_id") + if artifact.metadata + else None + ) + if not subtask_id: + subtask_id = _generate_task_default_subtask_id(task.task_id) + response_event = artifact.metadata.get("response_event") + yield self._response_factory.message_response_general( + event=response_event, + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + content=get_message_text(artifact, ""), ) # Complete task successfully await self.task_manager.complete_task(task.task_id) - yield self._create_message( - "", - task.session_id, - event=StreamResponseEvent.TASK_DONE, + yield self._response_factory.task_completed( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=_generate_task_default_subtask_id(task.task_id), ) except Exception as e: @@ -662,6 +663,11 @@ async def _save_remaining_responses(self, session_id: str, agent_responses: dict ) +def _generate_task_default_subtask_id(task_id: str) -> str: + """Generate a default subtask ID based on the main task ID""" + return f"{task_id}-default_subtask" + + # ==================== Module-level Factory Function ==================== _orchestrator = AgentOrchestrator() diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index 21cb98ffc..33a65bb1e 100644 --- 
a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -131,6 +131,7 @@ async def _analyze_input_and_create_tasks( for field in input_schema: if user_input_callback: # Use callback for async user input + # TODO: prompt options if available request = UserInputRequest(field.description) await user_input_callback(request) user_value = await request.wait_for_response() diff --git a/python/valuecell/core/coordinate/response.py b/python/valuecell/core/coordinate/response.py new file mode 100644 index 000000000..a72cd8e26 --- /dev/null +++ b/python/valuecell/core/coordinate/response.py @@ -0,0 +1,182 @@ +from typing_extensions import Literal +from valuecell.core.types import ( + BaseResponseDataContent, + ConversationStartedResponse, + DoneResponse, + MessageResponse, + NotifyResponseEvent, + PlanFailedResponse, + PlanRequireUserInputResponse, + ReasoningResponse, + StreamResponseEvent, + SystemFailedResponse, + TaskCompletedResponse, + TaskFailedResponse, + ToolCallCompletedResponse, + ToolCallContent, + ToolCallStartedResponse, + UnifiedResponseData, +) + + +class ResponseFactory: + def conversation_started(self, conversation_id: str) -> ConversationStartedResponse: + return ConversationStartedResponse( + data=UnifiedResponseData(conversation_id=conversation_id) + ) + + def system_failed(self, conversation_id: str, content: str) -> SystemFailedResponse: + return SystemFailedResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + data=BaseResponseDataContent(content=content), + ) + ) + + def done(self, conversation_id: str, thread_id: str) -> DoneResponse: + return DoneResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + ) + ) + + def plan_require_user_input( + self, conversation_id: str, thread_id: str, content: str + ) -> PlanRequireUserInputResponse: + return PlanRequireUserInputResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + data=BaseResponseDataContent(content=content), + ) + ) + + def plan_failed( + self, conversation_id: str, thread_id: str, content: str + ) -> PlanFailedResponse: + return PlanFailedResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + data=BaseResponseDataContent(content=content), + ) + ) + + def task_failed( + self, + conversation_id: str, + thread_id: str, + task_id: str, + subtask_id: str | None, + content: str, + ) -> TaskFailedResponse: + return TaskFailedResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + subtask_id=subtask_id, + data=BaseResponseDataContent(content=content), + ) + ) + + def task_completed( + self, + conversation_id: str, + thread_id: str, + task_id: str, + subtask_id: str | None, + ) -> TaskCompletedResponse: + return TaskCompletedResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + subtask_id=subtask_id, + ), + ) + + def tool_call_started( + self, + conversation_id: str, + thread_id: str, + task_id: str, + subtask_id: str, + tool_call_id: str, + tool_name: str, + ) -> ToolCallStartedResponse: + return ToolCallStartedResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + subtask_id=subtask_id, + data=ToolCallContent( + tool_call_id=tool_call_id, + tool_name=tool_name, + ), + ) + ) + + def tool_call_result( + self, + conversation_id: str, + thread_id: str, + 
task_id: str, + subtask_id: str, + tool_call_id: str, + tool_name: str, + tool_call_result: str, + ) -> ToolCallCompletedResponse: + return ToolCallCompletedResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + subtask_id=subtask_id, + data=ToolCallContent( + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_result=tool_call_result, + ), + ) + ) + + def message_response_general( + self, + event: Literal[StreamResponseEvent.MESSAGE_CHUNK, NotifyResponseEvent.MESSAGE], + conversation_id: str, + thread_id: str, + task_id: str, + subtask_id: str, + content: str, + ) -> MessageResponse: + return MessageResponse( + event=event, + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + subtask_id=subtask_id, + data=BaseResponseDataContent(content=content), + ), + ) + + def reasoning( + self, + conversation_id: str, + thread_id: str, + task_id: str, + subtask_id: str, + content: str, + ) -> ReasoningResponse: + return ReasoningResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + subtask_id=subtask_id, + data=BaseResponseDataContent(content=content), + ), + ) diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index f5463637d..c6ae93c84 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import AsyncGenerator, Callable, Optional +from typing import AsyncGenerator, Callable, Literal, Optional, Union from a2a.types import Task, TaskArtifactUpdateEvent, TaskStatusUpdateEvent from pydantic import BaseModel, Field @@ -47,19 +47,30 @@ def clear_desired_agent(self) -> None: self.desired_agent_name = None +class SystemResponseEvent(str, Enum): + CONVERSATION_STARTED = "conversation_started" + PLAN_REQUIRE_USER_INPUT = "plan_require_user_input" + PLAN_FAILED = "plan_failed" + TASK_FAILED = "task_failed" + SYSTEM_FAILED = "system_failed" + DONE = "done" + + +class _TaskResponseEvent(str, Enum): + TASK_STARTED = "task_started" + TASK_COMPLETED = "task_completed" + + class StreamResponseEvent(str, Enum): MESSAGE_CHUNK = "message_chunk" + COMPONENT_GENERATOR = "component_generator" TOOL_CALL_STARTED = "tool_call_started" TOOL_CALL_COMPLETED = "tool_call_completed" REASONING = "reasoning" - TASK_DONE = "task_done" - TASK_FAILED = "task_failed" class NotifyResponseEvent(str, Enum): MESSAGE = "message" - TASK_DONE = "task_done" - TASK_FAILED = "task_failed" class StreamResponse(BaseModel): @@ -69,7 +80,7 @@ class StreamResponse(BaseModel): None, description="The content of the stream response, typically a chunk of data or message.", ) - event: StreamResponseEvent = Field( + event: StreamResponseEvent | _TaskResponseEvent = Field( ..., description="The type of stream response, indicating its purpose or content nature.", ) @@ -77,6 +88,10 @@ class StreamResponse(BaseModel): None, description="Optional metadata providing additional context about the response", ) + subtask_id: Optional[str] = Field( + None, + description="Optional subtask ID if the response is related to a specific subtask", + ) class NotifyResponse(BaseModel): @@ -86,7 +101,7 @@ class NotifyResponse(BaseModel): ..., description="The content of the notification response", ) - event: NotifyResponseEvent = Field( + event: NotifyResponseEvent | _TaskResponseEvent = Field( ..., description="The type of notification response", ) @@ 
-101,24 +116,138 @@ class ToolCallContent(BaseModel): ) -class ProcessMessageData(BaseModel): - conversation_id: str = Field(..., description="Conversation ID for this request") - message_id: str = Field(..., description="Message ID for this request") - content: str | ToolCallContent = Field( - ..., description="Content of the message chunk" +class BaseResponseDataContent(BaseModel, ABC): + content: str = Field(..., description="The message content") + + +class ComponentGeneratorResponseDataContent(BaseResponseDataContent): + component_type: str = Field(..., description="The component type") + + +ResponsePayload = Union[ + BaseResponseDataContent, + ComponentGeneratorResponseDataContent, + ToolCallContent, +] + + +class UnifiedResponseData(BaseModel): + """Unified response data structure with optional hierarchy fields. + + Field names are preserved to maintain JSON compatibility when using + model_dump(exclude_none=True). + """ + + conversation_id: str = Field(..., description="Unique ID for the conversation") + thread_id: Optional[str] = Field( + None, description="Unique ID for the message thread" + ) + task_id: Optional[str] = Field(None, description="Unique ID for the task") + subtask_id: Optional[str] = Field( + None, description="Unique ID for the subtask, if any" + ) + data: Optional[ResponsePayload] = Field(None, description="The message data payload") + + +class BaseResponse(BaseModel, ABC): + """Top-level response envelope used for all events.""" + + event: StreamResponseEvent | NotifyResponseEvent | SystemResponseEvent = Field( + ..., description="The event type of the response" + ) + data: UnifiedResponseData = Field( + ..., description="The data payload of the response" + ) + + +class ConversationStartedResponse(BaseResponse): + event: Literal[SystemResponseEvent.CONVERSATION_STARTED] = Field( + SystemResponseEvent.CONVERSATION_STARTED, + description="The event type of the response", ) -class ProcessMessage(BaseModel): - """Chunk of a message, useful for streaming responses""" +class PlanRequireUserInputResponse(BaseResponse): + event: Literal[SystemResponseEvent.PLAN_REQUIRE_USER_INPUT] = Field( + SystemResponseEvent.PLAN_REQUIRE_USER_INPUT, + description="The event type of the response", + ) + data: UnifiedResponseData = Field(..., description="The plan data payload") + + +class MessageResponse(BaseResponse): + event: Literal[ + StreamResponseEvent.MESSAGE_CHUNK, + NotifyResponseEvent.MESSAGE, + ] = Field(..., description="The event type of the response") + data: UnifiedResponseData = Field(..., description="The complete message content") + + +class ComponentGeneratorResponse(BaseResponse): + event: Literal[StreamResponseEvent.COMPONENT_GENERATOR] = Field( + StreamResponseEvent.COMPONENT_GENERATOR + ) + data: UnifiedResponseData = Field(..., description="The component generator data") + + +class ToolCallStartedResponse(BaseResponse): + event: Literal[StreamResponseEvent.TOOL_CALL_STARTED] = Field( + StreamResponseEvent.TOOL_CALL_STARTED, + description="The event type of the response", + ) + data: UnifiedResponseData = Field(..., description="The task data payload") + + +class ToolCallCompletedResponse(BaseResponse): + event: Literal[StreamResponseEvent.TOOL_CALL_COMPLETED] = Field( + StreamResponseEvent.TOOL_CALL_COMPLETED, + description="The event type of the response", + ) + data: UnifiedResponseData = Field(..., description="The task data payload") + + +class ReasoningResponse(BaseResponse): + event: Literal[StreamResponseEvent.REASONING] = Field( + 
StreamResponseEvent.REASONING, description="The event type of the response" + ) + data: UnifiedResponseData = Field(..., description="The reasoning message content") + + +class DoneResponse(BaseResponse): + event: Literal[SystemResponseEvent.DONE] = Field( + SystemResponseEvent.DONE, description="The event type of the response" + ) + data: UnifiedResponseData = Field(..., description="The thread data payload") + + +class PlanFailedResponse(BaseResponse): + event: Literal[SystemResponseEvent.PLAN_FAILED] = Field( + SystemResponseEvent.PLAN_FAILED, description="The event type of the response" + ) + data: UnifiedResponseData = Field(..., description="The plan data payload") + + +class TaskFailedResponse(BaseResponse): + event: Literal[SystemResponseEvent.TASK_FAILED] = Field( + SystemResponseEvent.TASK_FAILED, description="The event type of the response" + ) + data: UnifiedResponseData = Field(..., description="The task data payload") + + +class TaskCompletedResponse(BaseResponse): + event: Literal[_TaskResponseEvent.TASK_COMPLETED] = Field( + _TaskResponseEvent.TASK_COMPLETED, description="The event type of the response" + ) + data: UnifiedResponseData = Field(..., description="The task data payload") + - event: StreamResponseEvent | NotifyResponseEvent = Field( - ..., description="The event type of the message chunk" +class SystemFailedResponse(BaseResponse): + event: Literal[SystemResponseEvent.SYSTEM_FAILED] = Field( + SystemResponseEvent.SYSTEM_FAILED, description="The event type of the response" ) - data: ProcessMessageData = Field(..., description="Content of the message chunk") + data: UnifiedResponseData = Field(..., description="The conversation data payload") -# TODO: keep only essential parameters class BaseAgent(ABC): """ Abstract base class for all agents. 
diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 2c67efbec..209f5968d 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -48,7 +48,7 @@ async def stream_query_agent( async for response_chunk in self.orchestrator.process_user_input( user_input ): - yield response_chunk.model_dump() + yield response_chunk.model_dump(exclude_none=True) except Exception as e: logger.error(f"Error in stream_query_agent: {str(e)}") diff --git a/python/valuecell/utils/uuid.py b/python/valuecell/utils/uuid.py index 501c95f8b..4e29a9306 100644 --- a/python/valuecell/utils/uuid.py +++ b/python/valuecell/utils/uuid.py @@ -10,3 +10,7 @@ def generate_uuid(prefix: str = None) -> str: def generate_message_id() -> str: return generate_uuid("msg") + + +def generate_thread_id() -> str: + return generate_uuid("th") From a256acc85ca3557011d13a2ce3e81017e261f820 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 23 Sep 2025 09:54:50 +0800 Subject: [PATCH 02/12] feat: implement component generator response handling in orchestrator and response factory --- python/valuecell/core/agent/decorator.py | 6 +++-- python/valuecell/core/agent/responses.py | 10 ++++++++ .../valuecell/core/coordinate/orchestrator.py | 17 ++++++++++++- python/valuecell/core/coordinate/response.py | 24 +++++++++++++++++++ 4 files changed, 54 insertions(+), 3 deletions(-) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index a0a2e58b6..7c6d0f556 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -30,7 +30,6 @@ from valuecell.core.types import ( BaseAgent, NotifyResponse, - NotifyResponseEvent, StreamResponse, StreamResponseEvent, SystemResponseEvent, @@ -187,10 +186,13 @@ async def _add_chunk( return parts = [Part(root=TextPart(text=response.content))] + response_event = response.event metadata = { - "response_event": response.event.value, + "response_event": response_event.value, "subtask_id": response.subtask_id, } + if response_event == StreamResponseEvent.COMPONENT_GENERATOR: + metadata["component_type"] = response.metadata.get("component_type") await updater.add_artifact( parts=parts, artifact_id=artifact_id, diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py index 89f22b82a..70c3484d1 100644 --- a/python/valuecell/core/agent/responses.py +++ b/python/valuecell/core/agent/responses.py @@ -61,6 +61,16 @@ def reasoning(self, content: str, subtask_id: str | None = None) -> StreamRespon subtask_id=subtask_id, ) + def component_generator( + self, content: str, component_type: str, subtask_id: str | None = None + ) -> StreamResponse: + return StreamResponse( + event=StreamResponseEvent.COMPONENT_GENERATOR, + content=content, + metadata={"component_type": component_type}, + subtask_id=subtask_id, + ) + def done(self, content: Optional[str] = None) -> StreamResponse: return StreamResponse( content=content, diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index dd7b2ff56..8213fe628 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -632,13 +632,28 @@ async def _execute_task_with_input_support( if not subtask_id: subtask_id = _generate_task_default_subtask_id(task.task_id) 
response_event = artifact.metadata.get("response_event") + content = get_message_text(artifact, "") + if response_event == StreamResponseEvent.COMPONENT_GENERATOR: + component_type = artifact.metadata.get( + "component_type", "unknown" + ) + yield self._response_factory.component_generator( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + content=content, + component_type=component_type, + ) + continue + yield self._response_factory.message_response_general( event=response_event, conversation_id=task.session_id, thread_id=thread_id, task_id=task.task_id, subtask_id=subtask_id, - content=get_message_text(artifact, ""), + content=content, ) # Complete task successfully diff --git a/python/valuecell/core/coordinate/response.py b/python/valuecell/core/coordinate/response.py index a72cd8e26..7d5c3ffcf 100644 --- a/python/valuecell/core/coordinate/response.py +++ b/python/valuecell/core/coordinate/response.py @@ -1,6 +1,8 @@ from typing_extensions import Literal from valuecell.core.types import ( BaseResponseDataContent, + ComponentGeneratorResponse, + ComponentGeneratorResponseDataContent, ConversationStartedResponse, DoneResponse, MessageResponse, @@ -180,3 +182,25 @@ def reasoning( data=BaseResponseDataContent(content=content), ), ) + + def component_generator( + self, + conversation_id: str, + thread_id: str, + task_id: str, + subtask_id: str, + content: str, + component_type: str, + ) -> ComponentGeneratorResponse: + return ComponentGeneratorResponse( + data=UnifiedResponseData( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + subtask_id=subtask_id, + data=ComponentGeneratorResponseDataContent( + content=content, + component_type=component_type, + ), + ), + ) From 9a723bcd2cb67b790bef3f649642b75625962bb8 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 23 Sep 2025 10:01:50 +0800 Subject: [PATCH 03/12] feat: enhance reasoning handling in agent orchestrator and decorator --- python/valuecell/core/agent/decorator.py | 18 +++++++++++++++++- .../valuecell/core/coordinate/orchestrator.py | 15 ++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 7c6d0f556..e2cbbe649 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -228,7 +228,7 @@ async def _add_chunk( if is_tool_call(response_event): await updater.update_status( TaskState.working, - message=message, + message=new_agent_text_message(response.content or ""), metadata={ "event": response_event.value, "tool_call_id": response.metadata.get("tool_call_id"), @@ -238,6 +238,16 @@ async def _add_chunk( }, ) continue + if is_reasoning(response_event): + await updater.update_status( + TaskState.working, + message=new_agent_text_message(response.content or ""), + metadata={ + "event": response_event.value, + "subtask_id": response.subtask_id, + }, + ) + continue await _add_chunk(response, is_complete=is_complete) if is_complete: @@ -277,6 +287,12 @@ def is_tool_call(response_type: str) -> bool: } +def is_reasoning(response_type: str) -> bool: + return response_type in { + StreamResponseEvent.REASONING, + } + + def _create_agent_executor(agent_instance): return GenericAgentExecutor(agent_instance) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 8213fe628..e14685468 100644 --- 
a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -6,7 +6,7 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from a2a.utils import get_message_text from valuecell.core.agent.connect import get_default_remote_connections -from valuecell.core.agent.decorator import is_task_completed, is_tool_call +from valuecell.core.agent.decorator import is_reasoning, is_task_completed, is_tool_call from valuecell.core.coordinate.response import ResponseFactory from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager @@ -619,6 +619,19 @@ async def _execute_task_with_input_support( tool_call_result=tool_call_result, ) continue + if state == TaskState.working and is_reasoning(response_event): + subtask_id = ( + event.metadata.get("subtask_id") if event.metadata else None + ) + if not subtask_id: + subtask_id = _generate_task_default_subtask_id(task.task_id) + yield self._response_factory.reasoning( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + content=get_message_text(event.status.message, ""), + ) continue From 21ea4e7dbea3a23029ff24a6c74c059334fc9c2f Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 23 Sep 2025 10:07:20 +0800 Subject: [PATCH 04/12] feat: centralize response event predicates in EventPredicates class --- python/valuecell/core/agent/decorator.py | 37 +++---------------- python/valuecell/core/agent/responses.py | 34 +++++++++++++++++ .../valuecell/core/coordinate/orchestrator.py | 10 ++--- 3 files changed, 45 insertions(+), 36 deletions(-) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index e2cbbe649..6511d01b9 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -32,14 +32,13 @@ NotifyResponse, StreamResponse, StreamResponseEvent, - SystemResponseEvent, - _TaskResponseEvent, ) from valuecell.utils import ( get_agent_card_path, get_next_available_port, parse_host_port, ) +from .responses import EventPredicates logger = logging.getLogger(__name__) @@ -219,13 +218,13 @@ async def _add_chunk( ) response_event = response.event - if is_task_failed(response_event): + if EventPredicates.is_task_failed(response_event): raise RuntimeError( f"Agent {agent_name} reported failure: {response.content}" ) - is_complete = is_task_completed(response_event) - if is_tool_call(response_event): + is_complete = EventPredicates.is_task_completed(response_event) + if EventPredicates.is_tool_call(response_event): await updater.update_status( TaskState.working, message=new_agent_text_message(response.content or ""), @@ -238,7 +237,7 @@ async def _add_chunk( }, ) continue - if is_reasoning(response_event): + if EventPredicates.is_reasoning(response_event): await updater.update_status( TaskState.working, message=new_agent_text_message(response.content or ""), @@ -266,31 +265,7 @@ async def _add_chunk( async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: # Default cancel operation raise ServerError(error=UnsupportedOperationError()) - - -def is_task_completed(response_type: str) -> bool: - return response_type in { - _TaskResponseEvent.TASK_COMPLETED, - } - - -def is_task_failed(response_type: str) -> bool: - return response_type in { - SystemResponseEvent.TASK_FAILED, - } - - -def 
is_tool_call(response_type: str) -> bool: - return response_type in { - StreamResponseEvent.TOOL_CALL_STARTED, - StreamResponseEvent.TOOL_CALL_COMPLETED, - } - - -def is_reasoning(response_type: str) -> bool: - return response_type in { - StreamResponseEvent.REASONING, - } + def _create_agent_executor(agent_instance): diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py index 70c3484d1..d832204cf 100644 --- a/python/valuecell/core/agent/responses.py +++ b/python/valuecell/core/agent/responses.py @@ -112,7 +112,41 @@ def failed(self, content: Optional[str] = None) -> NotifyResponse: notification = _NotifyResponseNamespace() +class EventPredicates: + """Utilities to classify response event types. + + These mirror the helper predicates previously defined in decorator.py + and centralize them next to response event definitions. + """ + + @staticmethod + def is_task_completed(response_type) -> bool: + return response_type in { + _TaskResponseEvent.TASK_COMPLETED, + } + + @staticmethod + def is_task_failed(response_type) -> bool: + return response_type in { + SystemResponseEvent.TASK_FAILED, + } + + @staticmethod + def is_tool_call(response_type) -> bool: + return response_type in { + StreamResponseEvent.TOOL_CALL_STARTED, + StreamResponseEvent.TOOL_CALL_COMPLETED, + } + + @staticmethod + def is_reasoning(response_type) -> bool: + return response_type in { + StreamResponseEvent.REASONING, + } + + __all__ = [ "streaming", "notification", + "EventPredicates", ] diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index e14685468..175f7aefa 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -6,7 +6,7 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from a2a.utils import get_message_text from valuecell.core.agent.connect import get_default_remote_connections -from valuecell.core.agent.decorator import is_reasoning, is_task_completed, is_tool_call +from valuecell.core.agent.responses import EventPredicates from valuecell.core.coordinate.response import ResponseFactory from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager @@ -270,7 +270,7 @@ async def _handle_session_continuation( if context.stage == "planning": async for chunk in self._continue_planning(session_id, context): yield chunk - # TODO: Add support for resuming execution stage if needed + # Resuming execution stage is not yet supported else: yield self._response_factory.system_failed( session_id, @@ -492,7 +492,7 @@ async def _execute_plan_with_input_support( yield response if ( - is_task_completed(response.event) + EventPredicates.is_task_completed(response.event) or task.pattern == TaskPattern.RECURRING ): if agent_responses[task.agent_name].strip(): @@ -586,7 +586,7 @@ async def _execute_task_with_input_support( if not event.metadata: continue response_event = event.metadata.get("response_event") - if state == TaskState.working and is_tool_call(response_event): + if state == TaskState.working and EventPredicates.is_tool_call(response_event): subtask_id = ( event.metadata.get("subtask_id") if event.metadata else None ) @@ -619,7 +619,7 @@ async def _execute_task_with_input_support( tool_call_result=tool_call_result, ) continue - if state == TaskState.working and is_reasoning(response_event): + if state == TaskState.working and 
EventPredicates.is_reasoning(response_event): subtask_id = ( event.metadata.get("subtask_id") if event.metadata else None ) From a1ed471855132300020c4c5ee9b3f1446eb9138b Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 23 Sep 2025 10:21:20 +0800 Subject: [PATCH 05/12] feat: refactor agent orchestrator and response handling for improved clarity and structure --- python/valuecell/core/agent/decorator.py | 1 - .../valuecell/core/coordinate/orchestrator.py | 126 +++---------- .../core/coordinate/response_router.py | 169 ++++++++++++++++++ python/valuecell/core/types.py | 4 +- 4 files changed, 197 insertions(+), 103 deletions(-) create mode 100644 python/valuecell/core/coordinate/response_router.py diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 6511d01b9..613ba73db 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -265,7 +265,6 @@ async def _add_chunk( async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: # Default cancel operation raise ServerError(error=UnsupportedOperationError()) - def _create_agent_executor(agent_instance): diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 175f7aefa..68ac9d384 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -4,10 +4,15 @@ from typing import AsyncGenerator, Dict, Optional from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent -from a2a.utils import get_message_text from valuecell.core.agent.connect import get_default_remote_connections from valuecell.core.agent.responses import EventPredicates from valuecell.core.coordinate.response import ResponseFactory +from valuecell.core.coordinate.response_router import ( + RouteResult, + SideEffectKind, + handle_artifact_update, + handle_status_update, +) from valuecell.core.session import Role, SessionStatus, get_default_session_manager from valuecell.core.task import Task, get_default_task_manager from valuecell.core.task.models import TaskPattern @@ -565,109 +570,28 @@ async def _execute_task_with_input_support( continue if isinstance(event, TaskStatusUpdateEvent): - state = event.status.state - logger.info(f"Task {task.task_id} status update: {state}") - if state in {TaskState.submitted, TaskState.completed}: - continue - # Handle task failure - if state == TaskState.failed: - err_msg = get_message_text(event.status.message) - await self.task_manager.fail_task(task.task_id, err_msg) - yield self._response_factory.task_failed( - conversation_id=task.session_id, - thread_id=thread_id, - task_id=task.task_id, - subtask_id=_generate_task_default_subtask_id(task.task_id), - content=err_msg, - ) - return - # if state == TaskState.input_required: - # Handle tool call start - if not event.metadata: - continue - response_event = event.metadata.get("response_event") - if state == TaskState.working and EventPredicates.is_tool_call(response_event): - subtask_id = ( - event.metadata.get("subtask_id") if event.metadata else None - ) - if not subtask_id: - subtask_id = _generate_task_default_subtask_id(task.task_id) - tool_call_id = event.metadata.get("tool_call_id", "") - tool_name = event.metadata.get("tool_name", "") - if response_event == StreamResponseEvent.TOOL_CALL_STARTED: - yield self._response_factory.tool_call_started( - conversation_id=task.session_id, - 
thread_id=thread_id, - task_id=task.task_id, - subtask_id=subtask_id, - tool_call_id=tool_call_id, - tool_name=tool_name, + result: RouteResult = await handle_status_update( + self._response_factory, task, thread_id, event, logger + ) + for r in result.responses: + yield r + # Apply side effects + for eff in result.side_effects: + if eff.kind == SideEffectKind.FAIL_TASK: + await self.task_manager.fail_task( + task.task_id, eff.reason or "" ) - continue + if result.done: + return + continue - if response_event == StreamResponseEvent.TOOL_CALL_COMPLETED: - tool_call_result = get_message_text( - event.metadata.get("tool_result", "") - ) - yield self._response_factory.tool_call_result( - conversation_id=task.session_id, - thread_id=thread_id, - task_id=task.task_id, - subtask_id=subtask_id, - tool_call_id=tool_call_id, - tool_name=tool_name, - tool_call_result=tool_call_result, - ) - continue - if state == TaskState.working and EventPredicates.is_reasoning(response_event): - subtask_id = ( - event.metadata.get("subtask_id") if event.metadata else None - ) - if not subtask_id: - subtask_id = _generate_task_default_subtask_id(task.task_id) - yield self._response_factory.reasoning( - conversation_id=task.session_id, - thread_id=thread_id, - task_id=task.task_id, - subtask_id=subtask_id, - content=get_message_text(event.status.message, ""), - ) - - continue - - elif isinstance(event, TaskArtifactUpdateEvent): - artifact = event.artifact - subtask_id = ( - artifact.metadata.get("subtask_id") - if artifact.metadata - else None - ) - if not subtask_id: - subtask_id = _generate_task_default_subtask_id(task.task_id) - response_event = artifact.metadata.get("response_event") - content = get_message_text(artifact, "") - if response_event == StreamResponseEvent.COMPONENT_GENERATOR: - component_type = artifact.metadata.get( - "component_type", "unknown" - ) - yield self._response_factory.component_generator( - conversation_id=task.session_id, - thread_id=thread_id, - task_id=task.task_id, - subtask_id=subtask_id, - content=content, - component_type=component_type, - ) - continue - - yield self._response_factory.message_response_general( - event=response_event, - conversation_id=task.session_id, - thread_id=thread_id, - task_id=task.task_id, - subtask_id=subtask_id, - content=content, + if isinstance(event, TaskArtifactUpdateEvent): + responses = await handle_artifact_update( + self._response_factory, task, thread_id, event ) + for r in responses: + yield r + continue # Complete task successfully await self.task_manager.complete_task(task.task_id) diff --git a/python/valuecell/core/coordinate/response_router.py b/python/valuecell/core/coordinate/response_router.py new file mode 100644 index 000000000..514fd922c --- /dev/null +++ b/python/valuecell/core/coordinate/response_router.py @@ -0,0 +1,169 @@ +import logging +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional + +from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent +from a2a.utils import get_message_text +from valuecell.core.agent.responses import EventPredicates +from valuecell.core.coordinate.response import ResponseFactory +from valuecell.core.task import Task +from valuecell.core.types import BaseResponse, StreamResponseEvent + +logger = logging.getLogger(__name__) + + +class SideEffectKind(Enum): + FAIL_TASK = "fail_task" + + +@dataclass +class SideEffect: + kind: SideEffectKind + reason: Optional[str] = None + + +@dataclass +class RouteResult: + responses: List[BaseResponse] + 
done: bool = False + side_effects: List[SideEffect] = None + + def __post_init__(self): + if self.side_effects is None: + self.side_effects = [] + + +def _default_subtask_id(task_id: str) -> str: + return f"{task_id}-default_subtask" + + +async def handle_status_update( + response_factory: ResponseFactory, + task: Task, + thread_id: str, + event: TaskStatusUpdateEvent, +) -> RouteResult: + responses: List[BaseResponse] = [] + state = event.status.state + logger.info(f"Task {task.task_id} status update: {state}") + + if state in {TaskState.submitted, TaskState.completed}: + return RouteResult(responses) + + if state == TaskState.failed: + err_msg = get_message_text(event.status.message) + responses.append( + response_factory.task_failed( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=_default_subtask_id(task.task_id), + content=err_msg, + ) + ) + return RouteResult( + responses=responses, + done=True, + side_effects=[SideEffect(kind=SideEffectKind.FAIL_TASK, reason=err_msg)], + ) + + if not event.metadata: + return RouteResult(responses) + + response_event = event.metadata.get("response_event") + + # Tool call events + if state == TaskState.working and EventPredicates.is_tool_call(response_event): + subtask_id = event.metadata.get("subtask_id") if event.metadata else None + if not subtask_id: + subtask_id = _default_subtask_id(task.task_id) + tool_call_id = event.metadata.get("tool_call_id", "") + tool_name = event.metadata.get("tool_name", "") + + if response_event == StreamResponseEvent.TOOL_CALL_STARTED: + responses.append( + response_factory.tool_call_started( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + tool_call_id=tool_call_id, + tool_name=tool_name, + ) + ) + return RouteResult(responses) + + if response_event == StreamResponseEvent.TOOL_CALL_COMPLETED: + tool_call_result = get_message_text(event.metadata.get("tool_result", "")) + responses.append( + response_factory.tool_call_result( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_call_result=tool_call_result, + ) + ) + return RouteResult(responses) + + # Reasoning messages + if state == TaskState.working and EventPredicates.is_reasoning(response_event): + subtask_id = event.metadata.get("subtask_id") if event.metadata else None + if not subtask_id: + subtask_id = _default_subtask_id(task.task_id) + responses.append( + response_factory.reasoning( + conversation_id=task.session_id, + thread_id=thread_id, + task_id=task.task_id, + subtask_id=subtask_id, + content=get_message_text(event.status.message, ""), + ) + ) + return RouteResult(responses) + + return RouteResult(responses) + + +async def handle_artifact_update( + response_factory: ResponseFactory, + task: Task, + thread_id: str, + event: TaskArtifactUpdateEvent, +) -> List[BaseResponse]: + responses: List[BaseResponse] = [] + artifact = event.artifact + subtask_id = artifact.metadata.get("subtask_id") if artifact.metadata else None + if not subtask_id: + subtask_id = _default_subtask_id(task.task_id) + response_event = artifact.metadata.get("response_event") + content = get_message_text(artifact, "") + + if response_event == StreamResponseEvent.COMPONENT_GENERATOR: + component_type = artifact.metadata.get("component_type", "unknown") + responses.append( + response_factory.component_generator( + conversation_id=task.session_id, + thread_id=thread_id, + 
+                task_id=task.task_id,
+                subtask_id=subtask_id,
+                content=content,
+                component_type=component_type,
+            )
+        )
+        return responses
+
+    responses.append(
+        response_factory.message_response_general(
+            event=response_event,
+            conversation_id=task.session_id,
+            thread_id=thread_id,
+            task_id=task.task_id,
+            subtask_id=subtask_id,
+            content=content,
+        )
+    )
+    return responses
diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py
index c6ae93c84..7f26d5b78 100644
--- a/python/valuecell/core/types.py
+++ b/python/valuecell/core/types.py
@@ -146,7 +146,9 @@ class UnifiedResponseData(BaseModel):
     subtask_id: Optional[str] = Field(
         None, description="Unique ID for the subtask, if any"
     )
-    data: Optional[ResponsePayload] = Field(None, description="The message data payload")
+    data: Optional[ResponsePayload] = Field(
+        None, description="The message data payload"
+    )
 
 
 class BaseResponse(BaseModel, ABC):

From f8c88d3a92c358967ae213bb0f Mon Sep 17 00:00:00 2001
From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com>
Date: Tue, 23 Sep 2025 10:27:15 +0800
Subject: [PATCH 06/12] feat: add new response events for task cancellation and reasoning lifecycle

---
 python/valuecell/core/agent/responses.py | 14 ++++++++++++++
 python/valuecell/core/types.py           |  3 +++
 2 files changed, 17 insertions(+)

diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py
index d832204cf..5bef89bbd 100644
--- a/python/valuecell/core/agent/responses.py
+++ b/python/valuecell/core/agent/responses.py
@@ -54,6 +54,12 @@ def tool_call_completed(
             subtask_id=subtask_id,
         )
 
+    def reasoning_started(self, subtask_id: str | None = None) -> StreamResponse:
+        return StreamResponse(
+            event=StreamResponseEvent.REASONING_STARTED,
+            subtask_id=subtask_id,
+        )
+
     def reasoning(self, content: str, subtask_id: str | None = None) -> StreamResponse:
         return StreamResponse(
             event=StreamResponseEvent.REASONING,
@@ -61,6 +67,12 @@ def reasoning(self, content: str, subtask_id: str | None = None) -> StreamRespon
             subtask_id=subtask_id,
         )
 
+    def reasoning_completed(self, subtask_id: str | None = None) -> StreamResponse:
+        return StreamResponse(
+            event=StreamResponseEvent.REASONING_COMPLETED,
+            subtask_id=subtask_id,
+        )
+
     def component_generator(
         self, content: str, component_type: str, subtask_id: str | None = None
     ) -> StreamResponse:
@@ -141,7 +153,9 @@ def is_tool_call(response_type) -> bool:
     @staticmethod
     def is_reasoning(response_type) -> bool:
         return response_type in {
+            StreamResponseEvent.REASONING_STARTED,
             StreamResponseEvent.REASONING,
+            StreamResponseEvent.REASONING_COMPLETED,
         }
 
 
diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py
index 7f26d5b78..490a1c6db 100644
--- a/python/valuecell/core/types.py
+++ b/python/valuecell/core/types.py
@@ -59,6 +59,7 @@ class SystemResponseEvent(str, Enum):
 class _TaskResponseEvent(str, Enum):
     TASK_STARTED = "task_started"
     TASK_COMPLETED = "task_completed"
+    TASK_CANCELLED = "task_cancelled"
 
 
 class StreamResponseEvent(str, Enum):
@@ -66,7 +67,9 @@ class StreamResponseEvent(str, Enum):
     COMPONENT_GENERATOR = "component_generator"
     TOOL_CALL_STARTED = "tool_call_started"
     TOOL_CALL_COMPLETED = "tool_call_completed"
+    REASONING_STARTED = "reasoning_started"
     REASONING = "reasoning"
+    REASONING_COMPLETED = "reasoning_completed"
 
 
 class NotifyResponseEvent(str, Enum):
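
A minimal usage sketch (not part of the patch series) of the reasoning lifecycle events added in PATCH 06. The `streaming` factory namespace and the event constructors come from the diffs above; the agent coroutine, its subtask_id value, and the message texts are hypothetical:

    from valuecell.core.agent.responses import streaming as stream

    async def analyze(subtask_id: str):
        # Open the reasoning phase, stream one reasoning chunk, then close it.
        yield stream.reasoning_started(subtask_id=subtask_id)
        yield stream.reasoning(
            "Comparing holdings against the benchmark", subtask_id=subtask_id
        )
        yield stream.reasoning_completed(subtask_id=subtask_id)
        # Signal that this piece of work is finished.
        yield stream.done("analysis complete")
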
From 8490b84a87c91038967ef2219914eae1e3bdaa61 Mon Sep 17 00:00:00 2001
From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com>
Date: Tue, 23 Sep 2025 10:42:11 +0800
Subject: [PATCH 07/12] feat: update response handling for tool calls and reasoning events with optional content

---
 python/valuecell/core/coordinate/response.py | 54 ++++++++-----------
 .../core/coordinate/response_router.py       | 51 +++++++-----------
 python/valuecell/core/types.py               | 26 ++++-----
 3 files changed, 51 insertions(+), 80 deletions(-)

diff --git a/python/valuecell/core/coordinate/response.py b/python/valuecell/core/coordinate/response.py
index 7d5c3ffcf..a2ce3bb04 100644
--- a/python/valuecell/core/coordinate/response.py
+++ b/python/valuecell/core/coordinate/response.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
 from typing_extensions import Literal
 from valuecell.core.types import (
     BaseResponseDataContent,
@@ -14,9 +16,8 @@
     SystemFailedResponse,
     TaskCompletedResponse,
     TaskFailedResponse,
-    ToolCallCompletedResponse,
     ToolCallContent,
-    ToolCallStartedResponse,
+    ToolCallResponse,
     UnifiedResponseData,
 )
 
@@ -99,39 +100,22 @@ def task_completed(
             ),
         )
 
-    def tool_call_started(
-        self,
-        conversation_id: str,
-        thread_id: str,
-        task_id: str,
-        subtask_id: str,
-        tool_call_id: str,
-        tool_name: str,
-    ) -> ToolCallStartedResponse:
-        return ToolCallStartedResponse(
-            data=UnifiedResponseData(
-                conversation_id=conversation_id,
-                thread_id=thread_id,
-                task_id=task_id,
-                subtask_id=subtask_id,
-                data=ToolCallContent(
-                    tool_call_id=tool_call_id,
-                    tool_name=tool_name,
-                ),
-            )
-        )
-
-    def tool_call_result(
+    def tool_call(
         self,
         conversation_id: str,
         thread_id: str,
         task_id: str,
         subtask_id: str,
+        event: Literal[
+            StreamResponseEvent.TOOL_CALL_STARTED,
+            StreamResponseEvent.TOOL_CALL_COMPLETED,
+        ],
         tool_call_id: str,
         tool_name: str,
-        tool_call_result: str,
-    ) -> ToolCallCompletedResponse:
-        return ToolCallCompletedResponse(
+        tool_result: Optional[str] = None,
+    ) -> ToolCallResponse:
+        return ToolCallResponse(
+            event=event,
             data=UnifiedResponseData(
                 conversation_id=conversation_id,
                 thread_id=thread_id,
@@ -140,9 +124,9 @@ def tool_call_result(
                 data=ToolCallContent(
                     tool_call_id=tool_call_id,
                     tool_name=tool_name,
-                    tool_call_result=tool_call_result,
+                    tool_result=tool_result,
                 ),
-            )
+            ),
         )
 
     def message_response_general(
@@ -171,15 +155,21 @@ def reasoning(
         thread_id: str,
         task_id: str,
         subtask_id: str,
-        content: str,
+        event: Literal[
+            StreamResponseEvent.REASONING,
+            StreamResponseEvent.REASONING_STARTED,
+            StreamResponseEvent.REASONING_COMPLETED,
+        ],
+        content: Optional[str] = None,
     ) -> ReasoningResponse:
         return ReasoningResponse(
+            event=event,
             data=UnifiedResponseData(
                 conversation_id=conversation_id,
                 thread_id=thread_id,
                 task_id=task_id,
                 subtask_id=subtask_id,
-                data=BaseResponseDataContent(content=content),
+                data=BaseResponseDataContent(content=content) if content else None,
             ),
         )
 
diff --git a/python/valuecell/core/coordinate/response_router.py b/python/valuecell/core/coordinate/response_router.py
index 514fd922c..c3f7dd8a6 100644
--- a/python/valuecell/core/coordinate/response_router.py
+++ b/python/valuecell/core/coordinate/response_router.py
@@ -72,48 +72,33 @@ async def handle_status_update(
         return RouteResult(responses)
 
     response_event = event.metadata.get("response_event")
+    subtask_id = event.metadata.get("subtask_id")
+    if not subtask_id:
+        subtask_id = _default_subtask_id(task.task_id)
 
     # Tool call events
     if state == TaskState.working and EventPredicates.is_tool_call(response_event):
-        subtask_id = event.metadata.get("subtask_id") if event.metadata else None
-        if not subtask_id:
-            subtask_id = _default_subtask_id(task.task_id)
-        tool_call_id = event.metadata.get("tool_call_id", "")
-        tool_name = event.metadata.get("tool_name", "")
-
-        if response_event == StreamResponseEvent.TOOL_CALL_STARTED:
-            responses.append(
-                response_factory.tool_call_started(
-                    conversation_id=task.session_id,
-                    thread_id=thread_id,
-                    task_id=task.task_id,
-                    subtask_id=subtask_id,
-                    tool_call_id=tool_call_id,
-                    tool_name=tool_name,
-                )
-            )
-            return RouteResult(responses)
+        tool_call_id = event.metadata.get("tool_call_id", "unknown_tool_call_id")
+        tool_name = event.metadata.get("tool_name", "unknown_tool_name")
 
-        if response_event == StreamResponseEvent.TOOL_CALL_COMPLETED:
+        tool_call_result = None
+        if "tool_result" in event.metadata:
             tool_call_result = get_message_text(event.metadata.get("tool_result", ""))
-            responses.append(
-                response_factory.tool_call_result(
-                    conversation_id=task.session_id,
-                    thread_id=thread_id,
-                    task_id=task.task_id,
-                    subtask_id=subtask_id,
-                    tool_call_id=tool_call_id,
-                    tool_name=tool_name,
-                    tool_call_result=tool_call_result,
-                )
+        responses.append(
+            response_factory.tool_call(
+                conversation_id=task.session_id,
+                thread_id=thread_id,
+                task_id=task.task_id,
+                subtask_id=subtask_id,
+                tool_call_id=tool_call_id,
+                tool_name=tool_name,
+                tool_call_result=tool_call_result,
             )
-            return RouteResult(responses)
+        )
+        return RouteResult(responses)
 
     # Reasoning messages
     if state == TaskState.working and EventPredicates.is_reasoning(response_event):
-        subtask_id = event.metadata.get("subtask_id") if event.metadata else None
-        if not subtask_id:
-            subtask_id = _default_subtask_id(task.task_id)
         responses.append(
             response_factory.reasoning(
                 conversation_id=task.session_id,
diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py
index 490a1c6db..a4dedd6cd 100644
--- a/python/valuecell/core/types.py
+++ b/python/valuecell/core/types.py
@@ -120,7 +120,7 @@ class ToolCallContent(BaseModel):
 
 
 class BaseResponseDataContent(BaseModel, ABC):
-    content: str = Field(..., description="The message content")
+    content: Optional[str] = Field(None, description="The message content")
 
 
 class ComponentGeneratorResponseDataContent(BaseResponseDataContent):
@@ -195,26 +195,22 @@ class ComponentGeneratorResponse(BaseResponse):
     data: UnifiedResponseData = Field(..., description="The component generator data")
 
 
-class ToolCallStartedResponse(BaseResponse):
-    event: Literal[StreamResponseEvent.TOOL_CALL_STARTED] = Field(
-        StreamResponseEvent.TOOL_CALL_STARTED,
-        description="The event type of the response",
-    )
-    data: UnifiedResponseData = Field(..., description="The task data payload")
-
-
-class ToolCallCompletedResponse(BaseResponse):
-    event: Literal[StreamResponseEvent.TOOL_CALL_COMPLETED] = Field(
-        StreamResponseEvent.TOOL_CALL_COMPLETED,
+class ToolCallResponse(BaseResponse):
+    event: Literal[
+        StreamResponseEvent.TOOL_CALL_STARTED, StreamResponseEvent.TOOL_CALL_COMPLETED
+    ] = Field(
+        ...,
         description="The event type of the response",
     )
     data: UnifiedResponseData = Field(..., description="The task data payload")
 
 
 class ReasoningResponse(BaseResponse):
-    event: Literal[StreamResponseEvent.REASONING] = Field(
-        StreamResponseEvent.REASONING, description="The event type of the response"
-    )
+    event: Literal[
+        StreamResponseEvent.REASONING_STARTED,
+        StreamResponseEvent.REASONING,
+        StreamResponseEvent.REASONING_COMPLETED,
+    ] = Field(..., description="The event type of the response")
     data: UnifiedResponseData = Field(..., description="The reasoning message content")
 
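
A sketch (not part of the patch series) of how the consolidated factory method from PATCH 07 might be called. The `tool_call` signature and `StreamResponseEvent` come from the diffs above; the import path for `ResponseFactory`, its no-argument constructor, and every ID string below are assumptions for illustration:

    from valuecell.core.coordinate.response import ResponseFactory
    from valuecell.core.types import StreamResponseEvent

    factory = ResponseFactory()

    # One method now covers both tool-call events: `event` selects the variant,
    # and `tool_result` stays at its None default until the call completes.
    started = factory.tool_call(
        conversation_id="conv-1",
        thread_id="thread-1",
        task_id="task-1",
        subtask_id="subtask-1",
        event=StreamResponseEvent.TOOL_CALL_STARTED,
        tool_call_id="call-1",
        tool_name="search_filings",
    )
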
From a967dc2854d1b9345a46fdaea3f1491c82d0ae75 Mon Sep 17 00:00:00 2001
From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com>
Date: Tue, 23 Sep 2025 10:58:28 +0800
Subject: [PATCH 08/12] fix tests

---
 python/valuecell/core/coordinate/orchestrator.py         | 7 ++++---
 python/valuecell/core/coordinate/response_router.py      | 8 +++++---
 .../valuecell/core/coordinate/tests/test_orchestrator.py | 6 +++---
 3 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py
index 68ac9d384..695f7617a 100644
--- a/python/valuecell/core/coordinate/orchestrator.py
+++ b/python/valuecell/core/coordinate/orchestrator.py
@@ -172,7 +172,8 @@ async def process_user_input(
         except Exception as e:
             logger.exception(f"Error processing user input for session {session_id}")
             yield self._response_factory.system_failed(
-                f"Error processing request: {str(e)}", session_id
+                session_id,
+                f"(Error) Error processing request: {str(e)}",
             )
 
     async def provide_user_input(self, session_id: str, response: str):
@@ -510,7 +511,7 @@ async def _execute_plan_with_input_support(
                 agent_responses[task.agent_name] = ""
 
             except Exception as e:
-                error_msg = f"Error executing {task.agent_name}: {str(e)}"
+                error_msg = f"(Error) Error executing {task.agent_name}: {str(e)}"
                 logger.exception(f"Task execution failed: {error_msg}")
                 yield self._response_factory.task_failed(
                     session_id,
@@ -571,7 +572,7 @@ async def _execute_task_with_input_support(
 
                 if isinstance(event, TaskStatusUpdateEvent):
                     result: RouteResult = await handle_status_update(
-                        self._response_factory, task, thread_id, event, logger
+                        self._response_factory, task, thread_id, event
                     )
                     for r in result.responses:
                         yield r
diff --git a/python/valuecell/core/coordinate/response_router.py b/python/valuecell/core/coordinate/response_router.py
index c3f7dd8a6..4de70e83d 100644
--- a/python/valuecell/core/coordinate/response_router.py
+++ b/python/valuecell/core/coordinate/response_router.py
@@ -81,18 +81,19 @@ async def handle_status_update(
         tool_call_id = event.metadata.get("tool_call_id", "unknown_tool_call_id")
         tool_name = event.metadata.get("tool_name", "unknown_tool_name")
 
-        tool_call_result = None
+        tool_result = None
         if "tool_result" in event.metadata:
-            tool_call_result = get_message_text(event.metadata.get("tool_result", ""))
+            tool_result = get_message_text(event.metadata.get("tool_result", ""))
         responses.append(
             response_factory.tool_call(
                 conversation_id=task.session_id,
                 thread_id=thread_id,
                 task_id=task.task_id,
                 subtask_id=subtask_id,
+                event=response_event,
                 tool_call_id=tool_call_id,
                 tool_name=tool_name,
-                tool_call_result=tool_call_result,
+                tool_result=tool_result,
             )
         )
         return RouteResult(responses)
@@ -105,6 +106,7 @@ async def handle_status_update(
                 thread_id=thread_id,
                 task_id=task.task_id,
                 subtask_id=subtask_id,
+                event=response_event,
                 content=get_message_text(event.status.message, ""),
             )
         )
diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py
index e78370a54..69ccf2516 100644
--- a/python/valuecell/core/coordinate/tests/test_orchestrator.py
+++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py
@@ -309,8 +309,8 @@ async def test_planner_error(
         out.append(chunk)
 
     assert len(out) == 1
-    assert "(Error)" in out[0].data.content
-    assert "Planning failed" in out[0].data.content
+    assert "(Error)" in out[0].data.data.content
+    assert "Planning failed" in out[0].data.data.content
 
 
 @pytest.mark.asyncio
@@ -328,7 +328,7 @@ async def test_agent_connection_error(
     async for chunk in orchestrator.process_user_input(sample_user_input):
         out.append(chunk)
 
-    assert any("(Error)" in c.data.content for c in out)
+    assert any("(Error)" in c.data.data.content for c in out)
 
 
 @pytest.mark.asyncio

From 36aa8194e15e2fb8dd70832f1b57153f0cdf81dc Mon Sep 17 00:00:00 2001
From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com>
Date: Tue, 23 Sep 2025 11:18:19 +0800
Subject: [PATCH 09/12] refactor: rename ToolCallContent and BaseResponseDataContent to ToolCallPayload and BaseResponseDataPayload for consistency

---
 python/valuecell/core/agent/responses.py     |  6 +++---
 python/valuecell/core/coordinate/response.py | 22 ++++++++++----------
 python/valuecell/core/types.py               | 14 ++++++-------
 3 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/python/valuecell/core/agent/responses.py b/python/valuecell/core/agent/responses.py
index 5bef89bbd..74bc6ed3c 100644
--- a/python/valuecell/core/agent/responses.py
+++ b/python/valuecell/core/agent/responses.py
@@ -8,7 +8,7 @@
     StreamResponse,
     StreamResponseEvent,
     SystemResponseEvent,
-    ToolCallContent,
+    ToolCallPayload,
     _TaskResponseEvent,
 )
 
@@ -30,7 +30,7 @@ def tool_call_started(
     ) -> StreamResponse:
         return StreamResponse(
             event=StreamResponseEvent.TOOL_CALL_STARTED,
-            metadata=ToolCallContent(
+            metadata=ToolCallPayload(
                 tool_call_id=tool_call_id,
                 tool_name=tool_name,
             ).model_dump(),
@@ -46,7 +46,7 @@ def tool_call_completed(
     ) -> StreamResponse:
         return StreamResponse(
             event=StreamResponseEvent.TOOL_CALL_COMPLETED,
-            metadata=ToolCallContent(
+            metadata=ToolCallPayload(
                 tool_call_id=tool_call_id,
                 tool_name=tool_name,
                 tool_result=tool_result,
diff --git a/python/valuecell/core/coordinate/response.py b/python/valuecell/core/coordinate/response.py
index a2ce3bb04..399af08c2 100644
--- a/python/valuecell/core/coordinate/response.py
+++ b/python/valuecell/core/coordinate/response.py
@@ -2,9 +2,9 @@
 
 from typing_extensions import Literal
 from valuecell.core.types import (
-    BaseResponseDataContent,
+    BaseResponseDataPayload,
     ComponentGeneratorResponse,
-    ComponentGeneratorResponseDataContent,
+    ComponentGeneratorResponseDataPayload,
     ConversationStartedResponse,
     DoneResponse,
     MessageResponse,
@@ -16,7 +16,7 @@
     SystemFailedResponse,
     TaskCompletedResponse,
     TaskFailedResponse,
-    ToolCallContent,
+    ToolCallPayload,
     ToolCallResponse,
     UnifiedResponseData,
 )
@@ -32,7 +32,7 @@ def system_failed(self, conversation_id: str, content: str) -> SystemFailedRespo
         return SystemFailedResponse(
             data=UnifiedResponseData(
                 conversation_id=conversation_id,
-                data=BaseResponseDataContent(content=content),
+                payload=BaseResponseDataPayload(content=content),
             )
         )
 
@@ -51,7 +51,7 @@ def plan_require_user_input(
             data=UnifiedResponseData(
                 conversation_id=conversation_id,
                 thread_id=thread_id,
-                data=BaseResponseDataContent(content=content),
+                payload=BaseResponseDataPayload(content=content),
             )
         )
 
@@ -62,7 +62,7 @@ def plan_failed(
             data=UnifiedResponseData(
                 conversation_id=conversation_id,
                 thread_id=thread_id,
-                data=BaseResponseDataContent(content=content),
+                payload=BaseResponseDataPayload(content=content),
             )
         )
 
@@ -80,7 +80,7 @@ def task_failed(
                 thread_id=thread_id,
                 task_id=task_id,
                 subtask_id=subtask_id,
-                data=BaseResponseDataContent(content=content),
+                payload=BaseResponseDataPayload(content=content),
             )
         )
 
@@ -121,7 +121,7 @@ def tool_call(
                 thread_id=thread_id,
                 task_id=task_id,
                 subtask_id=subtask_id,
-                data=ToolCallContent(
+                payload=ToolCallPayload(
                     tool_call_id=tool_call_id,
                     tool_name=tool_name,
                     tool_result=tool_result,
@@ -145,7 +145,7 @@ def message_response_general(
                 thread_id=thread_id,
                 task_id=task_id,
                 subtask_id=subtask_id,
-                data=BaseResponseDataContent(content=content),
+                payload=BaseResponseDataPayload(content=content),
             ),
         )
 
@@ -169,7 +169,7 @@ def reasoning(
                 thread_id=thread_id,
                 task_id=task_id,
                 subtask_id=subtask_id,
-                data=BaseResponseDataContent(content=content) if content else None,
+                payload=BaseResponseDataPayload(content=content) if content else None,
             ),
         )
 
@@ -188,7 +188,7 @@ def component_generator(
                 thread_id=thread_id,
                 task_id=task_id,
                 subtask_id=subtask_id,
-                data=ComponentGeneratorResponseDataContent(
+                payload=ComponentGeneratorResponseDataPayload(
                     content=content,
                     component_type=component_type,
                 ),
diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py
index a4dedd6cd..89a640adc 100644
--- a/python/valuecell/core/types.py
+++ b/python/valuecell/core/types.py
@@ -110,7 +110,7 @@ class NotifyResponse(BaseModel):
     )
 
 
-class ToolCallContent(BaseModel):
+class ToolCallPayload(BaseModel):
     tool_call_id: str = Field(..., description="Unique ID for the tool call")
     tool_name: str = Field(..., description="Name of the tool being called")
     tool_result: Optional[str] = Field(
@@ -119,18 +119,18 @@ class ToolCallContent(BaseModel):
     )
 
 
-class BaseResponseDataContent(BaseModel, ABC):
+class BaseResponseDataPayload(BaseModel, ABC):
     content: Optional[str] = Field(None, description="The message content")
 
 
-class ComponentGeneratorResponseDataContent(BaseResponseDataContent):
+class ComponentGeneratorResponseDataPayload(BaseResponseDataPayload):
     component_type: str = Field(..., description="The component type")
 
 
 ResponsePayload = Union[
-    BaseResponseDataContent,
-    ComponentGeneratorResponseDataContent,
-    ToolCallContent,
+    BaseResponseDataPayload,
+    ComponentGeneratorResponseDataPayload,
+    ToolCallPayload,
 ]
 
@@ -149,7 +149,7 @@ class UnifiedResponseData(BaseModel):
     subtask_id: Optional[str] = Field(
         None, description="Unique ID for the subtask, if any"
     )
-    data: Optional[ResponsePayload] = Field(
+    payload: Optional[ResponsePayload] = Field(
         None, description="The message data payload"
     )

From 46f54486dc2238cdfa89f7e20f5999ecc67dd675 Mon Sep 17 00:00:00 2001
From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com>
Date: Tue, 23 Sep 2025 11:25:38 +0800
Subject: [PATCH 10/12] fix: correct subtask ID format in _default_subtask_id function

---
 python/valuecell/core/coordinate/response_router.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/valuecell/core/coordinate/response_router.py b/python/valuecell/core/coordinate/response_router.py
index 4de70e83d..7b97b13ea 100644
--- a/python/valuecell/core/coordinate/response_router.py
+++ b/python/valuecell/core/coordinate/response_router.py
@@ -35,7 +35,7 @@ def __post_init__(self):
 
 
 def _default_subtask_id(task_id: str) -> str:
-    return f"{task_id}-default_subtask"
+    return f"{task_id}_default-subtask"
 
 
 async def handle_status_update(

From 5b27d23fca7b22dfd9a0f42a3835df2a62bbac99 Mon Sep 17 00:00:00 2001
From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com>
Date: Tue, 23 Sep 2025 11:27:40 +0800
Subject: [PATCH 11/12] fix: update response data access in orchestrator and tests to use payload

---
 python/valuecell/core/coordinate/orchestrator.py            | 4 ++--
 python/valuecell/core/coordinate/tests/test_orchestrator.py | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py
index 695f7617a..343bb6eec 100644
--- a/python/valuecell/core/coordinate/orchestrator.py
+++ b/python/valuecell/core/coordinate/orchestrator.py
@@ -493,8 +493,8 @@ async def _execute_plan_with_input_support(
                     StreamResponseEvent.MESSAGE_CHUNK,
                     StreamResponseEvent.REASONING,
                     NotifyResponseEvent.MESSAGE,
-                } and isinstance(response.data.data.content, str):
-                    agent_responses[task.agent_name] += response.data.data.content
+                } and isinstance(response.data.payload.content, str):
+                    agent_responses[task.agent_name] += response.data.payload.content
                 yield response
 
                 if (
diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py
index 69ccf2516..30caa4bbd 100644
--- a/python/valuecell/core/coordinate/tests/test_orchestrator.py
+++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py
@@ -309,8 +309,8 @@ async def test_planner_error(
         out.append(chunk)
 
     assert len(out) == 1
-    assert "(Error)" in out[0].data.data.content
-    assert "Planning failed" in out[0].data.data.content
+    assert "(Error)" in out[0].data.payload.content
+    assert "Planning failed" in out[0].data.payload.content
 
 
 @pytest.mark.asyncio
@@ -328,7 +328,7 @@ async def test_agent_connection_error(
     async for chunk in orchestrator.process_user_input(sample_user_input):
         out.append(chunk)
 
-    assert any("(Error)" in c.data.data.content for c in out)
+    assert any("(Error)" in c.data.payload.content for c in out)
 
 
 @pytest.mark.asyncio

From c669bcda8dbd4121ce73f70888f91f90ef707d41 Mon Sep 17 00:00:00 2001
From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com>
Date: Tue, 23 Sep 2025 11:32:30 +0800
Subject: [PATCH 12/12] fix format

---
 python/valuecell/core/coordinate/orchestrator.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py
index 343bb6eec..85e218d79 100644
--- a/python/valuecell/core/coordinate/orchestrator.py
+++ b/python/valuecell/core/coordinate/orchestrator.py
@@ -494,7 +494,9 @@ async def _execute_plan_with_input_support(
                     StreamResponseEvent.REASONING,
                     NotifyResponseEvent.MESSAGE,
                 } and isinstance(response.data.payload.content, str):
-                    agent_responses[task.agent_name] += response.data.payload.content
+                    agent_responses[task.agent_name] += (
+                        response.data.payload.content
+                    )
                 yield response
 
                 if (