Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions python/valuecell/server/api/routers/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from valuecell.server.services.conversation_service import get_conversation_service

from ..schemas.conversation import (
AllConversationsScheduledTaskResponse,
ConversationDeleteResponse,
ConversationHistoryResponse,
ConversationListResponse,
Expand Down Expand Up @@ -39,6 +40,30 @@ async def get_conversations(
data=data, msg="Conversations retrieved successfully"
)

@router.get(
"/scheduled-task-results",
response_model=AllConversationsScheduledTaskResponse,
summary="Get all conversations scheduled task results",
description="Get scheduled task results from all conversations, grouped by agent name",
)
async def get_all_conversations_scheduled_task_results(
user_id: Optional[str] = Query(None, description="Filter by user ID"),
) -> AllConversationsScheduledTaskResponse:
"""Get all conversations scheduled task results grouped by agent."""
try:
service = get_conversation_service()
data = await service.get_all_conversations_scheduled_task_results(
user_id=user_id
)
return AllConversationsScheduledTaskResponse.create(
data=data,
msg="All conversations scheduled task results retrieved successfully",
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Internal server error: {str(e)}"
)

@router.get(
"/{conversation_id}/history",
response_model=ConversationHistoryResponse,
Expand Down
26 changes: 26 additions & 0 deletions python/valuecell/server/api/schemas/conversation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Conversation API schemas."""

from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field
Expand Down Expand Up @@ -75,6 +76,26 @@ class ConversationDeleteData(BaseModel):
)


class AgentScheduledTaskResults(BaseModel):
"""Scheduled task results for a specific agent."""

agent_name: str = Field(..., description="Name of the agent")
results: List[ConversationHistoryItem] = Field(
..., description="List of scheduled task results for this agent"
)
update_time: Optional[datetime] = Field(
None, description="Timestamp of the latest message from this agent"
)


class AllConversationsScheduledTaskData(BaseModel):
"""Data structure for all conversations scheduled task results."""

agents: List[AgentScheduledTaskResults] = Field(
..., description="List of agents with their scheduled task results"
)


# Response type for conversation list
ConversationListResponse = SuccessResponse[ConversationListData]

Expand All @@ -83,3 +104,8 @@ class ConversationDeleteData(BaseModel):

# Response type for conversation deletion
ConversationDeleteResponse = SuccessResponse[ConversationDeleteData]

# Response type for all conversations scheduled task results
AllConversationsScheduledTaskResponse = SuccessResponse[
AllConversationsScheduledTaskData
]
77 changes: 77 additions & 0 deletions python/valuecell/server/services/conversation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from valuecell.core.event.factory import ResponseFactory
from valuecell.core.types import CommonResponseEvent, ComponentType
from valuecell.server.api.schemas.conversation import (
AgentScheduledTaskResults,
AllConversationsScheduledTaskData,
ConversationDeleteData,
ConversationHistoryData,
ConversationHistoryItem,
Expand Down Expand Up @@ -180,6 +182,81 @@ async def get_conversation_scheduled_task_results(
conversation_id=conversation_id, items=history_items
)

async def get_all_conversations_scheduled_task_results(
self, user_id: Optional[str] = None
) -> AllConversationsScheduledTaskData:
"""Get scheduled task results from all conversations, grouped by agent name."""
# Get all conversations
conversations = await self.conversation_manager.list_user_conversations(
user_id=user_id
)

# Dictionary to group results by agent name and track latest message times
agent_results = {}
agent_latest_times = {}

# Process each conversation
for conversation in conversations:
# Get conversation items
conversation_items = await self.conversation_manager.get_conversation_items(
conversation.conversation_id
)

# Filter for scheduled task results
# Note: ConversationItem.payload is a JSON string, not an object
scheduled_task_items = []
for item in conversation_items:
if (
hasattr(item, "event")
and item.event == CommonResponseEvent.COMPONENT_GENERATOR
):
# Parse the payload JSON string to check component_type
try:
import json

payload_data = json.loads(item.payload)
if (
payload_data.get("component_type")
== ComponentType.SCHEDULED_TASK_RESULT
):
scheduled_task_items.append(item)
except (json.JSONDecodeError, AttributeError):
# Skip items with invalid payload
continue

# Convert to history items and group by agent
for item in scheduled_task_items:
# Convert ConversationItem to BaseResponse first
response = self.response_factory.from_conversation_item(item)
history_item = self._convert_response_to_history_item(response)
agent_name = conversation.agent_name or "Unknown Agent"

if agent_name not in agent_results:
agent_results[agent_name] = []
agent_latest_times[agent_name] = None

agent_results[agent_name].append(history_item)

# Get the latest message time for this agent from the conversation
# We'll use the conversation's updated_at as a proxy for the latest message time
if (
agent_latest_times[agent_name] is None
or conversation.updated_at > agent_latest_times[agent_name]
):
agent_latest_times[agent_name] = conversation.updated_at

# Convert to response format with latest message times
agents = [
AgentScheduledTaskResults(
agent_name=agent_name,
results=results,
update_time=agent_latest_times[agent_name],
)
for agent_name, results in agent_results.items()
]

return AllConversationsScheduledTaskData(agents=agents)

async def delete_conversation(self, conversation_id: str) -> ConversationDeleteData:
"""Delete a conversation and all its associated data."""
# Check if conversation exists
Expand Down