Skip to content
Merged
37 changes: 25 additions & 12 deletions python/valuecell/core/conversation/item_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def save_item(self, item: ConversationItem) -> None: ...
@abstractmethod
async def get_items(
self,
conversation_id: str,
conversation_id: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
role: Optional[Role] = None,
Expand Down Expand Up @@ -61,13 +61,19 @@ async def save_item(self, item: ConversationItem) -> None:

async def get_items(
self,
conversation_id: str,
conversation_id: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
role: Optional[Role] = None,
**kwargs,
) -> List[ConversationItem]:
items = list(self._items.get(conversation_id, []))
if conversation_id is not None:
items = list(self._items.get(conversation_id, []))
else:
# Collect all items from all conversations
items = []
for conv_items in self._items.values():
items.extend(conv_items)
if role is not None:
items = [m for m in items if m.role == role]
if offset:
Expand Down Expand Up @@ -126,6 +132,7 @@ async def _ensure_initialized(self) -> None:
thread_id TEXT,
task_id TEXT,
payload TEXT,
agent_name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
Expand All @@ -149,6 +156,7 @@ def _row_to_item(row: sqlite3.Row) -> ConversationItem:
thread_id=row["thread_id"],
task_id=row["task_id"],
payload=row["payload"],
agent_name=row["agent_name"],
)

