diff --git a/python/valuecell/core/conversation/item_store.py b/python/valuecell/core/conversation/item_store.py index 6510b5d8d..da68d4fcd 100644 --- a/python/valuecell/core/conversation/item_store.py +++ b/python/valuecell/core/conversation/item_store.py @@ -2,11 +2,11 @@ import asyncio import sqlite3 -import aiosqlite from abc import ABC, abstractmethod from typing import Dict, List, Optional -from valuecell.core.types import ConversationItem, Role +import aiosqlite +from valuecell.core.types import ConversationItem, ConversationItemEvent, Role class ItemStore(ABC): @@ -20,6 +20,7 @@ async def get_items( limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, + **kwargs, ) -> List[ConversationItem]: ... @abstractmethod @@ -52,6 +53,7 @@ async def get_items( limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, + **kwargs, ) -> List[ConversationItem]: items = list(self._items.get(conversation_id, [])) if role is not None: @@ -161,6 +163,9 @@ async def get_items( limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, + event: Optional[ConversationItemEvent] = None, + component_type: Optional[str] = None, + **kwargs, ) -> List[ConversationItem]: await self._ensure_initialized() params = [conversation_id] @@ -168,6 +173,14 @@ async def get_items( if role is not None: where += " AND role = ?" params.append(getattr(role, "value", str(role))) + # Add additional optional filters before building the final SQL string + if event is not None: + where += " AND event = ?" + params.append(getattr(event, "value", str(event))) + if component_type is not None: + where += " AND json_extract(payload, '$.component_type') = ?" + params.append(component_type) + sql = f"SELECT * FROM conversation_items {where} ORDER BY datetime(created_at) ASC" if limit is not None: sql += " LIMIT ?" diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py index 101961a71..009922237 100644 --- a/python/valuecell/core/conversation/manager.py +++ b/python/valuecell/core/conversation/manager.py @@ -128,6 +128,8 @@ async def add_item( async def get_conversation_items( self, conversation_id: str, + event: Optional[ConversationItemEvent] = None, + component_type: Optional[str] = None, ) -> List[ConversationItem]: """Get items for a conversation with optional filtering and pagination @@ -137,7 +139,9 @@ async def get_conversation_items( offset: Number of items to skip role: Filter by specific role (optional) """ - return await self.item_store.get_items(conversation_id) + return await self.item_store.get_items( + conversation_id, event=event, component_type=component_type + ) async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]: """Get latest item in a conversation""" diff --git a/python/valuecell/core/conversation/tests/test_conv_manager.py b/python/valuecell/core/conversation/tests/test_conv_manager.py index f9f15da63..19630e596 100644 --- a/python/valuecell/core/conversation/tests/test_conv_manager.py +++ b/python/valuecell/core/conversation/tests/test_conv_manager.py @@ -364,7 +364,9 @@ async def test_get_conversation_items(self): result = await manager.get_conversation_items("conv-123") assert result == items - manager.item_store.get_items.assert_called_once_with("conv-123") + manager.item_store.get_items.assert_called_once_with( + "conv-123", event=None, component_type=None + ) @pytest.mark.asyncio async def test_get_latest_item(self): diff --git a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py index 3bb4d5284..3813d53da 100644 --- a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py +++ b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py @@ -62,3 +62,70 @@ async def test_sqlite_item_store_basic_crud(): finally: if os.path.exists(path): os.remove(path) + + +@pytest.mark.asyncio +async def test_sqlite_item_store_filters_and_pagination(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + try: + store = SQLiteItemStore(path) + + # create several items with varying roles, events and payloads + items = [ + ConversationItem( + item_id="a1", + role=Role.SYSTEM, + event=SystemResponseEvent.THREAD_STARTED, + conversation_id="s2", + thread_id="t1", + task_id=None, + payload='{"a":1}', + ), + ConversationItem( + item_id="a2", + role=Role.AGENT, + event=SystemResponseEvent.DONE, + conversation_id="s2", + thread_id="t1", + task_id=None, + payload='{"component_type":"card","a":2}', + ), + ConversationItem( + item_id="a3", + role=Role.AGENT, + event=SystemResponseEvent.THREAD_STARTED, + conversation_id="s2", + thread_id="t2", + task_id=None, + payload='{"component_type":"chart","a":3}', + ), + ] + + # save in order + for it in items: + await store.save_item(it) + + # filter by role + agent_items = await store.get_items("s2", role=Role.AGENT) + assert {i.item_id for i in agent_items} == {"a2", "a3"} + + # filter by event + thread_started = await store.get_items( + "s2", event=SystemResponseEvent.THREAD_STARTED + ) + assert {i.item_id for i in thread_started} == {"a1", "a3"} + + # filter by component_type (json_extract on payload) + cards = await store.get_items("s2", component_type="card") + assert [i.item_id for i in cards] == ["a2"] + + # limit & offset: get first item only, then skip first + first = await store.get_items("s2", limit=1) + assert len(first) == 1 + second_page = await store.get_items("s2", limit=1, offset=1) + assert len(second_page) == 1 + + finally: + if os.path.exists(path): + os.remove(path) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 7d4594c7b..8b0726cf5 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -4,6 +4,11 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from valuecell.core.agent.connect import RemoteConnections +from valuecell.core.conversation import ( + ConversationManager, + ConversationStatus, + SQLiteItemStore, +) from valuecell.core.coordinate.response import ResponseFactory from valuecell.core.coordinate.response_buffer import ResponseBuffer, SaveItem from valuecell.core.coordinate.response_router import ( @@ -11,14 +16,9 @@ SideEffectKind, handle_status_update, ) -from valuecell.core.conversation import ( - ConversationManager, - ConversationStatus, - SQLiteItemStore, -) from valuecell.core.task import Task, TaskManager from valuecell.core.task.models import TaskPattern -from valuecell.core.types import BaseResponse, UserInput +from valuecell.core.types import BaseResponse, ConversationItemEvent, UserInput from valuecell.utils import resolve_db_path from valuecell.utils.uuid import generate_thread_id @@ -221,10 +221,15 @@ async def close_conversation(self, conversation_id: str): await self._cancel_execution(conversation_id) async def get_conversation_history( - self, conversation_id: str + self, + conversation_id: str, + event: Optional[ConversationItemEvent] = None, + component_type: Optional[str] = None, ) -> list[BaseResponse]: """Get conversation message history""" - items = await self.conversation_manager.get_conversation_items(conversation_id) + items = await self.conversation_manager.get_conversation_items( + conversation_id, event=event, component_type=component_type + ) return [self._response_factory.from_conversation_item(it) for it in items] async def cleanup(self):