diff --git a/python/valuecell/core/__init__.py b/python/valuecell/core/__init__.py index ef6ad5ec8..8af5d85bf 100644 --- a/python/valuecell/core/__init__.py +++ b/python/valuecell/core/__init__.py @@ -1,17 +1,17 @@ -# Session management +# Conversation management from .agent.decorator import create_wrapped_agent from .agent.responses import notification, streaming -from .session import ( - InMemorySessionStore, - Session, - SessionManager, - SessionStatus, - SessionStore, +from .conversation import ( + InMemoryConversationStore, + Conversation, + ConversationManager, + ConversationStatus, + ConversationStore, ) -from .session.message_store import ( - InMemoryMessageStore, - MessageStore, - SQLiteMessageStore, +from .conversation.item_store import ( + InMemoryItemStore, + ItemStore, + SQLiteItemStore, ) # Task management @@ -27,15 +27,15 @@ ) __all__ = [ - # Session exports - "Session", - "SessionStatus", - "SessionManager", - "SessionStore", - "InMemorySessionStore", - "MessageStore", - "InMemoryMessageStore", - "SQLiteMessageStore", + # Conversation exports + "Conversation", + "ConversationStatus", + "ConversationManager", + "ConversationStore", + "InMemoryConversationStore", + "ItemStore", + "InMemoryItemStore", + "SQLiteItemStore", # Task exports "Task", "TaskStatus", diff --git a/python/valuecell/core/agent/client.py b/python/valuecell/core/agent/client.py index 73def6d00..b04aeb61c 100644 --- a/python/valuecell/core/agent/client.py +++ b/python/valuecell/core/agent/client.py @@ -51,7 +51,7 @@ async def _setup_client(self): async def send_message( self, query: str, - session_id: str = None, + conversation_id: str = None, metadata: dict = None, streaming: bool = False, ) -> AsyncIterator[RemoteAgentResponse]: @@ -66,7 +66,7 @@ async def send_message( role=Role.user, parts=[Part(root=TextPart(text=query))], message_id=generate_uuid("msg"), - context_id=session_id or generate_uuid("ctx"), + context_id=conversation_id or generate_uuid("ctx"), metadata=metadata if metadata else None, ) diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py index 99e064bad..672a13c28 100644 --- a/python/valuecell/core/agent/decorator.py +++ b/python/valuecell/core/agent/decorator.py @@ -116,21 +116,21 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non # Helper state task_id = task.id - session_id = task.context_id - updater = TaskUpdater(event_queue, task_id, session_id) + context_id = task.context_id + updater = TaskUpdater(event_queue, task_id, context_id) # Stream from the user agent and update task incrementally await updater.update_status( TaskState.working, message=new_agent_text_message( - f"Task received by {agent_name}", session_id, task_id + f"Task received by {agent_name}", context_id, task_id ), ) try: query_handler = ( self.agent.notify if task_meta.get("notify") else self.agent.stream ) - async for response in query_handler(query, session_id, task_id): + async for response in query_handler(query, context_id, task_id): if not isinstance(response, (StreamResponse, NotifyResponse)): raise ValueError( f"Agent {agent_name} yielded invalid response type: {type(response)}" @@ -176,7 +176,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non logger.error(message) await updater.update_status( TaskState.failed, - message=new_agent_text_message(message, session_id, task_id), + message=new_agent_text_message(message, context_id, task_id), ) finally: await updater.complete() diff --git a/python/valuecell/core/conversation/__init__.py b/python/valuecell/core/conversation/__init__.py new file mode 100644 index 000000000..43c57ddb8 --- /dev/null +++ b/python/valuecell/core/conversation/__init__.py @@ -0,0 +1,21 @@ +"""Conversation module initialization""" + +from .manager import ConversationManager +from .item_store import InMemoryItemStore, ItemStore, SQLiteItemStore +from .models import Conversation, ConversationStatus +from .conversation_store import InMemoryConversationStore, ConversationStore + +__all__ = [ + # Models + "Conversation", + "ConversationStatus", + # Conversation management + "ConversationManager", + # Conversation storage + "ConversationStore", + "InMemoryConversationStore", + # Item storage + "ItemStore", + "InMemoryItemStore", + "SQLiteItemStore", +] diff --git a/python/valuecell/core/conversation/conversation_store.py b/python/valuecell/core/conversation/conversation_store.py new file mode 100644 index 000000000..658e230ff --- /dev/null +++ b/python/valuecell/core/conversation/conversation_store.py @@ -0,0 +1,84 @@ +from abc import ABC, abstractmethod +from typing import Dict, List, Optional + +from .models import Conversation + + +class ConversationStore(ABC): + """Conversation storage abstract base class - handles conversation metadata only. + + Items are stored separately using ItemStore implementations. + """ + + @abstractmethod + async def save_conversation(self, conversation: Conversation) -> None: + """Save conversation""" + + @abstractmethod + async def load_conversation(self, conversation_id: str) -> Optional[Conversation]: + """Load conversation""" + + @abstractmethod + async def delete_conversation(self, conversation_id: str) -> bool: + """Delete conversation""" + + @abstractmethod + async def list_conversations( + self, user_id: str, limit: int = 100, offset: int = 0 + ) -> List[Conversation]: + """List user conversations""" + + @abstractmethod + async def conversation_exists(self, conversation_id: str) -> bool: + """Check if conversation exists""" + + +class InMemoryConversationStore(ConversationStore): + """In-memory conversation storage implementation""" + + def __init__(self): + self._conversations: Dict[str, Conversation] = {} + + async def save_conversation(self, conversation: Conversation) -> None: + """Save conversation to memory""" + self._conversations[conversation.conversation_id] = conversation + + async def load_conversation(self, conversation_id: str) -> Optional[Conversation]: + """Load conversation from memory""" + return self._conversations.get(conversation_id) + + async def delete_conversation(self, conversation_id: str) -> bool: + """Delete conversation from memory""" + if conversation_id in self._conversations: + del self._conversations[conversation_id] + return True + return False + + async def list_conversations( + self, user_id: str, limit: int = 100, offset: int = 0 + ) -> List[Conversation]: + """List user conversations""" + user_conversations = [ + conversation + for conversation in self._conversations.values() + if conversation.user_id == user_id + ] + # Sort by creation time descending + user_conversations.sort(key=lambda c: c.created_at, reverse=True) + + # Apply pagination + start = offset + end = offset + limit + return user_conversations[start:end] + + async def conversation_exists(self, conversation_id: str) -> bool: + """Check if conversation exists""" + return conversation_id in self._conversations + + def clear_all(self) -> None: + """Clear all conversations (for testing)""" + self._conversations.clear() + + def get_conversation_count(self) -> int: + """Get total conversation count (for debugging)""" + return len(self._conversations) diff --git a/python/valuecell/core/session/message_store.py b/python/valuecell/core/conversation/item_store.py similarity index 58% rename from python/valuecell/core/session/message_store.py rename to python/valuecell/core/conversation/item_store.py index 2a9bc67a5..6510b5d8d 100644 --- a/python/valuecell/core/session/message_store.py +++ b/python/valuecell/core/conversation/item_store.py @@ -9,51 +9,51 @@ from valuecell.core.types import ConversationItem, Role -class MessageStore(ABC): +class ItemStore(ABC): @abstractmethod - async def save_message(self, message: ConversationItem) -> None: ... + async def save_item(self, item: ConversationItem) -> None: ... @abstractmethod - async def get_messages( + async def get_items( self, - session_id: str, + conversation_id: str, limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, ) -> List[ConversationItem]: ... @abstractmethod - async def get_latest_message( - self, session_id: str + async def get_latest_item( + self, conversation_id: str ) -> Optional[ConversationItem]: ... @abstractmethod - async def get_message(self, message_id: str) -> Optional[ConversationItem]: ... + async def get_item(self, item_id: str) -> Optional[ConversationItem]: ... @abstractmethod - async def get_message_count(self, session_id: str) -> int: ... + async def get_item_count(self, conversation_id: str) -> int: ... @abstractmethod - async def delete_session_messages(self, session_id: str) -> None: ... + async def delete_conversation_items(self, conversation_id: str) -> None: ... -class InMemoryMessageStore(MessageStore): +class InMemoryItemStore(ItemStore): def __init__(self): - # session_id -> list[ConversationItem] - self._messages: Dict[str, List[ConversationItem]] = {} + # conversation_id -> list[ConversationItem] + self._items: Dict[str, List[ConversationItem]] = {} - async def save_message(self, message: ConversationItem) -> None: - arr = self._messages.setdefault(message.conversation_id, []) - arr.append(message) + async def save_item(self, item: ConversationItem) -> None: + arr = self._items.setdefault(item.conversation_id, []) + arr.append(item) - async def get_messages( + async def get_items( self, - session_id: str, + conversation_id: str, limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, ) -> List[ConversationItem]: - items = list(self._messages.get(session_id, [])) + items = list(self._items.get(conversation_id, [])) if role is not None: items = [m for m in items if m.role == role] if offset: @@ -62,26 +62,26 @@ async def get_messages( items = items[:limit] return items - async def get_latest_message(self, session_id: str) -> Optional[ConversationItem]: - items = self._messages.get(session_id, []) + async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]: + items = self._items.get(conversation_id, []) return items[-1] if items else None - async def get_message(self, message_id: str) -> Optional[ConversationItem]: - for arr in self._messages.values(): + async def get_item(self, item_id: str) -> Optional[ConversationItem]: + for arr in self._items.values(): for m in arr: - if m.item_id == message_id: + if m.item_id == item_id: return m return None - async def get_message_count(self, session_id: str) -> int: - return len(self._messages.get(session_id, [])) + async def get_item_count(self, conversation_id: str) -> int: + return len(self._items.get(conversation_id, [])) - async def delete_session_messages(self, session_id: str) -> None: - self._messages.pop(session_id, None) + async def delete_conversation_items(self, conversation_id: str) -> None: + self._items.pop(conversation_id, None) -class SQLiteMessageStore(MessageStore): - """SQLite-backed message store using aiosqlite for true async I/O.""" +class SQLiteItemStore(ItemStore): + """SQLite-backed item store using aiosqlite for true async I/O.""" def __init__(self, db_path: str): self.db_path = db_path @@ -99,7 +99,7 @@ async def _ensure_initialized(self) -> None: async with aiosqlite.connect(self.db_path) as db: await db.execute( """ - CREATE TABLE IF NOT EXISTS messages ( + CREATE TABLE IF NOT EXISTS conversation_items ( item_id TEXT PRIMARY KEY, role TEXT NOT NULL, event TEXT NOT NULL, @@ -113,15 +113,15 @@ async def _ensure_initialized(self) -> None: ) await db.execute( """ - CREATE INDEX IF NOT EXISTS idx_messages_conv_time - ON messages (conversation_id, created_at); + CREATE INDEX IF NOT EXISTS idx_item_conv_time + ON conversation_items (conversation_id, created_at); """ ) await db.commit() self._initialized = True @staticmethod - def _row_to_message(row: sqlite3.Row) -> ConversationItem: + def _row_to_item(row: sqlite3.Row) -> ConversationItem: return ConversationItem( item_id=row["item_id"], role=row["role"], @@ -132,43 +132,43 @@ def _row_to_message(row: sqlite3.Row) -> ConversationItem: payload=row["payload"], ) - async def save_message(self, message: ConversationItem) -> None: + async def save_item(self, item: ConversationItem) -> None: await self._ensure_initialized() - role_val = getattr(message.role, "value", str(message.role)) - event_val = getattr(message.event, "value", str(message.event)) + role_val = getattr(item.role, "value", str(item.role)) + event_val = getattr(item.event, "value", str(item.event)) async with aiosqlite.connect(self.db_path) as db: await db.execute( """ - INSERT OR REPLACE INTO messages ( + INSERT OR REPLACE INTO conversation_items ( item_id, role, event, conversation_id, thread_id, task_id, payload ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( - message.item_id, + item.item_id, role_val, event_val, - message.conversation_id, - message.thread_id, - message.task_id, - message.payload, + item.conversation_id, + item.thread_id, + item.task_id, + item.payload, ), ) await db.commit() - async def get_messages( + async def get_items( self, - session_id: str, + conversation_id: str, limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, ) -> List[ConversationItem]: await self._ensure_initialized() - params = [session_id] + params = [conversation_id] where = "WHERE conversation_id = ?" if role is not None: where += " AND role = ?" params.append(getattr(role, "value", str(role))) - sql = f"SELECT * FROM messages {where} ORDER BY datetime(created_at) ASC" + sql = f"SELECT * FROM conversation_items {where} ORDER BY datetime(created_at) ASC" if limit is not None: sql += " LIMIT ?" params.append(int(limit)) @@ -181,45 +181,45 @@ async def get_messages( db.row_factory = sqlite3.Row cur = await db.execute(sql, params) rows = await cur.fetchall() - return [self._row_to_message(r) for r in rows] + return [self._row_to_item(r) for r in rows] - async def get_latest_message(self, session_id: str) -> Optional[ConversationItem]: + async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]: await self._ensure_initialized() async with aiosqlite.connect(self.db_path) as db: db.row_factory = sqlite3.Row cur = await db.execute( - "SELECT * FROM messages WHERE conversation_id = ? ORDER BY datetime(created_at) DESC LIMIT 1", - (session_id,), + "SELECT * FROM conversation_items WHERE conversation_id = ? ORDER BY datetime(created_at) DESC LIMIT 1", + (conversation_id,), ) row = await cur.fetchone() - return self._row_to_message(row) if row else None + return self._row_to_item(row) if row else None - async def get_message(self, message_id: str) -> Optional[ConversationItem]: + async def get_item(self, item_id: str) -> Optional[ConversationItem]: await self._ensure_initialized() async with aiosqlite.connect(self.db_path) as db: db.row_factory = sqlite3.Row cur = await db.execute( - "SELECT * FROM messages WHERE item_id = ?", - (message_id,), + "SELECT * FROM conversation_items WHERE item_id = ?", + (item_id,), ) row = await cur.fetchone() - return self._row_to_message(row) if row else None + return self._row_to_item(row) if row else None - async def get_message_count(self, session_id: str) -> int: + async def get_item_count(self, conversation_id: str) -> int: await self._ensure_initialized() async with aiosqlite.connect(self.db_path) as db: cur = await db.execute( - "SELECT COUNT(1) FROM messages WHERE conversation_id = ?", - (session_id,), + "SELECT COUNT(1) FROM conversation_items WHERE conversation_id = ?", + (conversation_id,), ) row = await cur.fetchone() return int(row[0] if row else 0) - async def delete_session_messages(self, session_id: str) -> None: + async def delete_conversation_items(self, conversation_id: str) -> None: await self._ensure_initialized() async with aiosqlite.connect(self.db_path) as db: await db.execute( - "DELETE FROM messages WHERE conversation_id = ?", - (session_id,), + "DELETE FROM conversation_items WHERE conversation_id = ?", + (conversation_id,), ) await db.commit() diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py new file mode 100644 index 000000000..101961a71 --- /dev/null +++ b/python/valuecell/core/conversation/manager.py @@ -0,0 +1,215 @@ +from datetime import datetime +from typing import List, Optional + +from valuecell.core.types import ( + ConversationItem, + ConversationItemEvent, + ResponsePayload, + Role, +) +from valuecell.utils import generate_uuid + +from .item_store import InMemoryItemStore, ItemStore +from .models import Conversation, ConversationStatus +from .conversation_store import InMemoryConversationStore, ConversationStore + + +class ConversationManager: + """Conversation manager - handles both conversation metadata and items through separate stores""" + + def __init__( + self, + conversation_store: Optional[ConversationStore] = None, + item_store: Optional[ItemStore] = None, + ): + self.conversation_store = conversation_store or InMemoryConversationStore() + self.item_store = item_store or InMemoryItemStore() + + async def create_conversation( + self, + user_id: str, + title: Optional[str] = None, + conversation_id: Optional[str] = None, + ) -> Conversation: + """Create new conversation""" + conversation = Conversation( + conversation_id=conversation_id or generate_uuid("conversation"), + user_id=user_id, + title=title, + ) + await self.conversation_store.save_conversation(conversation) + return conversation + + async def get_conversation(self, conversation_id: str) -> Optional[Conversation]: + """Get conversation metadata""" + return await self.conversation_store.load_conversation(conversation_id) + + async def update_conversation(self, conversation: Conversation) -> None: + """Update conversation metadata""" + conversation.updated_at = datetime.now() + await self.conversation_store.save_conversation(conversation) + + async def delete_conversation(self, conversation_id: str) -> bool: + """Delete conversation and all its items""" + # First delete all items for this conversation + await self.item_store.delete_conversation_items(conversation_id) + + # Then delete the conversation metadata + return await self.conversation_store.delete_conversation(conversation_id) + + async def list_user_conversations( + self, user_id: str, limit: int = 100, offset: int = 0 + ) -> List[Conversation]: + """List user conversations""" + return await self.conversation_store.list_conversations(user_id, limit, offset) + + async def conversation_exists(self, conversation_id: str) -> bool: + """Check if conversation exists""" + return await self.conversation_store.conversation_exists(conversation_id) + + async def add_item( + self, + role: Role, + event: ConversationItemEvent, + conversation_id: str, + thread_id: Optional[str] = None, + task_id: Optional[str] = None, + payload: ResponsePayload = None, + item_id: Optional[str] = None, + ) -> Optional[ConversationItem]: + """Add item to conversation + + Args: + conversation_id: Conversation ID to add item to + role: Item role (USER, AGENT, SYSTEM) + event: Item event + thread_id: Thread ID (optional) + task_id: Associated task ID (optional) + payload: Item payload + item_id: Item ID (optional) + """ + # Verify conversation exists + conversation = await self.get_conversation(conversation_id) + if not conversation: + return None + + # Create item + # Serialize payload to JSON string if it's a pydantic model + payload_str = None + if payload is not None: + try: + # pydantic BaseModel supports model_dump_json + payload_str = payload.model_dump_json(exclude_none=True) + except Exception: + try: + payload_str = str(payload) + except Exception: + payload_str = None + + item = ConversationItem( + item_id=item_id or generate_uuid("item"), + role=role, + event=event, + conversation_id=conversation_id, + thread_id=thread_id, + task_id=task_id, + payload=payload_str, + ) + + # Save item directly to item store + await self.item_store.save_item(item) + + # Update conversation timestamp + conversation.touch() + await self.conversation_store.save_conversation(conversation) + + return item + + async def get_conversation_items( + self, + conversation_id: str, + ) -> List[ConversationItem]: + """Get items for a conversation with optional filtering and pagination + + Args: + conversation_id: Conversation ID + limit: Maximum number of items to return + offset: Number of items to skip + role: Filter by specific role (optional) + """ + return await self.item_store.get_items(conversation_id) + + async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]: + """Get latest item in a conversation""" + return await self.item_store.get_latest_item(conversation_id) + + async def get_item(self, item_id: str) -> Optional[ConversationItem]: + """Get a specific item by ID""" + return await self.item_store.get_item(item_id) + + async def get_item_count(self, conversation_id: str) -> int: + """Get total item count for a conversation""" + return await self.item_store.get_item_count(conversation_id) + + async def get_items_by_role( + self, conversation_id: str, role: Role + ) -> List[ConversationItem]: + """Get items filtered by role""" + return await self.item_store.get_items(conversation_id, role=role) + + async def deactivate_conversation(self, conversation_id: str) -> bool: + """Deactivate conversation""" + conversation = await self.get_conversation(conversation_id) + if not conversation: + return False + + conversation.deactivate() + await self.conversation_store.save_conversation(conversation) + return True + + async def activate_conversation(self, conversation_id: str) -> bool: + """Activate conversation""" + conversation = await self.get_conversation(conversation_id) + if not conversation: + return False + + conversation.activate() + await self.conversation_store.save_conversation(conversation) + return True + + async def set_conversation_status( + self, conversation_id: str, status: ConversationStatus + ) -> bool: + """Set conversation status""" + conversation = await self.get_conversation(conversation_id) + if not conversation: + return False + + conversation.set_status(status) + await self.conversation_store.save_conversation(conversation) + return True + + async def require_user_input(self, conversation_id: str) -> bool: + """Mark conversation as requiring user input""" + return await self.set_conversation_status( + conversation_id, ConversationStatus.REQUIRE_USER_INPUT + ) + + async def get_conversations_by_status( + self, + user_id: str, + status: ConversationStatus, + limit: int = 100, + offset: int = 0, + ) -> List[Conversation]: + """Get user conversations filtered by status""" + # Get all user conversations and filter by status + # Note: This could be optimized by adding status filtering to the store interface + all_conversations = await self.conversation_store.list_conversations( + user_id, limit * 2, offset + ) + return [ + conversation + for conversation in all_conversations + if conversation.status == status + ][:limit] diff --git a/python/valuecell/core/conversation/models.py b/python/valuecell/core/conversation/models.py new file mode 100644 index 000000000..98f4721a7 --- /dev/null +++ b/python/valuecell/core/conversation/models.py @@ -0,0 +1,59 @@ +from datetime import datetime +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class ConversationStatus(str, Enum): + """Conversation status enumeration""" + + ACTIVE = "active" + INACTIVE = "inactive" + REQUIRE_USER_INPUT = "require_user_input" + + +class Conversation(BaseModel): + """Conversation data model - lightweight metadata only, items stored separately""" + + conversation_id: str = Field(..., description="Unique conversation identifier") + user_id: str = Field(..., description="User ID") + title: Optional[str] = Field(None, description="Conversation title") + created_at: datetime = Field( + default_factory=datetime.now, description="Creation time" + ) + updated_at: datetime = Field( + default_factory=datetime.now, description="Last update time" + ) + status: ConversationStatus = Field( + default=ConversationStatus.ACTIVE, description="Conversation status" + ) + + class Config: + json_encoders = {datetime: lambda v: v.isoformat()} + + @property + def is_active(self) -> bool: + """Backward compatibility property - returns True if conversation is active""" + return self.status == ConversationStatus.ACTIVE + + def set_status(self, status: ConversationStatus) -> None: + """Update conversation status and timestamp""" + self.status = status + self.updated_at = datetime.now() + + def activate(self) -> None: + """Set conversation to active status""" + self.set_status(ConversationStatus.ACTIVE) + + def deactivate(self) -> None: + """Set conversation to inactive status""" + self.set_status(ConversationStatus.INACTIVE) + + def require_user_input(self) -> None: + """Set conversation to require user input status""" + self.set_status(ConversationStatus.REQUIRE_USER_INPUT) + + def touch(self) -> None: + """Update the conversation's last activity timestamp""" + self.updated_at = datetime.now() diff --git a/python/valuecell/core/session/tests/test_sqlite_message_store.py b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py similarity index 60% rename from python/valuecell/core/session/tests/test_sqlite_message_store.py rename to python/valuecell/core/conversation/tests/test_sqlite_item_store.py index 5d6c23b04..3bb4d5284 100644 --- a/python/valuecell/core/session/tests/test_sqlite_message_store.py +++ b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py @@ -2,19 +2,19 @@ import tempfile import pytest -from valuecell.core.session.message_store import SQLiteMessageStore +from valuecell.core.conversation.item_store import SQLiteItemStore from valuecell.core.types import ConversationItem, Role, SystemResponseEvent @pytest.mark.asyncio -async def test_sqlite_message_store_basic_crud(): +async def test_sqlite_item_store_basic_crud(): fd, path = tempfile.mkstemp(suffix=".db") os.close(fd) try: - store = SQLiteMessageStore(path) + store = SQLiteItemStore(path) - # create and save two messages - m1 = ConversationItem( + # create and save two items + i1 = ConversationItem( item_id="i1", role=Role.SYSTEM, event=SystemResponseEvent.THREAD_STARTED, @@ -23,7 +23,7 @@ async def test_sqlite_message_store_basic_crud(): task_id=None, payload='{"a":1}', ) - m2 = ConversationItem( + i2 = ConversationItem( item_id="i2", role=Role.SYSTEM, event=SystemResponseEvent.DONE, @@ -32,32 +32,32 @@ async def test_sqlite_message_store_basic_crud(): task_id=None, payload='{"a":1}', ) - await store.save_message(m1) - await store.save_message(m2) + await store.save_item(i1) + await store.save_item(i2) # count - cnt = await store.get_message_count("s1") + cnt = await store.get_item_count("s1") assert cnt == 2 # get latest - latest = await store.get_latest_message("s1") + latest = await store.get_latest_item("s1") assert latest is not None assert latest.item_id in {"i1", "i2"} # list - msgs = await store.get_messages("s1") - assert len(msgs) == 2 - ids = {m.item_id for m in msgs} + items = await store.get_items("s1") + assert len(items) == 2 + ids = {i.item_id for i in items} assert ids == {"i1", "i2"} # get one - one = await store.get_message("i1") + one = await store.get_item("i1") assert one is not None assert one.item_id == "i1" # delete - await store.delete_session_messages("s1") - cnt2 = await store.get_message_count("s1") + await store.delete_conversation_items("s1") + cnt2 = await store.get_item_count("s1") assert cnt2 == 0 finally: if os.path.exists(path): diff --git a/python/valuecell/core/coordinate/models.py b/python/valuecell/core/coordinate/models.py index ff3732ef5..efae9d7c2 100644 --- a/python/valuecell/core/coordinate/models.py +++ b/python/valuecell/core/coordinate/models.py @@ -14,8 +14,8 @@ class ExecutionPlan(BaseModel): """ plan_id: str = Field(..., description="Unique plan identifier") - session_id: Optional[str] = Field( - ..., description="Session ID this plan belongs to" + conversation_id: Optional[str] = Field( + ..., description="Conversation ID this plan belongs to" ) user_id: str = Field(..., description="User ID who requested this plan") orig_query: str = Field( diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index ba89f836c..095390a99 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -11,7 +11,11 @@ SideEffectKind, handle_status_update, ) -from valuecell.core.session import SessionManager, SessionStatus, SQLiteMessageStore +from valuecell.core.conversation import ( + ConversationManager, + ConversationStatus, + SQLiteItemStore, +) from valuecell.core.task import Task, TaskManager from valuecell.core.task.models import TaskPattern from valuecell.core.types import BaseResponse, UserInput @@ -31,9 +35,9 @@ class ExecutionContext: """Manages the state of an interrupted execution for resumption""" - def __init__(self, stage: str, session_id: str, thread_id: str, user_id: str): + def __init__(self, stage: str, conversation_id: str, thread_id: str, user_id: str): self.stage = stage - self.session_id = session_id + self.conversation_id = conversation_id self.thread_id = thread_id self.user_id = user_id self.created_at = asyncio.get_event_loop().time() @@ -65,32 +69,32 @@ class UserInputManager: def __init__(self): self._pending_requests: Dict[str, UserInputRequest] = {} - def add_request(self, session_id: str, request: UserInputRequest): + def add_request(self, conversation_id: str, request: UserInputRequest): """Add a pending user input request""" - self._pending_requests[session_id] = request + self._pending_requests[conversation_id] = request - def has_pending_request(self, session_id: str) -> bool: - """Check if there's a pending request for the session""" - return session_id in self._pending_requests + def has_pending_request(self, conversation_id: str) -> bool: + """Check if there's a pending request for the conversation""" + return conversation_id in self._pending_requests - def get_request_prompt(self, session_id: str) -> Optional[str]: + def get_request_prompt(self, conversation_id: str) -> Optional[str]: """Get the prompt for a pending request""" - request = self._pending_requests.get(session_id) + request = self._pending_requests.get(conversation_id) return request.prompt if request else None - def provide_response(self, session_id: str, response: str) -> bool: + def provide_response(self, conversation_id: str, response: str) -> bool: """Provide a response to a pending request""" - if session_id not in self._pending_requests: + if conversation_id not in self._pending_requests: return False - request = self._pending_requests[session_id] + request = self._pending_requests[conversation_id] request.provide_response(response) - del self._pending_requests[session_id] + del self._pending_requests[conversation_id] return True - def clear_request(self, session_id: str): + def clear_request(self, conversation_id: str): """Clear a pending request""" - self._pending_requests.pop(session_id, None) + self._pending_requests.pop(conversation_id, None) class AgentOrchestrator: @@ -100,13 +104,13 @@ class AgentOrchestrator: This class manages the entire lifecycle of user requests including: - Planning phase with user input collection - Task execution with interruption support - - Session state management + - Conversation state management - Error handling and recovery """ def __init__(self): - self.session_manager = SessionManager( - message_store=SQLiteMessageStore(resolve_db_path()) + self.conversation_manager = ConversationManager( + item_store=SQLiteItemStore(resolve_db_path()) ) self.task_manager = TaskManager() self.agent_connections = RemoteConnections() @@ -134,7 +138,7 @@ async def process_user_input( Handles three types of scenarios: 1. New user requests - starts planning and execution - 2. Continuation of interrupted sessions - resumes from saved state + 2. Continuation of interrupted conversations - resumes from saved state 3. User input responses - provides input to waiting requests Args: @@ -143,72 +147,84 @@ async def process_user_input( Yields: MessageChunk: Streaming response chunks from agents """ - session_id = user_input.meta.session_id + conversation_id = user_input.meta.conversation_id user_id = user_input.meta.user_id try: - # Ensure session exists - session = await self.session_manager.get_session(session_id) - if not session: - await self.session_manager.create_session( - user_id, session_id=session_id + # Ensure conversation exists + conversation = await self.conversation_manager.get_conversation( + conversation_id + ) + if not conversation: + await self.conversation_manager.create_conversation( + user_id, conversation_id=conversation_id + ) + conversation = await self.conversation_manager.get_conversation( + conversation_id ) - session = await self.session_manager.get_session(session_id) yield self._response_factory.conversation_started( - conversation_id=session_id + conversation_id=conversation_id ) - # Handle session continuation vs new request - if session.status == SessionStatus.REQUIRE_USER_INPUT: - async for response in self._handle_session_continuation(user_input): + # Handle conversation continuation vs new request + if conversation.status == ConversationStatus.REQUIRE_USER_INPUT: + async for response in self._handle_conversation_continuation( + user_input + ): yield response else: async for response in self._handle_new_request(user_input): yield response except Exception as e: - logger.exception(f"Error processing user input for session {session_id}") + logger.exception( + f"Error processing user input for conversation {conversation_id}" + ) yield self._response_factory.system_failed( - session_id, + conversation_id, f"(Error) Error processing request: {str(e)}", ) finally: - yield self._response_factory.done(session_id) + yield self._response_factory.done(conversation_id) - async def provide_user_input(self, session_id: str, response: str): + async def provide_user_input(self, conversation_id: str, response: str): """ - Provide user input response for a specific session. + Provide user input response for a specific conversation. Args: - session_id: The session ID waiting for input + conversation_id: The conversation ID waiting for input response: The user's response to the input request """ - if self.user_input_manager.provide_response(session_id, response): - # Update session status to active - session = await self.session_manager.get_session(session_id) - if session: - session.activate() - await self.session_manager.update_session(session) - - def has_pending_user_input(self, session_id: str) -> bool: - """Check if a session has pending user input request""" - return self.user_input_manager.has_pending_request(session_id) - - def get_user_input_prompt(self, session_id: str) -> Optional[str]: - """Get the user input prompt for a specific session""" - return self.user_input_manager.get_request_prompt(session_id) - - async def close_session(self, session_id: str): - """Close an existing session and clean up resources""" - # Cancel any running tasks for this session - await self.task_manager.cancel_session_tasks(session_id) + if self.user_input_manager.provide_response(conversation_id, response): + # Update conversation status to active + conversation = await self.conversation_manager.get_conversation( + conversation_id + ) + if conversation: + conversation.activate() + await self.conversation_manager.update_conversation(conversation) + + def has_pending_user_input(self, conversation_id: str) -> bool: + """Check if a conversation has pending user input request""" + return self.user_input_manager.has_pending_request(conversation_id) + + def get_user_input_prompt(self, conversation_id: str) -> Optional[str]: + """Get the user input prompt for a specific conversation""" + return self.user_input_manager.get_request_prompt(conversation_id) + + async def close_conversation(self, conversation_id: str): + """Close an existing conversation and clean up resources""" + # Cancel any running tasks for this conversation + await self.task_manager.cancel_conversation_tasks(conversation_id) # Clean up execution context - await self._cancel_execution(session_id) + await self._cancel_execution(conversation_id) - async def get_session_history(self, session_id: str) -> list[BaseResponse]: - """Get session message history""" - items = await self.session_manager.get_session_messages(session_id) + async def get_conversation_history( + self, conversation_id: str + ) -> list[BaseResponse]: + """Get conversation message history""" + items = await self.conversation_manager.get_conversation_items(conversation_id) return [self._response_factory.from_conversation_item(it) for it in items] async def cleanup(self): @@ -220,45 +236,47 @@ async def cleanup(self): async def _handle_user_input_request(self, request: UserInputRequest): """Handle user input request from planner""" - # Extract session_id from request context - session_id = getattr(request, "session_id", None) - if session_id: - self.user_input_manager.add_request(session_id, request) + # Extract conversation_id from request context + conversation_id = getattr(request, "conversation_id", None) + if conversation_id: + self.user_input_manager.add_request(conversation_id, request) - async def _handle_session_continuation( + async def _handle_conversation_continuation( self, user_input: UserInput ) -> AsyncGenerator[BaseResponse, None]: - """Handle continuation of an interrupted session""" - session_id = user_input.meta.session_id + """Handle continuation of an interrupted conversation""" + conversation_id = user_input.meta.conversation_id user_id = user_input.meta.user_id # Validate execution context exists - if session_id not in self._execution_contexts: + if conversation_id not in self._execution_contexts: yield self._response_factory.system_failed( - session_id, - "No execution context found for this session. The session may have expired.", + conversation_id, + "No execution context found for this conversation. The conversation may have expired.", ) return - context = self._execution_contexts[session_id] + context = self._execution_contexts[conversation_id] # Validate context integrity and user consistency if not self._validate_execution_context(context, user_id): yield self._response_factory.system_failed( - session_id, + conversation_id, "Invalid execution context or user mismatch.", ) - await self._cancel_execution(session_id) + await self._cancel_execution(conversation_id) return # Provide user response and resume execution # If we are in an execution stage, store the pending response for resume context.add_metadata(pending_response=user_input.query) - await self.provide_user_input(session_id, user_input.query) + await self.provide_user_input(conversation_id, user_input.query) thread_id = generate_thread_id() response = self._response_factory.thread_started( - conversation_id=session_id, thread_id=thread_id, user_query=user_input.query + conversation_id=conversation_id, + thread_id=thread_id, + user_query=user_input.query, ) await self._persist_from_buffer(response) yield response @@ -267,13 +285,13 @@ async def _handle_session_continuation( # Resume based on execution stage if context.stage == "planning": async for response in self._continue_planning( - session_id, thread_id, context + conversation_id, thread_id, context ): yield response # Resuming execution stage is not yet supported else: yield self._response_factory.system_failed( - session_id, + conversation_id, "Resuming execution stage is not yet supported.", ) @@ -281,16 +299,18 @@ async def _handle_new_request( self, user_input: UserInput ) -> AsyncGenerator[BaseResponse, None]: """Handle a new user request""" - session_id = user_input.meta.session_id + conversation_id = user_input.meta.conversation_id thread_id = generate_thread_id() response = self._response_factory.thread_started( - conversation_id=session_id, thread_id=thread_id, user_query=user_input.query + conversation_id=conversation_id, + thread_id=thread_id, + user_query=user_input.query, ) await self._persist_from_buffer(response) yield response # Create planning task with user input callback - context_aware_callback = self._create_context_aware_callback(session_id) + context_aware_callback = self._create_context_aware_callback(conversation_id) planning_task = asyncio.create_task( self.planner.create_plan(user_input, context_aware_callback) @@ -302,11 +322,11 @@ async def _handle_new_request( ): yield response - def _create_context_aware_callback(self, session_id: str): - """Create a callback that adds session context to user input requests""" + def _create_context_aware_callback(self, conversation_id: str): + """Create a callback that adds conversation context to user input requests""" async def context_aware_handle(request): - request.session_id = session_id + request.conversation_id = conversation_id await self._handle_user_input_request(request) return context_aware_handle @@ -319,25 +339,29 @@ async def _monitor_planning_task( callback, ) -> AsyncGenerator[BaseResponse, None]: """Monitor planning task and handle user input interruptions""" - session_id = user_input.meta.session_id + conversation_id = user_input.meta.conversation_id user_id = user_input.meta.user_id # Wait for planning completion or user input request while not planning_task.done(): - if self.has_pending_user_input(session_id): + if self.has_pending_user_input(conversation_id): # Save planning context - context = ExecutionContext("planning", session_id, thread_id, user_id) + context = ExecutionContext( + "planning", conversation_id, thread_id, user_id + ) context.add_metadata( original_user_input=user_input, planning_task=planning_task, planner_callback=callback, ) - self._execution_contexts[session_id] = context + self._execution_contexts[conversation_id] = context - # Update session status and send user input request - await self._request_user_input(session_id) + # Update conversation status and send user input request + await self._request_user_input(conversation_id) response = self._response_factory.plan_require_user_input( - session_id, thread_id, self.get_user_input_prompt(session_id) + conversation_id, + thread_id, + self.get_user_input_prompt(conversation_id), ) await self._persist_from_buffer(response) yield response @@ -350,12 +374,12 @@ async def _monitor_planning_task( async for response in self._execute_plan_with_input_support(plan, thread_id): yield response - async def _request_user_input(self, session_id: str): - """Set session to require user input and send the request""" - session = await self.session_manager.get_session(session_id) - if session: - session.require_user_input() - await self.session_manager.update_session(session) + async def _request_user_input(self, conversation_id: str): + """Set conversation to require user input and send the request""" + conversation = await self.conversation_manager.get_conversation(conversation_id) + if conversation: + conversation.require_user_input() + await self.conversation_manager.update_conversation(conversation) def _validate_execution_context( self, context: ExecutionContext, user_id: str @@ -373,7 +397,7 @@ def _validate_execution_context( return True async def _continue_planning( - self, session_id: str, thread_id: str, context: ExecutionContext + self, conversation_id: str, thread_id: str, context: ExecutionContext ) -> AsyncGenerator[BaseResponse, None]: """Resume planning stage execution""" planning_task = context.get_metadata("planning_task") @@ -381,22 +405,22 @@ async def _continue_planning( if not all([planning_task, original_user_input]): yield self._response_factory.plan_failed( - session_id, + conversation_id, thread_id, "Invalid planning context - missing required data", ) - await self._cancel_execution(session_id) + await self._cancel_execution(conversation_id) return # Continue monitoring planning task while not planning_task.done(): - if self.has_pending_user_input(session_id): + if self.has_pending_user_input(conversation_id): # Still need more user input, send request - prompt = self.get_user_input_prompt(session_id) - # Ensure session is set to require user input again for repeated prompts - await self._request_user_input(session_id) + prompt = self.get_user_input_prompt(conversation_id) + # Ensure conversation is set to require user input again for repeated prompts + await self._request_user_input(conversation_id) response = self._response_factory.plan_require_user_input( - session_id, thread_id, prompt + conversation_id, thread_id, prompt ) await self._persist_from_buffer(response) yield response @@ -406,47 +430,47 @@ async def _continue_planning( # Planning completed, execute plan and clean up context plan = await planning_task - del self._execution_contexts[session_id] + del self._execution_contexts[conversation_id] async for response in self._execute_plan_with_input_support(plan, thread_id): yield response - async def _cancel_execution(self, session_id: str): + async def _cancel_execution(self, conversation_id: str): """Cancel execution and clean up all related resources""" # Clean up execution context - if session_id in self._execution_contexts: - context = self._execution_contexts[session_id] + if conversation_id in self._execution_contexts: + context = self._execution_contexts[conversation_id] # Cancel planning task if it exists and is not done planning_task = context.get_metadata("planning_task") if planning_task and not planning_task.done(): planning_task.cancel() - del self._execution_contexts[session_id] + del self._execution_contexts[conversation_id] # Clear pending user input - self.user_input_manager.clear_request(session_id) + self.user_input_manager.clear_request(conversation_id) - # Reset session status - session = await self.session_manager.get_session(session_id) - if session: - session.activate() - await self.session_manager.update_session(session) + # Reset conversation status + conversation = await self.conversation_manager.get_conversation(conversation_id) + if conversation: + conversation.activate() + await self.conversation_manager.update_conversation(conversation) async def _cleanup_expired_contexts( self, max_age_seconds: int = DEFAULT_CONTEXT_TIMEOUT_SECONDS ): """Clean up execution contexts that have been idle for too long""" - expired_sessions = [ - session_id - for session_id, context in self._execution_contexts.items() + expired_conversations = [ + conversation_id + for conversation_id, context in self._execution_contexts.items() if context.is_expired(max_age_seconds) ] - for session_id in expired_sessions: - await self._cancel_execution(session_id) + for conversation_id in expired_conversations: + await self._cancel_execution(conversation_id) logger.warning( - f"Cleaned up expired execution context for session {session_id}" + f"Cleaned up expired execution context for conversation {conversation_id}" ) # ==================== Plan and Task Execution Methods ==================== @@ -462,13 +486,13 @@ async def _execute_plan_with_input_support( Args: plan: The execution plan containing tasks to execute - metadata: Execution metadata containing session and user info + metadata: Execution metadata containing conversation and user info """ - session_id = plan.session_id + conversation_id = plan.conversation_id if not plan.tasks: yield self._response_factory.plan_failed( - session_id, thread_id, "No tasks found for this request." + conversation_id, thread_id, "No tasks found for this request." ) return @@ -493,7 +517,7 @@ async def _execute_plan_with_input_support( error_msg = f"(Error) Error executing {task.task_id}: {str(e)}" logger.exception(f"Task execution failed: {error_msg}") yield self._response_factory.task_failed( - session_id, + conversation_id, thread_id, task.task_id, error_msg, @@ -513,7 +537,7 @@ async def _execute_task_with_input_support( try: # Start task execution task_id = task.task_id - conversation_id = task.session_id + conversation_id = task.conversation_id await self.task_manager.start_task(task_id) # Get agent connection @@ -534,7 +558,7 @@ async def _execute_task_with_input_support( # Send message to agent remote_response = await client.send_message( task.query, - session_id=conversation_id, + conversation_id=conversation_id, metadata=metadata, streaming=agent_card.capabilities.streaming, ) @@ -604,7 +628,7 @@ async def _persist_from_buffer(self, response: BaseResponse): async def _persist_items(self, items: list[SaveItem]): for it in items: - await self.session_manager.add_message( + await self.conversation_manager.add_item( role=it.role, event=it.event, conversation_id=it.conversation_id, diff --git a/python/valuecell/core/coordinate/planner.py b/python/valuecell/core/coordinate/planner.py index 17c2b2569..3e3c5ceff 100644 --- a/python/valuecell/core/coordinate/planner.py +++ b/python/valuecell/core/coordinate/planner.py @@ -75,7 +75,7 @@ async def create_plan( """ plan = ExecutionPlan( plan_id=generate_uuid("plan"), - session_id=user_input.meta.session_id, + conversation_id=user_input.meta.conversation_id, user_id=user_input.meta.user_id, orig_query=user_input.query, # Store the original query created_at=datetime.now().isoformat(), @@ -170,7 +170,7 @@ async def _analyze_input_and_create_tasks( ) return [ self._create_task( - user_input.meta.session_id, + user_input.meta.conversation_id, user_input.meta.user_id, task.agent_name, task.query, @@ -181,7 +181,7 @@ async def _analyze_input_and_create_tasks( def _create_task( self, - session_id: str, + conversation_id: str, user_id: str, agent_name: str, query: str, @@ -191,7 +191,7 @@ def _create_task( Create a new task for the specified agent. Args: - session_id: Session this task belongs to + conversation_id: Conversation this task belongs to user_id: User who requested this task agent_name: Name of the agent to execute the task query: Query/prompt for the agent @@ -202,7 +202,7 @@ def _create_task( """ return Task( task_id=generate_uuid("task"), - session_id=session_id, + conversation_id=conversation_id, user_id=user_id, agent_name=agent_name, status=TaskStatus.PENDING, diff --git a/python/valuecell/core/coordinate/response_buffer.py b/python/valuecell/core/coordinate/response_buffer.py index 06790f435..dcb1a8911 100644 --- a/python/valuecell/core/coordinate/response_buffer.py +++ b/python/valuecell/core/coordinate/response_buffer.py @@ -229,7 +229,7 @@ def _make_save_item_from_response(self, resp: BaseResponse) -> SaveItem: data: UnifiedResponseData = resp.data payload = data.payload - # Ensure payload is BaseModel for SessionManager + # Ensure payload is BaseModel if isinstance(payload, BaseModel): bm = payload elif isinstance(payload, str): diff --git a/python/valuecell/core/coordinate/response_router.py b/python/valuecell/core/coordinate/response_router.py index bca1b6fd9..8a2f01cda 100644 --- a/python/valuecell/core/coordinate/response_router.py +++ b/python/valuecell/core/coordinate/response_router.py @@ -54,7 +54,7 @@ async def handle_status_update( err_msg = get_message_text(event.status.message) responses.append( response_factory.task_failed( - conversation_id=task.session_id, + conversation_id=task.conversation_id, thread_id=thread_id, task_id=task.task_id, content=err_msg, @@ -81,7 +81,7 @@ async def handle_status_update( tool_result = get_message_text(event.metadata.get("tool_result", "")) responses.append( response_factory.tool_call( - conversation_id=task.session_id, + conversation_id=task.conversation_id, thread_id=thread_id, task_id=task.task_id, event=response_event, @@ -97,7 +97,7 @@ async def handle_status_update( if state == TaskState.working and EventPredicates.is_reasoning(response_event): responses.append( response_factory.reasoning( - conversation_id=task.session_id, + conversation_id=task.conversation_id, thread_id=thread_id, task_id=task.task_id, event=response_event, @@ -114,7 +114,7 @@ async def handle_status_update( component_type = event.metadata.get("component_type", "unknown") responses.append( response_factory.component_generator( - conversation_id=task.session_id, + conversation_id=task.conversation_id, thread_id=thread_id, task_id=task.task_id, content=content, @@ -128,7 +128,7 @@ async def handle_status_update( responses.append( response_factory.message_response_general( event=response_event, - conversation_id=task.session_id, + conversation_id=task.conversation_id, thread_id=thread_id, task_id=task.task_id, content=content, diff --git a/python/valuecell/core/coordinate/tests/test_e2e_persistence.py b/python/valuecell/core/coordinate/tests/test_e2e_persistence.py index c222546cd..5aed51c79 100644 --- a/python/valuecell/core/coordinate/tests/test_e2e_persistence.py +++ b/python/valuecell/core/coordinate/tests/test_e2e_persistence.py @@ -8,19 +8,18 @@ @pytest.mark.asyncio async def test_orchestrator_buffer_store_e2e(tmp_path, monkeypatch): - # Point default SessionManager to a temp sqlite file db_path = tmp_path / "e2e_valuecell.db" monkeypatch.setenv("VALUECELL_SQLITE_DB", str(db_path)) orch = AgentOrchestrator() - # Prepare a session and a simple query; orchestrator will create the session if missing - session_id = "e2e-session" + # Prepare a conversation and a simple query; orchestrator will create the conversation if missing + conversation_id = "e2e-conversation" user_id = "e2e-user" ui = UserInput( query="hello world", desired_agent_name="TestAgent", - meta=UserInputMetadata(session_id=session_id, user_id=user_id), + meta=UserInputMetadata(conversation_id=conversation_id, user_id=user_id), ) # We don't have a live agent, so we expect planner/agent logic to raise; we just want to ensure @@ -35,13 +34,13 @@ async def test_orchestrator_buffer_store_e2e(tmp_path, monkeypatch): # Orchestrator is defensive, should not raise; but in case, we still proceed to check persistence pass - # Verify persistence: at least 1 message exists for session - msgs = await orch.session_manager.get_session_messages(session_id) + # Verify persistence: at least 1 message exists for conversation + msgs = await orch.conversation_manager.get_conversation_items(conversation_id) assert isinstance(msgs, list) assert len(msgs) >= 1 # Also verify we can count and fetch latest - cnt = await orch.session_manager.get_message_count(session_id) + cnt = await orch.conversation_manager.get_item_count(conversation_id) assert cnt == len(msgs) - latest = await orch.session_manager.get_latest_message(session_id) + latest = await orch.conversation_manager.get_latest_item(conversation_id) assert latest is not None diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index 629377799..ffbe769c0 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -4,7 +4,7 @@ Focus on essential behavior without over-engineering: - Happy path (streaming and non-streaming) - Planner error and agent connection error -- Session create/close and cleanup +- Conversation create/close and cleanup """ from types import SimpleNamespace @@ -27,7 +27,7 @@ from valuecell.core.coordinate.models import ExecutionPlan from valuecell.core.coordinate.orchestrator import AgentOrchestrator -from valuecell.core.session import SessionStatus +from valuecell.core.conversation import ConversationStatus from valuecell.core.task import Task, TaskStatus as CoreTaskStatus from valuecell.core.types import UserInput, UserInputMetadata @@ -37,9 +37,9 @@ # ------------------------- -@pytest.fixture(name="session_id") -def _session_id() -> str: - return "test-session-123" +@pytest.fixture(name="conversation_id") +def _conversation_id() -> str: + return "test-conversation-123" @pytest.fixture(name="user_id") @@ -53,19 +53,21 @@ def _sample_query() -> str: @pytest.fixture(name="sample_user_input") -def _sample_user_input(session_id: str, user_id: str, sample_query: str) -> UserInput: +def _sample_user_input( + conversation_id: str, user_id: str, sample_query: str +) -> UserInput: return UserInput( query=sample_query, desired_agent_name="TestAgent", - meta=UserInputMetadata(session_id=session_id, user_id=user_id), + meta=UserInputMetadata(conversation_id=conversation_id, user_id=user_id), ) @pytest.fixture(name="sample_task") -def _sample_task(session_id: str, user_id: str, sample_query: str) -> Task: +def _sample_task(conversation_id: str, user_id: str, sample_query: str) -> Task: return Task( task_id="task-1", - session_id=session_id, + conversation_id=conversation_id, user_id=user_id, agent_name="TestAgent", query=sample_query, @@ -76,11 +78,11 @@ def _sample_task(session_id: str, user_id: str, sample_query: str) -> Task: @pytest.fixture(name="sample_plan") def _sample_plan( - session_id: str, user_id: str, sample_query: str, sample_task: Task + conversation_id: str, user_id: str, sample_query: str, sample_task: Task ) -> ExecutionPlan: return ExecutionPlan( plan_id="plan-1", - session_id=session_id, + conversation_id=conversation_id, user_id=user_id, orig_query=sample_query, tasks=[sample_task], @@ -88,30 +90,30 @@ def _sample_plan( ) -def _stub_session(status: Any = SessionStatus.ACTIVE): - # Minimal session stub with status and basic methods used by orchestrator +def _stub_conversation(status: Any = ConversationStatus.ACTIVE): + # Minimal conversation stub with status and basic methods used by orchestrator s = SimpleNamespace(status=status) def activate(): - s.status = SessionStatus.ACTIVE + s.status = ConversationStatus.ACTIVE def require_user_input(): - s.status = SessionStatus.REQUIRE_USER_INPUT + s.status = ConversationStatus.REQUIRE_USER_INPUT s.activate = activate s.require_user_input = require_user_input return s -@pytest.fixture(name="mock_session_manager") -def _mock_session_manager() -> Mock: +@pytest.fixture(name="mock_conversation_manager") +def _mock_conversation_manager() -> Mock: m = Mock() - m.add_message = AsyncMock() - m.create_session = AsyncMock(return_value="new-session-id") - m.get_session_messages = AsyncMock(return_value=[]) - m.list_user_sessions = AsyncMock(return_value=[]) - m.get_session = AsyncMock(return_value=_stub_session()) - m.update_session = AsyncMock() + m.add_item = AsyncMock() + m.create_conversation = AsyncMock(return_value="new-conversation-id") + m.get_conversation_items = AsyncMock(return_value=[]) + m.list_user_conversations = AsyncMock(return_value=[]) + m.get_conversation = AsyncMock(return_value=_stub_conversation()) + m.update_conversation = AsyncMock() return m @@ -122,7 +124,7 @@ def _mock_task_manager() -> Mock: m.start_task = AsyncMock() m.complete_task = AsyncMock() m.fail_task = AsyncMock() - m.cancel_session_tasks = AsyncMock(return_value=0) + m.cancel_conversation_tasks = AsyncMock(return_value=0) return m @@ -172,10 +174,10 @@ def _mock_planner(sample_plan: ExecutionPlan) -> Mock: @pytest.fixture(name="orchestrator") def _orchestrator( - mock_session_manager: Mock, mock_task_manager: Mock, mock_planner: Mock + mock_conversation_manager: Mock, mock_task_manager: Mock, mock_planner: Mock ) -> AgentOrchestrator: o = AgentOrchestrator() - o.session_manager = mock_session_manager + o.conversation_manager = mock_conversation_manager o.task_manager = mock_task_manager o.planner = mock_planner return o diff --git a/python/valuecell/core/session/__init__.py b/python/valuecell/core/session/__init__.py deleted file mode 100644 index 11cea3f45..000000000 --- a/python/valuecell/core/session/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Session module initialization""" - -from .manager import SessionManager -from .message_store import InMemoryMessageStore, MessageStore, SQLiteMessageStore -from .models import Session, SessionStatus -from .store import InMemorySessionStore, SessionStore - -__all__ = [ - # Models - "Session", - "SessionStatus", - # Session management - "SessionManager", - # Session storage - "SessionStore", - "InMemorySessionStore", - # Message storage - "MessageStore", - "InMemoryMessageStore", - "SQLiteMessageStore", -] diff --git a/python/valuecell/core/session/manager.py b/python/valuecell/core/session/manager.py deleted file mode 100644 index 9b06f7502..000000000 --- a/python/valuecell/core/session/manager.py +++ /dev/null @@ -1,204 +0,0 @@ -from datetime import datetime -from typing import List, Optional - -from valuecell.core.types import ( - ConversationItem, - ConversationItemEvent, - ResponsePayload, - Role, -) -from valuecell.utils import generate_uuid - -from .message_store import InMemoryMessageStore, MessageStore -from .models import Session, SessionStatus -from .store import InMemorySessionStore, SessionStore - - -class SessionManager: - """Session manager - handles both session metadata and messages through separate stores""" - - def __init__( - self, - session_store: Optional[SessionStore] = None, - message_store: Optional[MessageStore] = None, - ): - self.session_store = session_store or InMemorySessionStore() - self.message_store = message_store or InMemoryMessageStore() - - async def create_session( - self, - user_id: str, - title: Optional[str] = None, - session_id: Optional[str] = None, - ) -> Session: - """Create new session""" - session = Session( - session_id=session_id or generate_uuid("session"), - user_id=user_id, - title=title, - ) - await self.session_store.save_session(session) - return session - - async def get_session(self, session_id: str) -> Optional[Session]: - """Get session metadata""" - return await self.session_store.load_session(session_id) - - async def update_session(self, session: Session) -> None: - """Update session metadata""" - session.updated_at = datetime.now() - await self.session_store.save_session(session) - - async def delete_session(self, session_id: str) -> bool: - """Delete session and all its messages""" - # First delete all messages for this session - await self.message_store.delete_session_messages(session_id) - - # Then delete the session metadata - return await self.session_store.delete_session(session_id) - - async def list_user_sessions( - self, user_id: str, limit: int = 100, offset: int = 0 - ) -> List[Session]: - """List user sessions""" - return await self.session_store.list_sessions(user_id, limit, offset) - - async def session_exists(self, session_id: str) -> bool: - """Check if session exists""" - return await self.session_store.session_exists(session_id) - - async def add_message( - self, - role: Role, - event: ConversationItemEvent, - conversation_id: str, - thread_id: Optional[str] = None, - task_id: Optional[str] = None, - payload: ResponsePayload = None, - item_id: Optional[str] = None, - ) -> Optional[ConversationItem]: - """Add message to session - - Args: - session_id: Session ID to add message to - role: Message role (USER, AGENT, SYSTEM) - content: Message content - user_id: User ID (will be fetched from session if not provided) - agent_name: Agent name (optional) - task_id: Associated task ID (optional) - """ - # Verify session exists - session = await self.get_session(conversation_id) - if not session: - return None - - # Create message - # Serialize payload to JSON string if it's a pydantic model - payload_str = None - if payload is not None: - try: - # pydantic BaseModel supports model_dump_json - payload_str = payload.model_dump_json(exclude_none=True) - except Exception: - try: - payload_str = str(payload) - except Exception: - payload_str = None - - item = ConversationItem( - item_id=item_id or generate_uuid("item"), - role=role, - event=event, - conversation_id=conversation_id, - thread_id=thread_id, - task_id=task_id, - payload=payload_str, - ) - - # Save message directly to message store - await self.message_store.save_message(item) - - # Update session timestamp - session.touch() - await self.session_store.save_session(session) - - return item - - async def get_session_messages( - self, - session_id: str, - ) -> List[ConversationItem]: - """Get messages for a session with optional filtering and pagination - - Args: - session_id: Session ID - limit: Maximum number of messages to return - offset: Number of messages to skip - role: Filter by specific role (optional) - """ - return await self.message_store.get_messages(session_id) - - async def get_latest_message(self, session_id: str) -> Optional[ConversationItem]: - """Get latest message in a session""" - return await self.message_store.get_latest_message(session_id) - - async def get_message(self, message_id: str) -> Optional[ConversationItem]: - """Get a specific message by ID""" - return await self.message_store.get_message(message_id) - - async def get_message_count(self, session_id: str) -> int: - """Get total message count for a session""" - return await self.message_store.get_message_count(session_id) - - async def get_messages_by_role( - self, session_id: str, role: Role - ) -> List[ConversationItem]: - """Get messages filtered by role""" - return await self.message_store.get_messages(session_id, role=role) - - async def deactivate_session(self, session_id: str) -> bool: - """Deactivate session""" - session = await self.get_session(session_id) - if not session: - return False - - session.deactivate() - await self.session_store.save_session(session) - return True - - async def activate_session(self, session_id: str) -> bool: - """Activate session""" - session = await self.get_session(session_id) - if not session: - return False - - session.activate() - await self.session_store.save_session(session) - return True - - async def set_session_status(self, session_id: str, status: SessionStatus) -> bool: - """Set session status""" - session = await self.get_session(session_id) - if not session: - return False - - session.set_status(status) - await self.session_store.save_session(session) - return True - - async def require_user_input(self, session_id: str) -> bool: - """Mark session as requiring user input""" - return await self.set_session_status( - session_id, SessionStatus.REQUIRE_USER_INPUT - ) - - async def get_sessions_by_status( - self, user_id: str, status: SessionStatus, limit: int = 100, offset: int = 0 - ) -> List[Session]: - """Get user sessions filtered by status""" - # Get all user sessions and filter by status - # Note: This could be optimized by adding status filtering to the store interface - all_sessions = await self.session_store.list_sessions( - user_id, limit * 2, offset - ) - return [session for session in all_sessions if session.status == status][:limit] diff --git a/python/valuecell/core/session/models.py b/python/valuecell/core/session/models.py deleted file mode 100644 index 2835216d4..000000000 --- a/python/valuecell/core/session/models.py +++ /dev/null @@ -1,59 +0,0 @@ -from datetime import datetime -from enum import Enum -from typing import Optional - -from pydantic import BaseModel, Field - - -class SessionStatus(str, Enum): - """Session status enumeration""" - - ACTIVE = "active" - INACTIVE = "inactive" - REQUIRE_USER_INPUT = "require_user_input" - - -class Session(BaseModel): - """Session data model - lightweight metadata only, messages stored separately""" - - session_id: str = Field(..., description="Unique session identifier") - user_id: str = Field(..., description="User ID") - title: Optional[str] = Field(None, description="Session title") - created_at: datetime = Field( - default_factory=datetime.now, description="Creation time" - ) - updated_at: datetime = Field( - default_factory=datetime.now, description="Last update time" - ) - status: SessionStatus = Field( - default=SessionStatus.ACTIVE, description="Session status" - ) - - class Config: - json_encoders = {datetime: lambda v: v.isoformat()} - - @property - def is_active(self) -> bool: - """Backward compatibility property - returns True if session is active""" - return self.status == SessionStatus.ACTIVE - - def set_status(self, status: SessionStatus) -> None: - """Update session status and timestamp""" - self.status = status - self.updated_at = datetime.now() - - def activate(self) -> None: - """Set session to active status""" - self.set_status(SessionStatus.ACTIVE) - - def deactivate(self) -> None: - """Set session to inactive status""" - self.set_status(SessionStatus.INACTIVE) - - def require_user_input(self) -> None: - """Set session to require user input status""" - self.set_status(SessionStatus.REQUIRE_USER_INPUT) - - def touch(self) -> None: - """Update the session's last activity timestamp""" - self.updated_at = datetime.now() diff --git a/python/valuecell/core/session/store.py b/python/valuecell/core/session/store.py deleted file mode 100644 index 99f5c6ae5..000000000 --- a/python/valuecell/core/session/store.py +++ /dev/null @@ -1,82 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Dict, List, Optional - -from .models import Session - - -class SessionStore(ABC): - """Session storage abstract base class - handles session metadata only. - - Messages are stored separately using MessageStore implementations. - """ - - @abstractmethod - async def save_session(self, session: Session) -> None: - """Save session""" - - @abstractmethod - async def load_session(self, session_id: str) -> Optional[Session]: - """Load session""" - - @abstractmethod - async def delete_session(self, session_id: str) -> bool: - """Delete session""" - - @abstractmethod - async def list_sessions( - self, user_id: str, limit: int = 100, offset: int = 0 - ) -> List[Session]: - """List user sessions""" - - @abstractmethod - async def session_exists(self, session_id: str) -> bool: - """Check if session exists""" - - -class InMemorySessionStore(SessionStore): - """In-memory session storage implementation""" - - def __init__(self): - self._sessions: Dict[str, Session] = {} - - async def save_session(self, session: Session) -> None: - """Save session to memory""" - self._sessions[session.session_id] = session - - async def load_session(self, session_id: str) -> Optional[Session]: - """Load session from memory""" - return self._sessions.get(session_id) - - async def delete_session(self, session_id: str) -> bool: - """Delete session from memory""" - if session_id in self._sessions: - del self._sessions[session_id] - return True - return False - - async def list_sessions( - self, user_id: str, limit: int = 100, offset: int = 0 - ) -> List[Session]: - """List user sessions""" - user_sessions = [ - session for session in self._sessions.values() if session.user_id == user_id - ] - # Sort by creation time descending - user_sessions.sort(key=lambda s: s.created_at, reverse=True) - - # Apply pagination - start = offset - end = offset + limit - return user_sessions[start:end] - - async def session_exists(self, session_id: str) -> bool: - """Check if session exists""" - return session_id in self._sessions - - def clear_all(self) -> None: - """Clear all sessions (for testing)""" - self._sessions.clear() - - def get_session_count(self) -> int: - """Get total session count (for debugging)""" - return len(self._sessions) diff --git a/python/valuecell/core/task/manager.py b/python/valuecell/core/task/manager.py index 6bc912cc0..0b48426b1 100644 --- a/python/valuecell/core/task/manager.py +++ b/python/valuecell/core/task/manager.py @@ -68,9 +68,11 @@ async def cancel_task(self, task_id: str) -> bool: return True # Batch operations - async def cancel_session_tasks(self, session_id: str) -> int: - """Cancel all unfinished tasks in a session""" - tasks = [t for t in self._tasks.values() if t.session_id == session_id] + async def cancel_conversation_tasks(self, conversation_id: str) -> int: + """Cancel all unfinished tasks in a conversation""" + tasks = [ + t for t in self._tasks.values() if t.conversation_id == conversation_id + ] cancelled_count = 0 for task in tasks: diff --git a/python/valuecell/core/task/models.py b/python/valuecell/core/task/models.py index 66718258b..9ac326b79 100644 --- a/python/valuecell/core/task/models.py +++ b/python/valuecell/core/task/models.py @@ -32,7 +32,9 @@ class Task(BaseModel): description="Task identifier determined by the remote agent after submission", ) query: str = Field(..., description="The task to be performed") - session_id: str = Field(..., description="Session ID this task belongs to") + conversation_id: str = Field( + ..., description="Conversation ID this task belongs to" + ) user_id: str = Field(..., description="User ID who initiated this task") agent_name: str = Field(..., description="Name of the agent executing this task") status: TaskStatus = Field( diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index 62c562f65..10eb7e808 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -10,7 +10,9 @@ class UserInputMetadata(BaseModel): """Metadata associated with user input""" - session_id: Optional[str] = Field(None, description="Session ID for this request") + conversation_id: Optional[str] = Field( + None, description="Conversation ID for this request" + ) user_id: str = Field(..., description="User ID who made this request") @@ -288,14 +290,14 @@ class BaseAgent(ABC): @abstractmethod async def stream( - self, query: str, session_id: str, task_id: str + self, query: str, conversation_id: str, task_id: str ) -> AsyncGenerator[StreamResponse, None]: """ Process user queries and return streaming responses (user-initiated) Args: query: User query content - session_id: Session ID + conversation_id: Conversation ID task_id: Task ID Yields: @@ -304,14 +306,14 @@ async def stream( raise NotImplementedError async def notify( - self, query: str, session_id: str, task_id: str + self, query: str, conversation_id: str, task_id: str ) -> AsyncGenerator[NotifyResponse, None]: """ Send proactive notifications to subscribed users (agent-initiated) Args: query: User query content, can be empty for some agents - session_id: Session ID for the notification + conversation_id: Conversation ID for the notification user_id: Target user ID for the notification Yields: diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 350b94dd9..e5f89450e 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -37,9 +37,11 @@ async def stream_query_agent( user_id = "default_user" desired_agent_name = agent_name - session_id = agent_name + "_session_" + user_id + conversation_id = agent_name + "_conv_" + user_id - user_input_meta = UserInputMetadata(user_id=user_id, session_id=session_id) + user_input_meta = UserInputMetadata( + user_id=user_id, conversation_id=conversation_id + ) user_input = UserInput( query=query, desired_agent_name=desired_agent_name, meta=user_input_meta