Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions python/valuecell/core/conversation/item_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import asyncio
import sqlite3
import aiosqlite
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from valuecell.core.types import ConversationItem, Role
import aiosqlite
from valuecell.core.types import ConversationItem, ConversationItemEvent, Role


class ItemStore(ABC):
Expand All @@ -20,6 +20,7 @@ async def get_items(
limit: Optional[int] = None,
offset: int = 0,
role: Optional[Role] = None,
**kwargs,
) -> List[ConversationItem]: ...

@abstractmethod
Expand Down Expand Up @@ -52,6 +53,7 @@ async def get_items(
limit: Optional[int] = None,
offset: int = 0,
role: Optional[Role] = None,
**kwargs,
) -> List[ConversationItem]:
items = list(self._items.get(conversation_id, []))
if role is not None:
Expand Down Expand Up @@ -161,13 +163,24 @@ async def get_items(
limit: Optional[int] = None,
offset: int = 0,
role: Optional[Role] = None,
event: Optional[ConversationItemEvent] = None,
component_type: Optional[str] = None,
**kwargs,
) -> List[ConversationItem]:
await self._ensure_initialized()
params = [conversation_id]
where = "WHERE conversation_id = ?"
if role is not None:
where += " AND role = ?"
params.append(getattr(role, "value", str(role)))
# Add additional optional filters before building the final SQL string
if event is not None:
where += " AND event = ?"
params.append(getattr(event, "value", str(event)))
if component_type is not None:
where += " AND json_extract(payload, '$.component_type') = ?"
params.append(component_type)

sql = f"SELECT * FROM conversation_items {where} ORDER BY datetime(created_at) ASC"
if limit is not None:
sql += " LIMIT ?"
Expand Down
6 changes: 5 additions & 1 deletion python/valuecell/core/conversation/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ async def add_item(
async def get_conversation_items(
self,
conversation_id: str,
event: Optional[ConversationItemEvent] = None,
component_type: Optional[str] = None,
) -> List[ConversationItem]:
"""Get items for a conversation with optional filtering and pagination

Expand All @@ -137,7 +139,9 @@ async def get_conversation_items(
offset: Number of items to skip
role: Filter by specific role (optional)
"""
return await self.item_store.get_items(conversation_id)
return await self.item_store.get_items(
conversation_id, event=event, component_type=component_type
)

async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]:
"""Get latest item in a conversation"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ async def test_get_conversation_items(self):
result = await manager.get_conversation_items("conv-123")

assert result == items
manager.item_store.get_items.assert_called_once_with("conv-123")
manager.item_store.get_items.assert_called_once_with(
"conv-123", event=None, component_type=None
)

@pytest.mark.asyncio
async def test_get_latest_item(self):
Expand Down
67 changes: 67 additions & 0 deletions python/valuecell/core/conversation/tests/test_sqlite_item_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,70 @@ async def test_sqlite_item_store_basic_crud():
finally:
if os.path.exists(path):
os.remove(path)


@pytest.mark.asyncio
async def test_sqlite_item_store_filters_and_pagination():
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
try:
store = SQLiteItemStore(path)

# create several items with varying roles, events and payloads
items = [
ConversationItem(
item_id="a1",
role=Role.SYSTEM,
event=SystemResponseEvent.THREAD_STARTED,
conversation_id="s2",
thread_id="t1",
task_id=None,
payload='{"a":1}',
),
ConversationItem(
item_id="a2",
role=Role.AGENT,
event=SystemResponseEvent.DONE,
conversation_id="s2",
thread_id="t1",
task_id=None,
payload='{"component_type":"card","a":2}',
),
ConversationItem(
item_id="a3",
role=Role.AGENT,
event=SystemResponseEvent.THREAD_STARTED,
conversation_id="s2",
thread_id="t2",
task_id=None,
payload='{"component_type":"chart","a":3}',
),
]

# save in order
for it in items:
await store.save_item(it)

# filter by role
agent_items = await store.get_items("s2", role=Role.AGENT)
assert {i.item_id for i in agent_items} == {"a2", "a3"}

# filter by event
thread_started = await store.get_items(
"s2", event=SystemResponseEvent.THREAD_STARTED
)
assert {i.item_id for i in thread_started} == {"a1", "a3"}

# filter by component_type (json_extract on payload)
cards = await store.get_items("s2", component_type="card")
assert [i.item_id for i in cards] == ["a2"]

# limit & offset: get first item only, then skip first
first = await store.get_items("s2", limit=1)
assert len(first) == 1
second_page = await store.get_items("s2", limit=1, offset=1)
assert len(second_page) == 1

finally:
if os.path.exists(path):
os.remove(path)
21 changes: 13 additions & 8 deletions python/valuecell/core/coordinate/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@

from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent
from valuecell.core.agent.connect import RemoteConnections
from valuecell.core.conversation import (
ConversationManager,
ConversationStatus,
SQLiteItemStore,
)
from valuecell.core.coordinate.response import ResponseFactory
from valuecell.core.coordinate.response_buffer import ResponseBuffer, SaveItem
from valuecell.core.coordinate.response_router import (
RouteResult,
SideEffectKind,
handle_status_update,
)
from valuecell.core.conversation import (
ConversationManager,
ConversationStatus,
SQLiteItemStore,
)
from valuecell.core.task import Task, TaskManager
from valuecell.core.task.models import TaskPattern
from valuecell.core.types import BaseResponse, UserInput
from valuecell.core.types import BaseResponse, ConversationItemEvent, UserInput
from valuecell.utils import resolve_db_path
from valuecell.utils.uuid import generate_thread_id

Expand Down Expand Up @@ -221,10 +221,15 @@ async def close_conversation(self, conversation_id: str):
await self._cancel_execution(conversation_id)

async def get_conversation_history(
self, conversation_id: str
self,
conversation_id: str,
event: Optional[ConversationItemEvent] = None,
component_type: Optional[str] = None,
) -> list[BaseResponse]:
"""Get conversation message history"""
items = await self.conversation_manager.get_conversation_items(conversation_id)
items = await self.conversation_manager.get_conversation_items(
conversation_id, event=event, component_type=component_type
)
return [self._response_factory.from_conversation_item(it) for it in items]

async def cleanup(self):
Expand Down