From 3e03fef53c4dc5ea28bece75c4bbdc3317a13ba4 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Thu, 25 Sep 2025 18:02:38 +0800 Subject: [PATCH 1/4] feat: enhance session message retrieval with optional filtering by event and component type --- .../valuecell/core/conversation/item_store.py | 15 +++++++++++-- python/valuecell/core/conversation/manager.py | 6 +++++- .../valuecell/core/coordinate/orchestrator.py | 21 ++++++++++++------- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/python/valuecell/core/conversation/item_store.py b/python/valuecell/core/conversation/item_store.py index 6510b5d8d..7c99503ea 100644 --- a/python/valuecell/core/conversation/item_store.py +++ b/python/valuecell/core/conversation/item_store.py @@ -2,11 +2,11 @@ import asyncio import sqlite3 -import aiosqlite from abc import ABC, abstractmethod from typing import Dict, List, Optional -from valuecell.core.types import ConversationItem, Role +import aiosqlite +from valuecell.core.types import ConversationItem, ConversationItemEvent, Role class ItemStore(ABC): @@ -20,6 +20,7 @@ async def get_items( limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, + **kwargs, ) -> List[ConversationItem]: ... @abstractmethod @@ -52,6 +53,7 @@ async def get_items( limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, + **kwargs, ) -> List[ConversationItem]: items = list(self._items.get(conversation_id, [])) if role is not None: @@ -161,6 +163,9 @@ async def get_items( limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, + event: Optional[ConversationItemEvent] = None, + component_type: Optional[str] = None, + **kwargs, ) -> List[ConversationItem]: await self._ensure_initialized() params = [conversation_id] @@ -177,6 +182,12 @@ async def get_items( sql += " LIMIT -1" sql += " OFFSET ?" params.append(int(offset)) + if event is not None: + where += " AND event = ?" + params.append(getattr(event, "value", str(event))) + if component_type is not None: + where += " AND json_extract(payload, '$.component_type') = ?" + params.append(component_type) async with aiosqlite.connect(self.db_path) as db: db.row_factory = sqlite3.Row cur = await db.execute(sql, params) diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py index 101961a71..009922237 100644 --- a/python/valuecell/core/conversation/manager.py +++ b/python/valuecell/core/conversation/manager.py @@ -128,6 +128,8 @@ async def add_item( async def get_conversation_items( self, conversation_id: str, + event: Optional[ConversationItemEvent] = None, + component_type: Optional[str] = None, ) -> List[ConversationItem]: """Get items for a conversation with optional filtering and pagination @@ -137,7 +139,9 @@ async def get_conversation_items( offset: Number of items to skip role: Filter by specific role (optional) """ - return await self.item_store.get_items(conversation_id) + return await self.item_store.get_items( + conversation_id, event=event, component_type=component_type + ) async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]: """Get latest item in a conversation""" diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 7d4594c7b..8b0726cf5 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -4,6 +4,11 @@ from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent from valuecell.core.agent.connect import RemoteConnections +from valuecell.core.conversation import ( + ConversationManager, + ConversationStatus, + SQLiteItemStore, +) from valuecell.core.coordinate.response import ResponseFactory from valuecell.core.coordinate.response_buffer import ResponseBuffer, SaveItem from valuecell.core.coordinate.response_router import ( @@ -11,14 +16,9 @@ SideEffectKind, handle_status_update, ) -from valuecell.core.conversation import ( - ConversationManager, - ConversationStatus, - SQLiteItemStore, -) from valuecell.core.task import Task, TaskManager from valuecell.core.task.models import TaskPattern -from valuecell.core.types import BaseResponse, UserInput +from valuecell.core.types import BaseResponse, ConversationItemEvent, UserInput from valuecell.utils import resolve_db_path from valuecell.utils.uuid import generate_thread_id @@ -221,10 +221,15 @@ async def close_conversation(self, conversation_id: str): await self._cancel_execution(conversation_id) async def get_conversation_history( - self, conversation_id: str + self, + conversation_id: str, + event: Optional[ConversationItemEvent] = None, + component_type: Optional[str] = None, ) -> list[BaseResponse]: """Get conversation message history""" - items = await self.conversation_manager.get_conversation_items(conversation_id) + items = await self.conversation_manager.get_conversation_items( + conversation_id, event=event, component_type=component_type + ) return [self._response_factory.from_conversation_item(it) for it in items] async def cleanup(self): From 3b43454ba229a90734967f1ff27554502dee15ce Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Thu, 25 Sep 2025 18:04:56 +0800 Subject: [PATCH 2/4] fix tests --- python/valuecell/core/conversation/tests/test_conv_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/valuecell/core/conversation/tests/test_conv_manager.py b/python/valuecell/core/conversation/tests/test_conv_manager.py index f9f15da63..fd6c0f2bd 100644 --- a/python/valuecell/core/conversation/tests/test_conv_manager.py +++ b/python/valuecell/core/conversation/tests/test_conv_manager.py @@ -364,7 +364,7 @@ async def test_get_conversation_items(self): result = await manager.get_conversation_items("conv-123") assert result == items - manager.item_store.get_items.assert_called_once_with("conv-123") + manager.item_store.get_items.assert_called_once_with("conv-123", event=None, component_type=None) @pytest.mark.asyncio async def test_get_latest_item(self): From 0b42e1fa1fe542aa8b6e00e10cd1eae5ee34de6a Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Thu, 25 Sep 2025 18:05:31 +0800 Subject: [PATCH 3/4] fix format --- python/valuecell/core/conversation/tests/test_conv_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/valuecell/core/conversation/tests/test_conv_manager.py b/python/valuecell/core/conversation/tests/test_conv_manager.py index fd6c0f2bd..19630e596 100644 --- a/python/valuecell/core/conversation/tests/test_conv_manager.py +++ b/python/valuecell/core/conversation/tests/test_conv_manager.py @@ -364,7 +364,9 @@ async def test_get_conversation_items(self): result = await manager.get_conversation_items("conv-123") assert result == items - manager.item_store.get_items.assert_called_once_with("conv-123", event=None, component_type=None) + manager.item_store.get_items.assert_called_once_with( + "conv-123", event=None, component_type=None + ) @pytest.mark.asyncio async def test_get_latest_item(self): From 57b1774c04417eb778a45bbeae78e0e1213235c9 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Thu, 25 Sep 2025 18:13:46 +0800 Subject: [PATCH 4/4] feat: add optional filters for event and component type in SQLiteItemStore --- .../valuecell/core/conversation/item_store.py | 14 ++-- .../tests/test_sqlite_item_store.py | 67 +++++++++++++++++++ 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/python/valuecell/core/conversation/item_store.py b/python/valuecell/core/conversation/item_store.py index 7c99503ea..da68d4fcd 100644 --- a/python/valuecell/core/conversation/item_store.py +++ b/python/valuecell/core/conversation/item_store.py @@ -173,6 +173,14 @@ async def get_items( if role is not None: where += " AND role = ?" params.append(getattr(role, "value", str(role))) + # Add additional optional filters before building the final SQL string + if event is not None: + where += " AND event = ?" + params.append(getattr(event, "value", str(event))) + if component_type is not None: + where += " AND json_extract(payload, '$.component_type') = ?" + params.append(component_type) + sql = f"SELECT * FROM conversation_items {where} ORDER BY datetime(created_at) ASC" if limit is not None: sql += " LIMIT ?" @@ -182,12 +190,6 @@ async def get_items( sql += " LIMIT -1" sql += " OFFSET ?" params.append(int(offset)) - if event is not None: - where += " AND event = ?" - params.append(getattr(event, "value", str(event))) - if component_type is not None: - where += " AND json_extract(payload, '$.component_type') = ?" - params.append(component_type) async with aiosqlite.connect(self.db_path) as db: db.row_factory = sqlite3.Row cur = await db.execute(sql, params) diff --git a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py index 3bb4d5284..3813d53da 100644 --- a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py +++ b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py @@ -62,3 +62,70 @@ async def test_sqlite_item_store_basic_crud(): finally: if os.path.exists(path): os.remove(path) + + +@pytest.mark.asyncio +async def test_sqlite_item_store_filters_and_pagination(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + try: + store = SQLiteItemStore(path) + + # create several items with varying roles, events and payloads + items = [ + ConversationItem( + item_id="a1", + role=Role.SYSTEM, + event=SystemResponseEvent.THREAD_STARTED, + conversation_id="s2", + thread_id="t1", + task_id=None, + payload='{"a":1}', + ), + ConversationItem( + item_id="a2", + role=Role.AGENT, + event=SystemResponseEvent.DONE, + conversation_id="s2", + thread_id="t1", + task_id=None, + payload='{"component_type":"card","a":2}', + ), + ConversationItem( + item_id="a3", + role=Role.AGENT, + event=SystemResponseEvent.THREAD_STARTED, + conversation_id="s2", + thread_id="t2", + task_id=None, + payload='{"component_type":"chart","a":3}', + ), + ] + + # save in order + for it in items: + await store.save_item(it) + + # filter by role + agent_items = await store.get_items("s2", role=Role.AGENT) + assert {i.item_id for i in agent_items} == {"a2", "a3"} + + # filter by event + thread_started = await store.get_items( + "s2", event=SystemResponseEvent.THREAD_STARTED + ) + assert {i.item_id for i in thread_started} == {"a1", "a3"} + + # filter by component_type (json_extract on payload) + cards = await store.get_items("s2", component_type="card") + assert [i.item_id for i in cards] == ["a2"] + + # limit & offset: get first item only, then skip first + first = await store.get_items("s2", limit=1) + assert len(first) == 1 + second_page = await store.get_items("s2", limit=1, offset=1) + assert len(second_page) == 1 + + finally: + if os.path.exists(path): + os.remove(path)