diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 4d9c6080f..46aeed96b 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -9,6 +9,7 @@ from valuecell.core.types import ( MessageChunk, MessageChunkMetadata, + MessageChunkStatus, MessageDataKind, UserInput, ) @@ -35,12 +36,15 @@ def _create_message_chunk( user_id: str, kind: MessageDataKind = MessageDataKind.TEXT, is_final: bool = False, + status: MessageChunkStatus = MessageChunkStatus.partial, ) -> MessageChunk: """Create a MessageChunk with common metadata""" return MessageChunk( content=content, kind=kind, - meta=MessageChunkMetadata(session_id=session_id, user_id=user_id), + meta=MessageChunkMetadata( + session_id=session_id, user_id=user_id, status=status + ), is_final=is_final, ) @@ -53,6 +57,7 @@ def _create_error_message_chunk( session_id=session_id, user_id=user_id, is_final=True, + status=MessageChunkStatus.failure, ) async def process_user_input( @@ -97,8 +102,8 @@ async def _execute_plan( session_id, user_id = metadata["session_id"], metadata["user_id"] if not plan.tasks: - yield self._create_message_chunk( - "No tasks found for this request.", session_id, user_id, is_final=True + yield self._create_error_message_chunk( + "No tasks found for this request.", session_id, user_id ) return @@ -116,15 +121,6 @@ async def _execute_plan( error_msg = f"Error executing {task.agent_name}: {str(e)}" yield self._create_error_message_chunk(error_msg, session_id, user_id) - # Check if no results were produced - if not plan.tasks: - yield self._create_message_chunk( - "No agents were able to process this request.", - session_id, - user_id, - is_final=True, - ) - async def _execute_task( self, task, query: str, metadata: dict ) -> AsyncGenerator[MessageChunk, None]: @@ -165,11 +161,10 @@ async def _execute_task( if event.status.state == TaskState.failed: err_msg = get_message_text(event.status.message) await self.task_manager.fail_task(task.task_id, err_msg) - yield self._create_message_chunk( + yield self._create_error_message_chunk( err_msg, task.session_id, task.user_id, - is_final=True, ) return @@ -184,7 +179,11 @@ async def _execute_task( # Complete task await self.task_manager.complete_task(task.task_id) yield self._create_message_chunk( - "", task.session_id, task.user_id, is_final=True + "", + task.session_id, + task.user_id, + is_final=True, + status=MessageChunkStatus.success, ) except Exception as e: diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index f252fe987..56fd63713 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -55,7 +55,18 @@ class MessageDataKind(str, Enum): COMMAND = "command" +class MessageChunkStatus(str, Enum): + partial = "partial" + success = "success" + failure = "failure" + cancelled = "cancelled" + + class MessageChunkMetadata(BaseModel): + status: MessageChunkStatus = Field( + default=MessageChunkStatus.partial, + description="Chunk outcome: use partial for intermediate chunks; success/failure for final.", + ) session_id: str = Field(..., description="Session ID for this request") user_id: str = Field(..., description="User ID who made this request")