diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 39b58fb95..5a3121e7e 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -665,20 +665,27 @@ async def _execute_plan_with_input_support( plan.user_id, conversation_id=task.conversation_id ) if task.handoff_from_super_agent: - yield self._response_factory.component_generator( - conversation_id=conversation_id, - thread_id=thread_id, - task_id=task.task_id, - content=json.dumps(subagent_component_content_dict), - component_type=ComponentType.SUBAGENT_CONVERSATION.value, - component_id=subagent_conversation_item_id, - agent_name=task.agent_name, + subagent_conv_start_component = ( + self._response_factory.component_generator( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task.task_id, + content=json.dumps(subagent_component_content_dict), + component_type=ComponentType.SUBAGENT_CONVERSATION.value, + component_id=subagent_conversation_item_id, + agent_name=task.agent_name, + ) ) - yield self._response_factory.thread_started( + yield subagent_conv_start_component + await self._persist_from_buffer(subagent_conv_start_component) + + subagent_conv_thread_started = self._response_factory.thread_started( conversation_id=task.conversation_id, thread_id=thread_id, user_query=task.query, ) + yield subagent_conv_thread_started + await self._persist_from_buffer(subagent_conv_thread_started) try: # Register the task with TaskManager (persist in-memory) await self.task_manager.update_task(task) @@ -710,15 +717,19 @@ async def _execute_plan_with_input_support( subagent_component_content_dict["phase"] = ( SubagentConversationPhase.END.value ) - yield self._response_factory.component_generator( - conversation_id=conversation_id, - thread_id=thread_id, - task_id=task.task_id, - content=json.dumps(subagent_component_content_dict), - component_type=ComponentType.SUBAGENT_CONVERSATION.value, - component_id=subagent_conversation_item_id, - agent_name=task.agent_name, + subagent_conv_end_component = ( + self._response_factory.component_generator( + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task.task_id, + content=json.dumps(subagent_component_content_dict), + component_type=ComponentType.SUBAGENT_CONVERSATION.value, + component_id=subagent_conversation_item_id, + agent_name=task.agent_name, + ) ) + yield subagent_conv_end_component + await self._persist_from_buffer(subagent_conv_end_component) async def _execute_task_with_input_support( self, task: Task, thread_id: str, metadata: Optional[dict] = None