From e6b2265fadfa23e48a698ed87b763543e5d24feb Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 28 Oct 2025 15:46:04 +0800 Subject: [PATCH 1/2] feat(conversation): enhance conversation history retrieval and add scheduled task results filtering --- .../server/services/conversation_service.py | 87 ++++++++++++++++++- 1 file changed, 83 insertions(+), 4 deletions(-) diff --git a/python/valuecell/server/services/conversation_service.py b/python/valuecell/server/services/conversation_service.py index 1f122674c..df12e882d 100644 --- a/python/valuecell/server/services/conversation_service.py +++ b/python/valuecell/server/services/conversation_service.py @@ -11,6 +11,7 @@ ConversationService as CoreConversationService, ) from valuecell.core.event.factory import ResponseFactory +from valuecell.core.types import CommonResponseEvent, ComponentType from valuecell.server.api.schemas.conversation import ( ConversationDeleteData, ConversationHistoryData, @@ -58,9 +59,11 @@ async def get_conversation_list( conversation_id=conv.conversation_id, title=conv.title or f"Conversation {conv.conversation_id}", agent_name=conv.agent_name, - update_time=conv.updated_at.isoformat() - if conv.updated_at - else conv.created_at.isoformat(), + update_time=( + conv.updated_at.isoformat() + if conv.updated_at + else conv.created_at.isoformat() + ), ) conversation_items.append(conversation_item) @@ -78,7 +81,83 @@ async def get_conversation_history( # Retrieve persisted conversation items and rebuild responses conversation_items = ( await self.core_conversation_service.get_conversation_items( - conversation_id=conversation_id + conversation_id=conversation_id, + ) + ) + + base_responses = [] + for item in conversation_items: + resp = self.response_factory.from_conversation_item(item) + # Exclude scheduled task results from general history + if ( + resp.event == CommonResponseEvent.COMPONENT_GENERATOR.value + and resp.data.payload + and getattr(resp.data.payload, "component_type", None) + == ComponentType.SCHEDULED_TASK_RESULT.value + ): + continue # Skip scheduled task results in general history + base_responses.append(resp) + + # Convert BaseResponse objects to ConversationHistoryItem objects + history_items = [] + for response in base_responses: + data = response.data + + # Convert payload to dict for JSON serialization + payload_data = None + if data.payload: + try: + payload_data = ( + data.payload.model_dump() + if hasattr(data.payload, "model_dump") + else str(data.payload) + ) + except Exception: + payload_data = str(data.payload) + + # Normalize event and role names + event_str = self._normalize_event_name(str(response.event)) + role_str = self._normalize_role_name(str(data.role)) + + # Create unified format: event and data at top level + message_data_with_meta = MessageData( + conversation_id=data.conversation_id, + thread_id=data.thread_id, + task_id=data.task_id, + payload=payload_data, + role=role_str, + item_id=data.item_id, + ) + if data.agent_name: + message_data_with_meta.agent_name = data.agent_name + if data.metadata: + message_data_with_meta.metadata = data.metadata + + history_item = ConversationHistoryItem( + event=event_str, data=message_data_with_meta + ) + + history_items.append(history_item) + + return ConversationHistoryData( + conversation_id=conversation_id, items=history_items + ) + + async def get_conversation_scheduled_task_results( + self, conversation_id: str + ) -> ConversationHistoryData: + """Get conversation history for a specific conversation.""" + # Check if conversation exists + conversation = await self.conversation_manager.get_conversation(conversation_id) + if not conversation: + raise ValueError(f"Conversation {conversation_id} not found") + + # Retrieve persisted conversation items and rebuild responses + conversation_items = ( + await self.core_conversation_service.get_conversation_items( + conversation_id=conversation_id, + event=CommonResponseEvent.COMPONENT_GENERATOR.value, + component_type=ComponentType.SCHEDULED_TASK_RESULT.value, ) ) From badbb5e2a6ec52349542b79e1e5aa1bcd7366cba Mon Sep 17 00:00:00 2001 From: paisley Date: Tue, 28 Oct 2025 15:59:59 +0800 Subject: [PATCH 2/2] refactor get_conversation_scheduled_task_results --- .../server/services/conversation_service.py | 139 +++++++----------- 1 file changed, 54 insertions(+), 85 deletions(-) diff --git a/python/valuecell/server/services/conversation_service.py b/python/valuecell/server/services/conversation_service.py index df12e882d..769420ba5 100644 --- a/python/valuecell/server/services/conversation_service.py +++ b/python/valuecell/server/services/conversation_service.py @@ -69,14 +69,54 @@ async def get_conversation_list( return ConversationListData(conversations=conversation_items, total=total) + async def _validate_conversation_exists(self, conversation_id: str) -> None: + """Validate that a conversation exists, raise ValueError if not found.""" + conversation = await self.conversation_manager.get_conversation(conversation_id) + if not conversation: + raise ValueError(f"Conversation {conversation_id} not found") + + def _convert_response_to_history_item(self, response) -> ConversationHistoryItem: + """Convert a BaseResponse to ConversationHistoryItem.""" + data = response.data + + # Convert payload to dict for JSON serialization + payload_data = None + if data.payload: + try: + payload_data = ( + data.payload.model_dump() + if hasattr(data.payload, "model_dump") + else str(data.payload) + ) + except Exception: + payload_data = str(data.payload) + + # Normalize event and role names + event_str = self._normalize_event_name(str(response.event)) + role_str = self._normalize_role_name(str(data.role)) + + # Create unified format: event and data at top level + message_data_with_meta = MessageData( + conversation_id=data.conversation_id, + thread_id=data.thread_id, + task_id=data.task_id, + payload=payload_data, + role=role_str, + item_id=data.item_id, + ) + if data.agent_name: + message_data_with_meta.agent_name = data.agent_name + if data.metadata: + message_data_with_meta.metadata = data.metadata + + return ConversationHistoryItem(event=event_str, data=message_data_with_meta) + async def get_conversation_history( self, conversation_id: str ) -> ConversationHistoryData: """Get conversation history for a specific conversation.""" # Check if conversation exists - conversation = await self.conversation_manager.get_conversation(conversation_id) - if not conversation: - raise ValueError(f"Conversation {conversation_id} not found") + await self._validate_conversation_exists(conversation_id) # Retrieve persisted conversation items and rebuild responses conversation_items = ( @@ -96,48 +136,14 @@ async def get_conversation_history( == ComponentType.SCHEDULED_TASK_RESULT.value ): continue # Skip scheduled task results in general history + base_responses.append(resp) # Convert BaseResponse objects to ConversationHistoryItem objects - history_items = [] - for response in base_responses: - data = response.data - - # Convert payload to dict for JSON serialization - payload_data = None - if data.payload: - try: - payload_data = ( - data.payload.model_dump() - if hasattr(data.payload, "model_dump") - else str(data.payload) - ) - except Exception: - payload_data = str(data.payload) - - # Normalize event and role names - event_str = self._normalize_event_name(str(response.event)) - role_str = self._normalize_role_name(str(data.role)) - - # Create unified format: event and data at top level - message_data_with_meta = MessageData( - conversation_id=data.conversation_id, - thread_id=data.thread_id, - task_id=data.task_id, - payload=payload_data, - role=role_str, - item_id=data.item_id, - ) - if data.agent_name: - message_data_with_meta.agent_name = data.agent_name - if data.metadata: - message_data_with_meta.metadata = data.metadata - - history_item = ConversationHistoryItem( - event=event_str, data=message_data_with_meta - ) - - history_items.append(history_item) + history_items = [ + self._convert_response_to_history_item(response) + for response in base_responses + ] return ConversationHistoryData( conversation_id=conversation_id, items=history_items @@ -146,11 +152,9 @@ async def get_conversation_history( async def get_conversation_scheduled_task_results( self, conversation_id: str ) -> ConversationHistoryData: - """Get conversation history for a specific conversation.""" + """Get scheduled task results for a specific conversation.""" # Check if conversation exists - conversation = await self.conversation_manager.get_conversation(conversation_id) - if not conversation: - raise ValueError(f"Conversation {conversation_id} not found") + await self._validate_conversation_exists(conversation_id) # Retrieve persisted conversation items and rebuild responses conversation_items = ( @@ -167,45 +171,10 @@ async def get_conversation_scheduled_task_results( ] # Convert BaseResponse objects to ConversationHistoryItem objects - history_items = [] - for response in base_responses: - data = response.data - - # Convert payload to dict for JSON serialization - payload_data = None - if data.payload: - try: - payload_data = ( - data.payload.model_dump() - if hasattr(data.payload, "model_dump") - else str(data.payload) - ) - except Exception: - payload_data = str(data.payload) - - # Normalize event and role names - event_str = self._normalize_event_name(str(response.event)) - role_str = self._normalize_role_name(str(data.role)) - - # Create unified format: event and data at top level - message_data_with_meta = MessageData( - conversation_id=data.conversation_id, - thread_id=data.thread_id, - task_id=data.task_id, - payload=payload_data, - role=role_str, - item_id=data.item_id, - ) - if data.agent_name: - message_data_with_meta.agent_name = data.agent_name - if data.metadata: - message_data_with_meta.metadata = data.metadata - - history_item = ConversationHistoryItem( - event=event_str, data=message_data_with_meta - ) - - history_items.append(history_item) + history_items = [ + self._convert_response_to_history_item(response) + for response in base_responses + ] return ConversationHistoryData( conversation_id=conversation_id, items=history_items