diff --git a/python/valuecell/server/services/conversation_service.py b/python/valuecell/server/services/conversation_service.py index 1f122674c..769420ba5 100644 --- a/python/valuecell/server/services/conversation_service.py +++ b/python/valuecell/server/services/conversation_service.py @@ -11,6 +11,7 @@ ConversationService as CoreConversationService, ) from valuecell.core.event.factory import ResponseFactory +from valuecell.core.types import CommonResponseEvent, ComponentType from valuecell.server.api.schemas.conversation import ( ConversationDeleteData, ConversationHistoryData, @@ -58,27 +59,109 @@ async def get_conversation_list( conversation_id=conv.conversation_id, title=conv.title or f"Conversation {conv.conversation_id}", agent_name=conv.agent_name, - update_time=conv.updated_at.isoformat() - if conv.updated_at - else conv.created_at.isoformat(), + update_time=( + conv.updated_at.isoformat() + if conv.updated_at + else conv.created_at.isoformat() + ), ) conversation_items.append(conversation_item) return ConversationListData(conversations=conversation_items, total=total) + async def _validate_conversation_exists(self, conversation_id: str) -> None: + """Validate that a conversation exists, raise ValueError if not found.""" + conversation = await self.conversation_manager.get_conversation(conversation_id) + if not conversation: + raise ValueError(f"Conversation {conversation_id} not found") + + def _convert_response_to_history_item(self, response) -> ConversationHistoryItem: + """Convert a BaseResponse to ConversationHistoryItem.""" + data = response.data + + # Convert payload to dict for JSON serialization + payload_data = None + if data.payload: + try: + payload_data = ( + data.payload.model_dump() + if hasattr(data.payload, "model_dump") + else str(data.payload) + ) + except Exception: + payload_data = str(data.payload) + + # Normalize event and role names + event_str = self._normalize_event_name(str(response.event)) + role_str = self._normalize_role_name(str(data.role)) + + # Create unified format: event and data at top level + message_data_with_meta = MessageData( + conversation_id=data.conversation_id, + thread_id=data.thread_id, + task_id=data.task_id, + payload=payload_data, + role=role_str, + item_id=data.item_id, + ) + if data.agent_name: + message_data_with_meta.agent_name = data.agent_name + if data.metadata: + message_data_with_meta.metadata = data.metadata + + return ConversationHistoryItem(event=event_str, data=message_data_with_meta) + async def get_conversation_history( self, conversation_id: str ) -> ConversationHistoryData: """Get conversation history for a specific conversation.""" # Check if conversation exists - conversation = await self.conversation_manager.get_conversation(conversation_id) - if not conversation: - raise ValueError(f"Conversation {conversation_id} not found") + await self._validate_conversation_exists(conversation_id) + + # Retrieve persisted conversation items and rebuild responses + conversation_items = ( + await self.core_conversation_service.get_conversation_items( + conversation_id=conversation_id, + ) + ) + + base_responses = [] + for item in conversation_items: + resp = self.response_factory.from_conversation_item(item) + # Exclude scheduled task results from general history + if ( + resp.event == CommonResponseEvent.COMPONENT_GENERATOR.value + and resp.data.payload + and getattr(resp.data.payload, "component_type", None) + == ComponentType.SCHEDULED_TASK_RESULT.value + ): + continue # Skip scheduled task results in general history + + base_responses.append(resp) + + # Convert BaseResponse objects to ConversationHistoryItem objects + history_items = [ + self._convert_response_to_history_item(response) + for response in base_responses + ] + + return ConversationHistoryData( + conversation_id=conversation_id, items=history_items + ) + + async def get_conversation_scheduled_task_results( + self, conversation_id: str + ) -> ConversationHistoryData: + """Get scheduled task results for a specific conversation.""" + # Check if conversation exists + await self._validate_conversation_exists(conversation_id) # Retrieve persisted conversation items and rebuild responses conversation_items = ( await self.core_conversation_service.get_conversation_items( - conversation_id=conversation_id + conversation_id=conversation_id, + event=CommonResponseEvent.COMPONENT_GENERATOR.value, + component_type=ComponentType.SCHEDULED_TASK_RESULT.value, ) ) @@ -88,45 +171,10 @@ async def get_conversation_history( ] # Convert BaseResponse objects to ConversationHistoryItem objects - history_items = [] - for response in base_responses: - data = response.data - - # Convert payload to dict for JSON serialization - payload_data = None - if data.payload: - try: - payload_data = ( - data.payload.model_dump() - if hasattr(data.payload, "model_dump") - else str(data.payload) - ) - except Exception: - payload_data = str(data.payload) - - # Normalize event and role names - event_str = self._normalize_event_name(str(response.event)) - role_str = self._normalize_role_name(str(data.role)) - - # Create unified format: event and data at top level - message_data_with_meta = MessageData( - conversation_id=data.conversation_id, - thread_id=data.thread_id, - task_id=data.task_id, - payload=payload_data, - role=role_str, - item_id=data.item_id, - ) - if data.agent_name: - message_data_with_meta.agent_name = data.agent_name - if data.metadata: - message_data_with_meta.metadata = data.metadata - - history_item = ConversationHistoryItem( - event=event_str, data=message_data_with_meta - ) - - history_items.append(history_item) + history_items = [ + self._convert_response_to_history_item(response) + for response in base_responses + ] return ConversationHistoryData( conversation_id=conversation_id, items=history_items