async def save_item(self, item: ConversationItem) -> None:
Expand All @@ -159,8 +167,8 @@ async def save_item(self, item: ConversationItem) -> None:
await db.execute(
"""
INSERT OR REPLACE INTO conversation_items (
item_id, role, event, conversation_id, thread_id, task_id, payload
) VALUES (?, ?, ?, ?, ?, ?, ?)
item_id, role, event, conversation_id, thread_id, task_id, payload, agent_name
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
item.item_id,
Expand All @@ -170,13 +178,14 @@ async def save_item(self, item: ConversationItem) -> None:
item.thread_id,
item.task_id,
item.payload,
item.agent_name,
),
)
await db.commit()

async def get_items(
self,
conversation_id: str,
conversation_id: Optional[str] = None,
limit: Optional[int] = None,
offset: int = 0,
role: Optional[Role] = None,
Expand All @@ -185,19 +194,23 @@ async def get_items(
**kwargs,
) -> List[ConversationItem]:
await self._ensure_initialized()
params = [conversation_id]
where = "WHERE conversation_id = ?"
params = []
where_clauses = []
if conversation_id is not None:
where_clauses.append("conversation_id = ?")
params.append(conversation_id)
if role is not None:
where += " AND role = ?"
where_clauses.append("role = ?")
params.append(getattr(role, "value", str(role)))
# Add additional optional filters before building the final SQL string
if event is not None:
where += " AND event = ?"
where_clauses.append("event = ?")
params.append(getattr(event, "value", str(event)))
if component_type is not None:
where += " AND json_extract(payload, '$.component_type') = ?"
where_clauses.append("json_extract(payload, '$.component_type') = ?")
params.append(component_type)

where = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""

sql = f"SELECT * FROM conversation_items {where} ORDER BY datetime(created_at) ASC"
if limit is not None:
sql += " LIMIT ?"
Expand Down
12 changes: 7 additions & 5 deletions python/valuecell/core/conversation/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
ResponsePayload,
Role,
)
from valuecell.utils import generate_uuid
from valuecell.utils.uuid import generate_conversation_id, generate_item_id

from .conversation_store import ConversationStore, InMemoryConversationStore
from .item_store import InMemoryItemStore, ItemStore
Expand Down Expand Up @@ -38,7 +38,7 @@ async def create_conversation(
) -> Conversation:
"""Create new conversation"""
conversation = Conversation(
conversation_id=conversation_id or generate_uuid("conversation"),
conversation_id=conversation_id or generate_conversation_id(),
user_id=user_id,
title=title,
)
Expand Down Expand Up @@ -81,6 +81,7 @@ async def add_item(
task_id: Optional[str] = None,
payload: ResponsePayload = None,
item_id: Optional[str] = None,
agent_name: Optional[str] = None,
) -> Optional[ConversationItem]:
"""Add item to conversation

Expand Down Expand Up @@ -112,13 +113,14 @@ async def add_item(
payload_str = None

item = ConversationItem(
item_id=item_id or generate_uuid("item"),
item_id=item_id or generate_item_id(),
role=role,
event=event,
conversation_id=conversation_id,
thread_id=thread_id,
task_id=task_id,
payload=payload_str,
agent_name=agent_name,
)

# Save item directly to item store
Expand All @@ -132,7 +134,7 @@ async def add_item(

async def get_conversation_items(
self,
conversation_id: str,
conversation_id: Optional[str] = None,
event: Optional[ConversationItemEvent] = None,
component_type: Optional[str] = None,
) -> List[ConversationItem]:
Expand All @@ -145,7 +147,7 @@ async def get_conversation_items(
role: Filter by specific role (optional)
"""
return await self.item_store.get_items(
conversation_id, event=event, component_type=component_type
conversation_id=conversation_id, event=event, component_type=component_type
)

async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]:
Expand Down
10 changes: 7 additions & 3 deletions python/valuecell/core/conversation/tests/test_conv_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def test_create_conversation_minimal(self):
manager = ConversationManager()
user_id = "user-123"

with patch("valuecell.core.conversation.manager.generate_uuid") as mock_uuid:
with patch("valuecell.core.conversation.manager.generate_conversation_id") as mock_uuid:
mock_uuid.return_value = "conv-generated-123"

result = await manager.create_conversation(user_id)
Expand Down Expand Up @@ -226,14 +226,15 @@ async def test_add_item_success(self):
manager.item_store.save_item = AsyncMock()
manager.conversation_store.save_conversation = AsyncMock()

with patch("valuecell.core.conversation.manager.generate_uuid") as mock_uuid:
with patch("valuecell.core.conversation.manager.generate_item_id") as mock_uuid:
mock_uuid.return_value = "item-generated-123"

result = await manager.add_item(
role=Role.USER,
event=NotifyResponseEvent.MESSAGE,
conversation_id="conv-123",
payload='{"message": "Hello"}',
agent_name="agent-123",
)

assert result is not None
Expand All @@ -242,12 +243,15 @@ async def test_add_item_success(self):
assert result.event == NotifyResponseEvent.MESSAGE
assert result.conversation_id == "conv-123"
assert result.payload == '{"message": "Hello"}'
assert result.agent_name == "agent-123"

# Verify stores were called
manager.conversation_store.load_conversation.assert_called_once_with(
"conv-123"
)
manager.item_store.save_item.assert_called_once()
saved_item = manager.item_store.save_item.call_args.args[0]
assert saved_item.agent_name == "agent-123"
manager.conversation_store.save_conversation.assert_called_once_with(
conversation
)
Expand Down Expand Up @@ -365,7 +369,7 @@ async def test_get_conversation_items(self):

assert result == items
manager.item_store.get_items.assert_called_once_with(
"conv-123", event=None, component_type=None
conversation_id="conv-123", event=None, component_type=None
)

@pytest.mark.asyncio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,82 @@ async def test_delete_conversation_items_existing(self):
assert count_other == 1

@pytest.mark.asyncio
async def test_delete_conversation_items_nonexistent(self):
"""Test deleting items for a nonexistent conversation."""
async def test_get_items_all_conversations(self):
"""Test getting items from all conversations when conversation_id is None."""
store = InMemoryItemStore()

# Should not raise an error
await store.delete_conversation_items("nonexistent")
# Add items to different conversations
conv1_items = [
ConversationItem(
item_id=f"conv1-{i}",
role=Role.USER,
event="message",
conversation_id="conv-1",
payload=f"Conv1 Message {i}",
)
for i in range(2)
]
conv2_items = [
ConversationItem(
item_id=f"conv2-{i}",
role=Role.AGENT,
event="message",
conversation_id="conv-2",
payload=f"Conv2 Message {i}",
)
for i in range(2)
]

assert "nonexistent" not in store._items
for item in conv1_items + conv2_items:
await store.save_item(item)

# Get all items across conversations
result = await store.get_items(conversation_id=None)

assert len(result) == 4
# Items should be in order of conversations, but since dict iteration order is not guaranteed,
# just check that all items are present
result_ids = {item.item_id for item in result}
expected_ids = {f"conv1-{i}" for i in range(2)} | {
f"conv2-{i}" for i in range(2)
}
assert result_ids == expected_ids

@pytest.mark.asyncio
async def test_get_items_all_conversations_with_role_filter(self):
"""Test getting items from all conversations with role filter."""
store = InMemoryItemStore()

# Add items to different conversations with different roles
user_item = ConversationItem(
item_id="user-conv1",
role=Role.USER,
event="message",
conversation_id="conv-1",
payload="User message",
)
agent_item_conv1 = ConversationItem(
item_id="agent-conv1",
role=Role.AGENT,
event="message",
conversation_id="conv-1",
payload="Agent message conv1",
)
agent_item_conv2 = ConversationItem(
item_id="agent-conv2",
role=Role.AGENT,
event="message",
conversation_id="conv-2",
payload="Agent message conv2",
)

await store.save_item(user_item)
await store.save_item(agent_item_conv1)
await store.save_item(agent_item_conv2)

# Filter all conversations by AGENT role
result = await store.get_items(conversation_id=None, role=Role.AGENT)

assert len(result) == 2
result_ids = {item.item_id for item in result}
assert result_ids == {"agent-conv1", "agent-conv2"}
61 changes: 61 additions & 0 deletions python/valuecell/core/conversation/tests/test_sqlite_item_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,67 @@ async def test_sqlite_item_store_basic_crud():
os.remove(path)


@pytest.mark.asyncio
async def test_sqlite_item_store_get_items_all_conversations():
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
try:
store = SQLiteItemStore(path)

# Create items in different conversations
items = [
ConversationItem(
item_id="c1-i1",
role=Role.USER,
event=SystemResponseEvent.THREAD_STARTED,
conversation_id="conv-1",
thread_id="t1",
task_id=None,
payload='{"msg": "conv1 item1"}',
),
ConversationItem(
item_id="c1-i2",
role=Role.AGENT,
event=SystemResponseEvent.DONE,
conversation_id="conv-1",
thread_id="t1",
task_id=None,
payload='{"msg": "conv1 item2"}',
agent_name="agent-alpha",
),
ConversationItem(
item_id="c2-i1",
role=Role.USER,
event=SystemResponseEvent.THREAD_STARTED,
conversation_id="conv-2",
thread_id="t2",
task_id=None,
payload='{"msg": "conv2 item1"}',
),
]

for item in items:
await store.save_item(item)

# Get all items across conversations
all_items = await store.get_items(conversation_id=None)
assert len(all_items) == 3
item_ids = {item.item_id for item in all_items}
assert item_ids == {"c1-i1", "c1-i2", "c2-i1"}
agent_items = [item for item in all_items if item.role == Role.AGENT]
assert agent_items and agent_items[0].agent_name == "agent-alpha"

# Get all items with role filter
user_items = await store.get_items(conversation_id=None, role=Role.USER)
assert len(user_items) == 2
user_ids = {item.item_id for item in user_items}
assert user_ids == {"c1-i1", "c2-i1"}

finally:
if os.path.exists(path):
os.remove(path)


@pytest.mark.asyncio
async def test_sqlite_item_store_filters_and_pagination():
fd, path = tempfile.mkstemp(suffix=".db")
Expand Down
Loading