Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 94 additions & 46 deletions python/valuecell/server/services/conversation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ConversationService as CoreConversationService,
)
from valuecell.core.event.factory import ResponseFactory
from valuecell.core.types import CommonResponseEvent, ComponentType
from valuecell.server.api.schemas.conversation import (
ConversationDeleteData,
ConversationHistoryData,
Expand Down Expand Up @@ -58,27 +59,109 @@ async def get_conversation_list(
conversation_id=conv.conversation_id,
title=conv.title or f"Conversation {conv.conversation_id}",
agent_name=conv.agent_name,
update_time=conv.updated_at.isoformat()
if conv.updated_at
else conv.created_at.isoformat(),
update_time=(
conv.updated_at.isoformat()
if conv.updated_at
else conv.created_at.isoformat()
),
)
conversation_items.append(conversation_item)

return ConversationListData(conversations=conversation_items, total=total)

async def _validate_conversation_exists(self, conversation_id: str) -> None:
"""Validate that a conversation exists, raise ValueError if not found."""
conversation = await self.conversation_manager.get_conversation(conversation_id)
if not conversation:
raise ValueError(f"Conversation {conversation_id} not found")

def _convert_response_to_history_item(self, response) -> ConversationHistoryItem:
"""Convert a BaseResponse to ConversationHistoryItem."""
data = response.data

# Convert payload to dict for JSON serialization
payload_data = None
if data.payload:
try:
payload_data = (
data.payload.model_dump()
if hasattr(data.payload, "model_dump")
else str(data.payload)
)
except Exception:
payload_data = str(data.payload)

# Normalize event and role names
event_str = self._normalize_event_name(str(response.event))
role_str = self._normalize_role_name(str(data.role))

# Create unified format: event and data at top level
message_data_with_meta = MessageData(
conversation_id=data.conversation_id,
thread_id=data.thread_id,
task_id=data.task_id,
payload=payload_data,
role=role_str,
item_id=data.item_id,
)
if data.agent_name:
message_data_with_meta.agent_name = data.agent_name
if data.metadata:
message_data_with_meta.metadata = data.metadata

return ConversationHistoryItem(event=event_str, data=message_data_with_meta)

async def get_conversation_history(
self, conversation_id: str
) -> ConversationHistoryData:
"""Get conversation history for a specific conversation."""
# Check if conversation exists
conversation = await self.conversation_manager.get_conversation(conversation_id)
if not conversation:
raise ValueError(f"Conversation {conversation_id} not found")
await self._validate_conversation_exists(conversation_id)

# Retrieve persisted conversation items and rebuild responses
conversation_items = (
await self.core_conversation_service.get_conversation_items(
conversation_id=conversation_id,
)
)

base_responses = []
for item in conversation_items:
resp = self.response_factory.from_conversation_item(item)
# Exclude scheduled task results from general history
if (
resp.event == CommonResponseEvent.COMPONENT_GENERATOR.value
and resp.data.payload
and getattr(resp.data.payload, "component_type", None)
== ComponentType.SCHEDULED_TASK_RESULT.value
):
continue # Skip scheduled task results in general history

base_responses.append(resp)

# Convert BaseResponse objects to ConversationHistoryItem objects
history_items = [
self._convert_response_to_history_item(response)
for response in base_responses
]

return ConversationHistoryData(
conversation_id=conversation_id, items=history_items
)

async def get_conversation_scheduled_task_results(
self, conversation_id: str
) -> ConversationHistoryData:
"""Get scheduled task results for a specific conversation."""
# Check if conversation exists
await self._validate_conversation_exists(conversation_id)

# Retrieve persisted conversation items and rebuild responses
conversation_items = (
await self.core_conversation_service.get_conversation_items(
conversation_id=conversation_id
conversation_id=conversation_id,
event=CommonResponseEvent.COMPONENT_GENERATOR.value,
component_type=ComponentType.SCHEDULED_TASK_RESULT.value,
)
)

Expand All @@ -88,45 +171,10 @@ async def get_conversation_history(
]

# Convert BaseResponse objects to ConversationHistoryItem objects
history_items = []
for response in base_responses:
data = response.data

# Convert payload to dict for JSON serialization
payload_data = None
if data.payload:
try:
payload_data = (
data.payload.model_dump()
if hasattr(data.payload, "model_dump")
else str(data.payload)
)
except Exception:
payload_data = str(data.payload)

# Normalize event and role names
event_str = self._normalize_event_name(str(response.event))
role_str = self._normalize_role_name(str(data.role))

# Create unified format: event and data at top level
message_data_with_meta = MessageData(
conversation_id=data.conversation_id,
thread_id=data.thread_id,
task_id=data.task_id,
payload=payload_data,
role=role_str,
item_id=data.item_id,
)
if data.agent_name:
message_data_with_meta.agent_name = data.agent_name
if data.metadata:
message_data_with_meta.metadata = data.metadata

history_item = ConversationHistoryItem(
event=event_str, data=message_data_with_meta
)

history_items.append(history_item)
history_items = [
self._convert_response_to_history_item(response)
for response in base_responses
]

return ConversationHistoryData(
conversation_id=conversation_id, items=history_items
Expand Down