Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions python/valuecell/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# Session management
# Conversation management
from .agent.decorator import create_wrapped_agent
from .agent.responses import notification, streaming
from .session import (
InMemorySessionStore,
Session,
SessionManager,
SessionStatus,
SessionStore,
from .conversation import (
InMemoryConversationStore,
Conversation,
ConversationManager,
ConversationStatus,
ConversationStore,
)
from .session.message_store import (
InMemoryMessageStore,
MessageStore,
SQLiteMessageStore,
from .conversation.item_store import (
InMemoryItemStore,
ItemStore,
SQLiteItemStore,
)

# Task management
Expand All @@ -27,15 +27,15 @@
)

__all__ = [
# Session exports
"Session",
"SessionStatus",
"SessionManager",
"SessionStore",
"InMemorySessionStore",
"MessageStore",
"InMemoryMessageStore",
"SQLiteMessageStore",
# Conversation exports
"Conversation",
"ConversationStatus",
"ConversationManager",
"ConversationStore",
"InMemoryConversationStore",
"ItemStore",
"InMemoryItemStore",
"SQLiteItemStore",
# Task exports
"Task",
"TaskStatus",
Expand Down
4 changes: 2 additions & 2 deletions python/valuecell/core/agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def _setup_client(self):
async def send_message(
self,
query: str,
session_id: str = None,
conversation_id: str = None,
metadata: dict = None,
streaming: bool = False,
) -> AsyncIterator[RemoteAgentResponse]:
Expand All @@ -66,7 +66,7 @@ async def send_message(
role=Role.user,
parts=[Part(root=TextPart(text=query))],
message_id=generate_uuid("msg"),
context_id=session_id or generate_uuid("ctx"),
context_id=conversation_id or generate_uuid("ctx"),
metadata=metadata if metadata else None,
)

Expand Down
10 changes: 5 additions & 5 deletions python/valuecell/core/agent/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,21 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non

# Helper state
task_id = task.id
session_id = task.context_id
updater = TaskUpdater(event_queue, task_id, session_id)
context_id = task.context_id
updater = TaskUpdater(event_queue, task_id, context_id)

# Stream from the user agent and update task incrementally
await updater.update_status(
TaskState.working,
message=new_agent_text_message(
f"Task received by {agent_name}", session_id, task_id
f"Task received by {agent_name}", context_id, task_id
),
)
try:
query_handler = (
self.agent.notify if task_meta.get("notify") else self.agent.stream
)
async for response in query_handler(query, session_id, task_id):
async for response in query_handler(query, context_id, task_id):
if not isinstance(response, (StreamResponse, NotifyResponse)):
raise ValueError(
f"Agent {agent_name} yielded invalid response type: {type(response)}"
Expand Down Expand Up @@ -176,7 +176,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
logger.error(message)
await updater.update_status(
TaskState.failed,
message=new_agent_text_message(message, session_id, task_id),
message=new_agent_text_message(message, context_id, task_id),
)
finally:
await updater.complete()
Expand Down
21 changes: 21 additions & 0 deletions python/valuecell/core/conversation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Conversation module initialization"""

from .manager import ConversationManager
from .item_store import InMemoryItemStore, ItemStore, SQLiteItemStore
from .models import Conversation, ConversationStatus
from .conversation_store import InMemoryConversationStore, ConversationStore

__all__ = [
# Models
"Conversation",
"ConversationStatus",
# Conversation management
"ConversationManager",
# Conversation storage
"ConversationStore",
"InMemoryConversationStore",
# Item storage
"ItemStore",
"InMemoryItemStore",
"SQLiteItemStore",
]
84 changes: 84 additions & 0 deletions python/valuecell/core/conversation/conversation_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from .models import Conversation


class ConversationStore(ABC):
"""Conversation storage abstract base class - handles conversation metadata only.

Items are stored separately using ItemStore implementations.
"""

@abstractmethod
async def save_conversation(self, conversation: Conversation) -> None:
"""Save conversation"""

@abstractmethod
async def load_conversation(self, conversation_id: str) -> Optional[Conversation]:
"""Load conversation"""

@abstractmethod
async def delete_conversation(self, conversation_id: str) -> bool:
"""Delete conversation"""

@abstractmethod
async def list_conversations(
self, user_id: str, limit: int = 100, offset: int = 0
) -> List[Conversation]:
"""List user conversations"""

@abstractmethod
async def conversation_exists(self, conversation_id: str) -> bool:
"""Check if conversation exists"""


class InMemoryConversationStore(ConversationStore):
"""In-memory conversation storage implementation"""

def __init__(self):
self._conversations: Dict[str, Conversation] = {}

async def save_conversation(self, conversation: Conversation) -> None:
"""Save conversation to memory"""
self._conversations[conversation.conversation_id] = conversation

async def load_conversation(self, conversation_id: str) -> Optional[Conversation]:
"""Load conversation from memory"""
return self._conversations.get(conversation_id)

async def delete_conversation(self, conversation_id: str) -> bool:
"""Delete conversation from memory"""
if conversation_id in self._conversations:
del self._conversations[conversation_id]
return True
return False

async def list_conversations(
self, user_id: str, limit: int = 100, offset: int = 0
) -> List[Conversation]:
"""List user conversations"""
user_conversations = [
conversation
for conversation in self._conversations.values()
if conversation.user_id == user_id
]
# Sort by creation time descending
user_conversations.sort(key=lambda c: c.created_at, reverse=True)

# Apply pagination
start = offset
end = offset + limit
return user_conversations[start:end]

async def conversation_exists(self, conversation_id: str) -> bool:
"""Check if conversation exists"""
return conversation_id in self._conversations

def clear_all(self) -> None:
"""Clear all conversations (for testing)"""
self._conversations.clear()

def get_conversation_count(self) -> int:
"""Get total conversation count (for debugging)"""
return len(self._conversations)
Loading