From fa06c055a1ca578e20f6799953af8d197cdd0707 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 09:58:14 +0800 Subject: [PATCH 1/8] refactor: update conversation_id parameter to be optional in item stores and manager --- .../valuecell/core/conversation/item_store.py | 30 ++++--- python/valuecell/core/conversation/manager.py | 4 +- .../conversation/tests/test_conv_manager.py | 2 +- .../tests/test_in_memory_item_store.py | 79 +++++++++++++++++-- .../tests/test_sqlite_item_store.py | 58 ++++++++++++++ .../valuecell/core/coordinate/orchestrator.py | 4 +- 6 files changed, 157 insertions(+), 20 deletions(-) diff --git a/python/valuecell/core/conversation/item_store.py b/python/valuecell/core/conversation/item_store.py index b20d1e98f..c54aafa4a 100644 --- a/python/valuecell/core/conversation/item_store.py +++ b/python/valuecell/core/conversation/item_store.py @@ -23,7 +23,7 @@ async def save_item(self, item: ConversationItem) -> None: ... @abstractmethod async def get_items( self, - conversation_id: str, + conversation_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, @@ -61,13 +61,19 @@ async def save_item(self, item: ConversationItem) -> None: async def get_items( self, - conversation_id: str, + conversation_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, **kwargs, ) -> List[ConversationItem]: - items = list(self._items.get(conversation_id, [])) + if conversation_id is not None: + items = list(self._items.get(conversation_id, [])) + else: + # Collect all items from all conversations + items = [] + for conv_items in self._items.values(): + items.extend(conv_items) if role is not None: items = [m for m in items if m.role == role] if offset: @@ -176,7 +182,7 @@ async def save_item(self, item: ConversationItem) -> None: async def get_items( self, - conversation_id: str, + conversation_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0, role: Optional[Role] = None, @@ -185,19 +191,23 @@ async def get_items( **kwargs, ) -> List[ConversationItem]: await self._ensure_initialized() - params = [conversation_id] - where = "WHERE conversation_id = ?" + params = [] + where_clauses = [] + if conversation_id is not None: + where_clauses.append("conversation_id = ?") + params.append(conversation_id) if role is not None: - where += " AND role = ?" + where_clauses.append("role = ?") params.append(getattr(role, "value", str(role))) - # Add additional optional filters before building the final SQL string if event is not None: - where += " AND event = ?" + where_clauses.append("event = ?") params.append(getattr(event, "value", str(event))) if component_type is not None: - where += " AND json_extract(payload, '$.component_type') = ?" + where_clauses.append("json_extract(payload, '$.component_type') = ?") params.append(component_type) + where = "WHERE " + " AND ".join(where_clauses) if where_clauses else "" + sql = f"SELECT * FROM conversation_items {where} ORDER BY datetime(created_at) ASC" if limit is not None: sql += " LIMIT ?" diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py index 309a28833..b77091923 100644 --- a/python/valuecell/core/conversation/manager.py +++ b/python/valuecell/core/conversation/manager.py @@ -132,7 +132,7 @@ async def add_item( async def get_conversation_items( self, - conversation_id: str, + conversation_id: Optional[str] = None, event: Optional[ConversationItemEvent] = None, component_type: Optional[str] = None, ) -> List[ConversationItem]: @@ -145,7 +145,7 @@ async def get_conversation_items( role: Filter by specific role (optional) """ return await self.item_store.get_items( - conversation_id, event=event, component_type=component_type + conversation_id=conversation_id, event=event, component_type=component_type ) async def get_latest_item(self, conversation_id: str) -> Optional[ConversationItem]: diff --git a/python/valuecell/core/conversation/tests/test_conv_manager.py b/python/valuecell/core/conversation/tests/test_conv_manager.py index 19630e596..24e0fdf7c 100644 --- a/python/valuecell/core/conversation/tests/test_conv_manager.py +++ b/python/valuecell/core/conversation/tests/test_conv_manager.py @@ -365,7 +365,7 @@ async def test_get_conversation_items(self): assert result == items manager.item_store.get_items.assert_called_once_with( - "conv-123", event=None, component_type=None + conversation_id="conv-123", event=None, component_type=None ) @pytest.mark.asyncio diff --git a/python/valuecell/core/conversation/tests/test_in_memory_item_store.py b/python/valuecell/core/conversation/tests/test_in_memory_item_store.py index 8f7247f41..1fac2045a 100644 --- a/python/valuecell/core/conversation/tests/test_in_memory_item_store.py +++ b/python/valuecell/core/conversation/tests/test_in_memory_item_store.py @@ -353,11 +353,80 @@ async def test_delete_conversation_items_existing(self): assert count_other == 1 @pytest.mark.asyncio - async def test_delete_conversation_items_nonexistent(self): - """Test deleting items for a nonexistent conversation.""" + async def test_get_items_all_conversations(self): + """Test getting items from all conversations when conversation_id is None.""" store = InMemoryItemStore() - # Should not raise an error - await store.delete_conversation_items("nonexistent") + # Add items to different conversations + conv1_items = [ + ConversationItem( + item_id=f"conv1-{i}", + role=Role.USER, + event="message", + conversation_id="conv-1", + payload=f"Conv1 Message {i}", + ) + for i in range(2) + ] + conv2_items = [ + ConversationItem( + item_id=f"conv2-{i}", + role=Role.AGENT, + event="message", + conversation_id="conv-2", + payload=f"Conv2 Message {i}", + ) + for i in range(2) + ] - assert "nonexistent" not in store._items + for item in conv1_items + conv2_items: + await store.save_item(item) + + # Get all items across conversations + result = await store.get_items(conversation_id=None) + + assert len(result) == 4 + # Items should be in order of conversations, but since dict iteration order is not guaranteed, + # just check that all items are present + result_ids = {item.item_id for item in result} + expected_ids = {f"conv1-{i}" for i in range(2)} | {f"conv2-{i}" for i in range(2)} + assert result_ids == expected_ids + + @pytest.mark.asyncio + async def test_get_items_all_conversations_with_role_filter(self): + """Test getting items from all conversations with role filter.""" + store = InMemoryItemStore() + + # Add items to different conversations with different roles + user_item = ConversationItem( + item_id="user-conv1", + role=Role.USER, + event="message", + conversation_id="conv-1", + payload="User message", + ) + agent_item_conv1 = ConversationItem( + item_id="agent-conv1", + role=Role.AGENT, + event="message", + conversation_id="conv-1", + payload="Agent message conv1", + ) + agent_item_conv2 = ConversationItem( + item_id="agent-conv2", + role=Role.AGENT, + event="message", + conversation_id="conv-2", + payload="Agent message conv2", + ) + + await store.save_item(user_item) + await store.save_item(agent_item_conv1) + await store.save_item(agent_item_conv2) + + # Filter all conversations by AGENT role + result = await store.get_items(conversation_id=None, role=Role.AGENT) + + assert len(result) == 2 + result_ids = {item.item_id for item in result} + assert result_ids == {"agent-conv1", "agent-conv2"} diff --git a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py index 3813d53da..68c529b9f 100644 --- a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py +++ b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py @@ -64,6 +64,64 @@ async def test_sqlite_item_store_basic_crud(): os.remove(path) +@pytest.mark.asyncio +async def test_sqlite_item_store_get_items_all_conversations(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + try: + store = SQLiteItemStore(path) + + # Create items in different conversations + items = [ + ConversationItem( + item_id="c1-i1", + role=Role.USER, + event=SystemResponseEvent.THREAD_STARTED, + conversation_id="conv-1", + thread_id="t1", + task_id=None, + payload='{"msg": "conv1 item1"}', + ), + ConversationItem( + item_id="c1-i2", + role=Role.AGENT, + event=SystemResponseEvent.DONE, + conversation_id="conv-1", + thread_id="t1", + task_id=None, + payload='{"msg": "conv1 item2"}', + ), + ConversationItem( + item_id="c2-i1", + role=Role.USER, + event=SystemResponseEvent.THREAD_STARTED, + conversation_id="conv-2", + thread_id="t2", + task_id=None, + payload='{"msg": "conv2 item1"}', + ), + ] + + for item in items: + await store.save_item(item) + + # Get all items across conversations + all_items = await store.get_items(conversation_id=None) + assert len(all_items) == 3 + item_ids = {item.item_id for item in all_items} + assert item_ids == {"c1-i1", "c1-i2", "c2-i1"} + + # Get all items with role filter + user_items = await store.get_items(conversation_id=None, role=Role.USER) + assert len(user_items) == 2 + user_ids = {item.item_id for item in user_items} + assert user_ids == {"c1-i1", "c2-i1"} + + finally: + if os.path.exists(path): + os.remove(path) + + @pytest.mark.asyncio async def test_sqlite_item_store_filters_and_pagination(): fd, path = tempfile.mkstemp(suffix=".db") diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 291ba8d34..c4c256a8a 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -279,7 +279,7 @@ async def close_conversation(self, conversation_id: str): async def get_conversation_history( self, - conversation_id: str, + conversation_id: Optional[str] = None, event: Optional[ConversationItemEvent] = None, component_type: Optional[str] = None, ) -> list[BaseResponse]: @@ -295,7 +295,7 @@ async def get_conversation_history( ConversationItems. """ items = await self.conversation_manager.get_conversation_items( - conversation_id, event=event, component_type=component_type + conversation_id=conversation_id, event=event, component_type=component_type ) return [self._response_factory.from_conversation_item(it) for it in items] From f5d130278a8ba1b98c4d4cdc1c4007c1c107bb78 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 10:31:50 +0800 Subject: [PATCH 2/8] feat: add agent_name field to conversation items and related components --- .../valuecell/core/conversation/item_store.py | 7 +++++-- python/valuecell/core/conversation/manager.py | 2 ++ .../conversation/tests/test_conv_manager.py | 4 ++++ .../tests/test_in_memory_item_store.py | 4 +++- .../tests/test_sqlite_item_store.py | 3 +++ .../valuecell/core/coordinate/orchestrator.py | 10 ++++++++++ python/valuecell/core/coordinate/response.py | 15 ++++++++++++++ .../core/coordinate/response_buffer.py | 20 ++++++++++++++++--- .../core/coordinate/response_router.py | 5 +++++ .../coordinate/tests/test_response_buffer.py | 8 +++++++- .../coordinate/tests/test_response_factory.py | 9 ++++++++- .../coordinate/tests/test_response_router.py | 7 +++++++ python/valuecell/core/types.py | 6 ++++++ 13 files changed, 92 insertions(+), 8 deletions(-) diff --git a/python/valuecell/core/conversation/item_store.py b/python/valuecell/core/conversation/item_store.py index c54aafa4a..234a3626a 100644 --- a/python/valuecell/core/conversation/item_store.py +++ b/python/valuecell/core/conversation/item_store.py @@ -132,6 +132,7 @@ async def _ensure_initialized(self) -> None: thread_id TEXT, task_id TEXT, payload TEXT, + agent_name TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """ @@ -155,6 +156,7 @@ def _row_to_item(row: sqlite3.Row) -> ConversationItem: thread_id=row["thread_id"], task_id=row["task_id"], payload=row["payload"], + agent_name=row["agent_name"], ) async def save_item(self, item: ConversationItem) -> None: @@ -165,8 +167,8 @@ async def save_item(self, item: ConversationItem) -> None: await db.execute( """ INSERT OR REPLACE INTO conversation_items ( - item_id, role, event, conversation_id, thread_id, task_id, payload - ) VALUES (?, ?, ?, ?, ?, ?, ?) + item_id, role, event, conversation_id, thread_id, task_id, payload, agent_name + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( item.item_id, @@ -176,6 +178,7 @@ async def save_item(self, item: ConversationItem) -> None: item.thread_id, item.task_id, item.payload, + item.agent_name, ), ) await db.commit() diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py index b77091923..13b3dda86 100644 --- a/python/valuecell/core/conversation/manager.py +++ b/python/valuecell/core/conversation/manager.py @@ -81,6 +81,7 @@ async def add_item( task_id: Optional[str] = None, payload: ResponsePayload = None, item_id: Optional[str] = None, + agent_name: Optional[str] = None, ) -> Optional[ConversationItem]: """Add item to conversation @@ -119,6 +120,7 @@ async def add_item( thread_id=thread_id, task_id=task_id, payload=payload_str, + agent_name=agent_name, ) # Save item directly to item store diff --git a/python/valuecell/core/conversation/tests/test_conv_manager.py b/python/valuecell/core/conversation/tests/test_conv_manager.py index 24e0fdf7c..0b6046d7d 100644 --- a/python/valuecell/core/conversation/tests/test_conv_manager.py +++ b/python/valuecell/core/conversation/tests/test_conv_manager.py @@ -234,6 +234,7 @@ async def test_add_item_success(self): event=NotifyResponseEvent.MESSAGE, conversation_id="conv-123", payload='{"message": "Hello"}', + agent_name="agent-123", ) assert result is not None @@ -242,12 +243,15 @@ async def test_add_item_success(self): assert result.event == NotifyResponseEvent.MESSAGE assert result.conversation_id == "conv-123" assert result.payload == '{"message": "Hello"}' + assert result.agent_name == "agent-123" # Verify stores were called manager.conversation_store.load_conversation.assert_called_once_with( "conv-123" ) manager.item_store.save_item.assert_called_once() + saved_item = manager.item_store.save_item.call_args.args[0] + assert saved_item.agent_name == "agent-123" manager.conversation_store.save_conversation.assert_called_once_with( conversation ) diff --git a/python/valuecell/core/conversation/tests/test_in_memory_item_store.py b/python/valuecell/core/conversation/tests/test_in_memory_item_store.py index 1fac2045a..1950ee1e4 100644 --- a/python/valuecell/core/conversation/tests/test_in_memory_item_store.py +++ b/python/valuecell/core/conversation/tests/test_in_memory_item_store.py @@ -389,7 +389,9 @@ async def test_get_items_all_conversations(self): # Items should be in order of conversations, but since dict iteration order is not guaranteed, # just check that all items are present result_ids = {item.item_id for item in result} - expected_ids = {f"conv1-{i}" for i in range(2)} | {f"conv2-{i}" for i in range(2)} + expected_ids = {f"conv1-{i}" for i in range(2)} | { + f"conv2-{i}" for i in range(2) + } assert result_ids == expected_ids @pytest.mark.asyncio diff --git a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py index 68c529b9f..65cbcfc3f 100644 --- a/python/valuecell/core/conversation/tests/test_sqlite_item_store.py +++ b/python/valuecell/core/conversation/tests/test_sqlite_item_store.py @@ -90,6 +90,7 @@ async def test_sqlite_item_store_get_items_all_conversations(): thread_id="t1", task_id=None, payload='{"msg": "conv1 item2"}', + agent_name="agent-alpha", ), ConversationItem( item_id="c2-i1", @@ -110,6 +111,8 @@ async def test_sqlite_item_store_get_items_all_conversations(): assert len(all_items) == 3 item_ids = {item.item_id for item in all_items} assert item_ids == {"c1-i1", "c1-i2", "c2-i1"} + agent_items = [item for item in all_items if item.role == Role.AGENT] + assert agent_items and agent_items[0].agent_name == "agent-alpha" # Get all items with role filter user_items = await store.get_items(conversation_id=None, role=Role.USER) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index c4c256a8a..25741b677 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -413,6 +413,7 @@ async def _handle_new_request( thread_id, task_id=generate_task_id(), content=super_outcome.answer_content, + agent_name=self.super_agent.name, ) await self._persist_from_buffer(ans) yield ans @@ -648,6 +649,9 @@ async def _execute_plan_with_input_support( for task in plan.tasks: subagent_conversation_item_id = generate_item_id() + await self.conversation_manager.create_conversation( + plan.user_id, conversation_id=task.conversation_id + ) if task.handoff_from_super_agent: yield self._response_factory.component_generator( conversation_id=conversation_id, @@ -662,6 +666,7 @@ async def _execute_plan_with_input_support( ), component_type="subagent_conversation", item_id=subagent_conversation_item_id, + agent_name=task.agent_name, ) try: # Register the task with TaskManager (persist in-memory) @@ -687,6 +692,7 @@ async def _execute_plan_with_input_support( thread_id, task.task_id, error_msg, + agent_name=task.agent_name, ) finally: if task.handoff_from_super_agent: @@ -703,6 +709,7 @@ async def _execute_plan_with_input_support( ), component_type="subagent_conversation", item_id=subagent_conversation_item_id, + agent_name=task.agent_name, ) async def _execute_task_with_input_support( @@ -764,6 +771,7 @@ async def _execute_task_with_input_support( conversation_id=conversation_id, thread_id=thread_id, task_id=task_id, + agent_name=task.agent_name, ) continue @@ -794,6 +802,7 @@ async def _execute_task_with_input_support( conversation_id=conversation_id, thread_id=thread_id, task_id=task_id, + agent_name=task.agent_name, ) # Finalize buffered aggregates for this task (explicit flush at task end) items = self._response_buffer.flush_task( @@ -830,4 +839,5 @@ async def _persist_items(self, items: list[SaveItem]): task_id=it.task_id, payload=it.payload, item_id=it.item_id, + agent_name=it.agent_name, ) diff --git a/python/valuecell/core/coordinate/response.py b/python/valuecell/core/coordinate/response.py index 178a1373b..6037890f2 100644 --- a/python/valuecell/core/coordinate/response.py +++ b/python/valuecell/core/coordinate/response.py @@ -96,6 +96,7 @@ def make_data(payload=None): payload=payload, role=role, item_id=item.item_id, + agent_name=item.agent_name, ) # ----- System-level events ----- @@ -269,6 +270,7 @@ def task_failed( thread_id: str, task_id: str, content: str, + agent_name: Optional[str] = None, ) -> TaskFailedResponse: """Create a TaskFailedResponse for a failed task execution. @@ -288,6 +290,7 @@ def task_failed( task_id=task_id, payload=BaseResponseDataPayload(content=content), role=Role.AGENT, + agent_name=agent_name, ) ) @@ -296,6 +299,7 @@ def task_started( conversation_id: str, thread_id: str, task_id: str, + agent_name: Optional[str] = None, ) -> TaskStartedResponse: """Return a TaskStartedResponse indicating a task has begun execution. @@ -313,6 +317,7 @@ def task_started( thread_id=thread_id, task_id=task_id, role=Role.AGENT, + agent_name=agent_name, ), ) @@ -321,6 +326,7 @@ def task_completed( conversation_id: str, thread_id: str, task_id: str, + agent_name: Optional[str] = None, ) -> TaskCompletedResponse: """Create a TaskCompletedResponse signalling successful completion. @@ -338,6 +344,7 @@ def task_completed( thread_id=thread_id, task_id=task_id, role=Role.AGENT, + agent_name=agent_name, ), ) @@ -353,6 +360,7 @@ def tool_call( tool_call_id: str, tool_name: str, tool_result: Optional[str] = None, + agent_name: Optional[str] = None, ) -> ToolCallResponse: """Build a ToolCallResponse representing a tool invocation/result. @@ -380,6 +388,7 @@ def tool_call( tool_result=tool_result, ), role=Role.AGENT, + agent_name=agent_name, item_id=tool_call_id, ), ) @@ -392,6 +401,7 @@ def message_response_general( task_id: str, content: str, item_id: Optional[str] = None, + agent_name: Optional[str] = None, ) -> MessageResponse: """Create a generic message response used for both stream and notify. @@ -418,6 +428,7 @@ def message_response_general( ), role=Role.AGENT, item_id=item_id or generate_item_id(), + agent_name=agent_name, ), ) @@ -432,6 +443,7 @@ def reasoning( StreamResponseEvent.REASONING_COMPLETED, ], content: Optional[str] = None, + agent_name: Optional[str] = None, ) -> ReasoningResponse: """Build a reasoning response used to convey model chain-of-thought. @@ -453,6 +465,7 @@ def reasoning( task_id=task_id, payload=(BaseResponseDataPayload(content=content) if content else None), role=Role.AGENT, + agent_name=agent_name, ), ) @@ -464,6 +477,7 @@ def component_generator( content: str, component_type: str, item_id: Optional[str] = None, + agent_name: Optional[str] = None, ) -> ComponentGeneratorResponse: """Create a ComponentGeneratorResponse for UI component generation. @@ -489,5 +503,6 @@ def component_generator( ), role=Role.AGENT, item_id=item_id or generate_item_id(), + agent_name=agent_name, ), ) diff --git a/python/valuecell/core/coordinate/response_buffer.py b/python/valuecell/core/coordinate/response_buffer.py index e6930ff45..51749929e 100644 --- a/python/valuecell/core/coordinate/response_buffer.py +++ b/python/valuecell/core/coordinate/response_buffer.py @@ -26,6 +26,7 @@ class SaveItem: task_id: Optional[str] payload: Optional[BaseModel] role: Role = Role.AGENT + agent_name: Optional[str] = None # conversation_id, thread_id, task_id, event @@ -40,7 +41,12 @@ class BufferEntry: be correlated with the final persisted ConversationItem. """ - def __init__(self, item_id: Optional[str] = None, role: Optional[Role] = None): + def __init__( + self, + item_id: Optional[str] = None, + role: Optional[Role] = None, + agent_name: Optional[str] = None, + ): self.parts: List[str] = [] self.last_updated: float = time.monotonic() # Stable paragraph id for this buffer entry. Reused across streamed chunks @@ -48,6 +54,7 @@ def __init__(self, item_id: Optional[str] = None, role: Optional[Role] = None): # we rotate to a new paragraph id for subsequent chunks. self.item_id: str = item_id or generate_item_id() self.role: Optional[Role] = role + self.agent_name: Optional[str] = agent_name def append(self, text: str): """Append a chunk of text to this buffer and update the timestamp.""" @@ -114,8 +121,10 @@ def annotate(self, resp: BaseResponse) -> BaseResponse: entry = self._buffers.get(key) if not entry: # Start a new paragraph buffer with a fresh paragraph item_id - entry = BufferEntry(role=data.role) + entry = BufferEntry(role=data.role, agent_name=data.agent_name) self._buffers[key] = entry + if entry.agent_name is None and data.agent_name: + entry.agent_name = data.agent_name # Stamp the response with the stable paragraph id data.item_id = entry.item_id resp.data = data @@ -158,8 +167,10 @@ def ingest(self, resp: BaseResponse) -> List[SaveItem]: entry = self._buffers.get(key) if not entry: # If annotate() wasn't called, create an entry now. - entry = BufferEntry(role=data.role) + entry = BufferEntry(role=data.role, agent_name=data.agent_name) self._buffers[key] = entry + elif entry.agent_name is None and data.agent_name: + entry.agent_name = data.agent_name # Extract text content from payload payload = data.payload @@ -229,6 +240,7 @@ def _finalize_keys(self, keys: List[BufferKey]) -> List[SaveItem]: task_id=key[2], payload=payload, role=entry.role or Role.AGENT, + agent_name=entry.agent_name, ) ) if key in self._buffers: @@ -275,6 +287,7 @@ def _make_save_item_from_response(self, resp: BaseResponse) -> SaveItem: task_id=data.task_id, payload=bm, role=data.role, + agent_name=data.agent_name, ) def _make_save_item( @@ -292,4 +305,5 @@ def _make_save_item( task_id=data.task_id, payload=payload, role=data.role, + agent_name=data.agent_name, ) diff --git a/python/valuecell/core/coordinate/response_router.py b/python/valuecell/core/coordinate/response_router.py index da61c4b21..76f3ae957 100644 --- a/python/valuecell/core/coordinate/response_router.py +++ b/python/valuecell/core/coordinate/response_router.py @@ -83,6 +83,7 @@ async def handle_status_update( thread_id=thread_id, task_id=task.task_id, content=err_msg, + agent_name=task.agent_name, ) ) return RouteResult( @@ -113,6 +114,7 @@ async def handle_status_update( tool_call_id=tool_call_id, tool_name=tool_name, tool_result=tool_result, + agent_name=task.agent_name, ) ) return RouteResult(responses) @@ -127,6 +129,7 @@ async def handle_status_update( task_id=task.task_id, event=response_event, content=content, + agent_name=task.agent_name, ) ) return RouteResult(responses) @@ -144,6 +147,7 @@ async def handle_status_update( task_id=task.task_id, content=content, component_type=component_type, + agent_name=task.agent_name, ) ) return RouteResult(responses) @@ -157,6 +161,7 @@ async def handle_status_update( thread_id=thread_id, task_id=task.task_id, content=content, + agent_name=task.agent_name, ) ) return RouteResult(responses) diff --git a/python/valuecell/core/coordinate/tests/test_response_buffer.py b/python/valuecell/core/coordinate/tests/test_response_buffer.py index b84aedf37..3cb68cf97 100644 --- a/python/valuecell/core/coordinate/tests/test_response_buffer.py +++ b/python/valuecell/core/coordinate/tests/test_response_buffer.py @@ -35,17 +35,19 @@ def test_init_default(self): assert isinstance(entry.item_id, str) assert len(entry.item_id) > 0 assert entry.role is None + assert entry.agent_name is None def test_init_with_params(self): """Test BufferEntry initialization with parameters.""" item_id = "test-item-123" role = Role.USER - entry = BufferEntry(item_id=item_id, role=role) + entry = BufferEntry(item_id=item_id, role=role, agent_name="agent-test") assert entry.parts == [] assert isinstance(entry.last_updated, float) assert entry.item_id == item_id assert entry.role == role + assert entry.agent_name == "agent-test" def test_append_empty_text(self): """Test appending empty text.""" @@ -190,6 +192,7 @@ async def test_ingest_immediate_event_message(self): role=Role.USER, item_id="item-123", payload=BaseResponseDataPayload(content="Hello"), + agent_name="agent-immediate", ), ) @@ -202,6 +205,7 @@ async def test_ingest_immediate_event_message(self): assert result[0].conversation_id == "conv-123" assert result[0].role == Role.USER assert result[0].payload.content == "Hello" + assert result[0].agent_name == "agent-immediate" @pytest.mark.asyncio async def test_ingest_buffered_event_message_chunk(self): @@ -214,6 +218,7 @@ async def test_ingest_buffered_event_message_chunk(self): role=Role.AGENT, item_id="item-123", payload=BaseResponseDataPayload(content="Hello"), + agent_name="agent-buffer", ), ) @@ -225,6 +230,7 @@ async def test_ingest_buffered_event_message_chunk(self): assert result[0].conversation_id == "conv-123" assert result[0].role == Role.AGENT assert result[0].payload.content == "Hello" + assert result[0].agent_name == "agent-buffer" @pytest.mark.asyncio async def test_ingest_buffered_event_reasoning(self): diff --git a/python/valuecell/core/coordinate/tests/test_response_factory.py b/python/valuecell/core/coordinate/tests/test_response_factory.py index 3488e4fe7..f4825e87e 100644 --- a/python/valuecell/core/coordinate/tests/test_response_factory.py +++ b/python/valuecell/core/coordinate/tests/test_response_factory.py @@ -27,6 +27,7 @@ def _mk_item( conversation_id: str = "sess-1", thread_id: str | None = "th-1", task_id: str | None = "tk-1", + agent_name: str | None = None, ) -> ConversationItem: return ConversationItem( item_id=item_id, @@ -36,6 +37,7 @@ def _mk_item( thread_id=thread_id, task_id=task_id, payload=payload, + agent_name=agent_name, ) @@ -54,10 +56,15 @@ def test_thread_started_with_payload(factory: ResponseFactory): def test_message_chunk(factory: ResponseFactory): payload = BaseResponseDataPayload(content="chunk").model_dump_json() - item = _mk_item(event=StreamResponseEvent.MESSAGE_CHUNK.value, payload=payload) + item = _mk_item( + event=StreamResponseEvent.MESSAGE_CHUNK.value, + payload=payload, + agent_name="agent-x", + ) resp = factory.from_conversation_item(item) assert resp.event == StreamResponseEvent.MESSAGE_CHUNK assert resp.data.payload.content == "chunk" # type: ignore[attr-defined] + assert resp.data.agent_name == "agent-x" def test_notify_message(factory: ResponseFactory): diff --git a/python/valuecell/core/coordinate/tests/test_response_router.py b/python/valuecell/core/coordinate/tests/test_response_router.py index 7c374dc9b..52c3a0d87 100644 --- a/python/valuecell/core/coordinate/tests/test_response_router.py +++ b/python/valuecell/core/coordinate/tests/test_response_router.py @@ -173,6 +173,7 @@ async def test_failed_state(self): thread_id="thread-123", task_id="task-123", content="Task failed", + agent_name="test-agent", ) async def test_failed_state_with_complex_message(self): @@ -295,6 +296,7 @@ async def test_tool_call_event(self): tool_call_id="call-123", tool_name="test_tool", tool_result="Tool result", + agent_name="test-agent", ) async def test_tool_call_event_no_result(self): @@ -344,6 +346,7 @@ async def test_tool_call_event_no_result(self): tool_call_id="call-123", tool_name="test_tool", tool_result=None, + agent_name="test-agent", ) async def test_reasoning_event(self): @@ -391,6 +394,7 @@ async def test_reasoning_event(self): task_id="task-123", event="reasoning_started", content="Thinking...", + agent_name="test-agent", ) async def test_component_generator_event(self): @@ -437,6 +441,7 @@ async def test_component_generator_event(self): task_id="task-123", content="Generating component", component_type="button", + agent_name="test-agent", ) async def test_component_generator_event_no_component_type(self): @@ -481,6 +486,7 @@ async def test_component_generator_event_no_component_type(self): task_id="task-123", content="Generating component", component_type="unknown", + agent_name="test-agent", ) async def test_message_event(self): @@ -528,6 +534,7 @@ async def test_message_event(self): thread_id="thread-123", task_id="task-123", content="Hello world", + agent_name="test-agent", ) async def test_unknown_event_type(self): diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index 9f748be8c..319f5406a 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -195,6 +195,9 @@ class ConversationItem(BaseModel): item_id: str = Field(..., description="Unique message identifier") role: Role = Field(..., description="Role of the message sender") + agent_name: Optional[str] = Field( + None, description="Name of the agent that sent this message" + ) event: ConversationItemEvent = Field(..., description="Event type of the message") conversation_id: str = Field( ..., description="Conversation ID this message belongs to" @@ -218,6 +221,9 @@ class UnifiedResponseData(BaseModel): None, description="Unique ID for the message thread" ) task_id: Optional[str] = Field(None, description="Unique ID for the task") + agent_name: Optional[str] = Field( + None, description="Name of the agent associated with this response" + ) payload: Optional[ResponsePayload] = Field( None, description="The message data payload" ) From f8eb58b2841deb3fcba215639abb47d457a40dba Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:12:49 +0800 Subject: [PATCH 3/8] feat: introduce ComponentType and SubagentConversationPhase enums for improved conversation handling --- python/valuecell/core/coordinate/orchestrator.py | 10 ++++++---- python/valuecell/core/types.py | 7 +++++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 25741b677..9f8ab1ae7 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -42,6 +42,8 @@ ConversationItemEvent, StreamResponseEvent, UserInput, + ComponentType, + SubagentConversationPhase, ) from valuecell.utils import resolve_db_path from valuecell.utils.i18n_utils import get_current_language, get_current_timezone @@ -661,10 +663,10 @@ async def _execute_plan_with_input_support( { "conversation_id": task.conversation_id, "agent_name": task.agent_name, - "phase": "start", + "phase": SubagentConversationPhase.START.value, } ), - component_type="subagent_conversation", + component_type=ComponentType.SUBAGENT_CONVERSATION.value, item_id=subagent_conversation_item_id, agent_name=task.agent_name, ) @@ -704,10 +706,10 @@ async def _execute_plan_with_input_support( { "conversation_id": task.conversation_id, "agent_name": task.agent_name, - "phase": "end", + "phase": SubagentConversationPhase.END.value, } ), - component_type="subagent_conversation", + component_type=ComponentType.SUBAGENT_CONVERSATION.value, item_id=subagent_conversation_item_id, agent_name=task.agent_name, ) diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index 319f5406a..91f141f94 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -147,6 +147,13 @@ class ComponentType(str, Enum): REPORT = "report" PROFILE = "profile" + SUBAGENT_CONVERSATION = "subagent_conversation" + +class SubagentConversationPhase(str, Enum): + """Phases for subagent conversation component.""" + + START = "start" + END = "end" class ReportComponentData(BaseModel): From 2227f2369810b70e7da7c567a936c0a04a6a6881 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:19:33 +0800 Subject: [PATCH 4/8] fix format --- python/valuecell/core/coordinate/orchestrator.py | 4 ++-- python/valuecell/core/types.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 9f8ab1ae7..94659dbf5 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -39,11 +39,11 @@ from valuecell.core.task.models import TaskPattern from valuecell.core.types import ( BaseResponse, + ComponentType, ConversationItemEvent, StreamResponseEvent, - UserInput, - ComponentType, SubagentConversationPhase, + UserInput, ) from valuecell.utils import resolve_db_path from valuecell.utils.i18n_utils import get_current_language, get_current_timezone diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index 91f141f94..6eb921fc0 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -149,6 +149,7 @@ class ComponentType(str, Enum): PROFILE = "profile" SUBAGENT_CONVERSATION = "subagent_conversation" + class SubagentConversationPhase(str, Enum): """Phases for subagent conversation component.""" From 94802fda04df01b6a865217f3cd436f3e695413a Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 13:56:49 +0800 Subject: [PATCH 5/8] feat: yield thread_started response with conversation details in AgentOrchestrator --- python/valuecell/core/coordinate/orchestrator.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 94659dbf5..9974cb123 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -670,6 +670,11 @@ async def _execute_plan_with_input_support( item_id=subagent_conversation_item_id, agent_name=task.agent_name, ) + yield self._response_factory.thread_started( + conversation_id=task.conversation_id, + thread_id=thread_id, + user_query=task.query, + ) try: # Register the task with TaskManager (persist in-memory) await self.task_manager.update_task(task) From e583de9235f0db96123fd96c5a836549f793ae59 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 14:10:45 +0800 Subject: [PATCH 6/8] refactor: replace UUID generation with dedicated functions for conversation and item IDs --- python/valuecell/core/conversation/manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/valuecell/core/conversation/manager.py b/python/valuecell/core/conversation/manager.py index 13b3dda86..293f5a44c 100644 --- a/python/valuecell/core/conversation/manager.py +++ b/python/valuecell/core/conversation/manager.py @@ -7,7 +7,7 @@ ResponsePayload, Role, ) -from valuecell.utils import generate_uuid +from valuecell.utils.uuid import generate_conversation_id, generate_item_id from .conversation_store import ConversationStore, InMemoryConversationStore from .item_store import InMemoryItemStore, ItemStore @@ -38,7 +38,7 @@ async def create_conversation( ) -> Conversation: """Create new conversation""" conversation = Conversation( - conversation_id=conversation_id or generate_uuid("conversation"), + conversation_id=conversation_id or generate_conversation_id(), user_id=user_id, title=title, ) @@ -113,7 +113,7 @@ async def add_item( payload_str = None item = ConversationItem( - item_id=item_id or generate_uuid("item"), + item_id=item_id or generate_item_id(), role=role, event=event, conversation_id=conversation_id, From f028eec8f046ab3b7b28b8b8f92578f633d26d90 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 14:10:52 +0800 Subject: [PATCH 7/8] feat: streamline subagent conversation handling by using a dictionary for content --- .../valuecell/core/coordinate/orchestrator.py | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 9974cb123..41416ee63 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -651,6 +651,11 @@ async def _execute_plan_with_input_support( for task in plan.tasks: subagent_conversation_item_id = generate_item_id() + subagent_component_content_dict = { + "conversation_id": task.conversation_id, + "agent_name": task.agent_name, + "phase": SubagentConversationPhase.START.value, + } await self.conversation_manager.create_conversation( plan.user_id, conversation_id=task.conversation_id ) @@ -659,13 +664,7 @@ async def _execute_plan_with_input_support( conversation_id=conversation_id, thread_id=thread_id, task_id=task.task_id, - content=json.dumps( - { - "conversation_id": task.conversation_id, - "agent_name": task.agent_name, - "phase": SubagentConversationPhase.START.value, - } - ), + content=json.dumps(subagent_component_content_dict), component_type=ComponentType.SUBAGENT_CONVERSATION.value, item_id=subagent_conversation_item_id, agent_name=task.agent_name, @@ -703,17 +702,12 @@ async def _execute_plan_with_input_support( ) finally: if task.handoff_from_super_agent: + subagent_component_content_dict["phase"] = SubagentConversationPhase.END.value yield self._response_factory.component_generator( conversation_id=conversation_id, thread_id=thread_id, task_id=task.task_id, - content=json.dumps( - { - "conversation_id": task.conversation_id, - "agent_name": task.agent_name, - "phase": SubagentConversationPhase.END.value, - } - ), + content=json.dumps(subagent_component_content_dict), component_type=ComponentType.SUBAGENT_CONVERSATION.value, item_id=subagent_conversation_item_id, agent_name=task.agent_name, From 58a1ca6fe7eaf82bec84983abb476866bdcda029 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 21 Oct 2025 19:20:16 +0800 Subject: [PATCH 8/8] fix ci --- .../core/conversation/tests/test_conv_manager.py | 4 ++-- .../valuecell/core/coordinate/tests/test_component_id.py | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/python/valuecell/core/conversation/tests/test_conv_manager.py b/python/valuecell/core/conversation/tests/test_conv_manager.py index 0b6046d7d..8d2cc2dd3 100644 --- a/python/valuecell/core/conversation/tests/test_conv_manager.py +++ b/python/valuecell/core/conversation/tests/test_conv_manager.py @@ -41,7 +41,7 @@ async def test_create_conversation_minimal(self): manager = ConversationManager() user_id = "user-123" - with patch("valuecell.core.conversation.manager.generate_uuid") as mock_uuid: + with patch("valuecell.core.conversation.manager.generate_conversation_id") as mock_uuid: mock_uuid.return_value = "conv-generated-123" result = await manager.create_conversation(user_id) @@ -226,7 +226,7 @@ async def test_add_item_success(self): manager.item_store.save_item = AsyncMock() manager.conversation_store.save_conversation = AsyncMock() - with patch("valuecell.core.conversation.manager.generate_uuid") as mock_uuid: + with patch("valuecell.core.conversation.manager.generate_item_id") as mock_uuid: mock_uuid.return_value = "item-generated-123" result = await manager.add_item( diff --git a/python/valuecell/core/coordinate/tests/test_component_id.py b/python/valuecell/core/coordinate/tests/test_component_id.py index e7d6e2189..190f2ed27 100644 --- a/python/valuecell/core/coordinate/tests/test_component_id.py +++ b/python/valuecell/core/coordinate/tests/test_component_id.py @@ -123,7 +123,6 @@ def test_response_factory_with_both_item_id_and_component_id(self): task_id="task_789", content='{"data": "test"}', component_type="test_component", - item_id="item_abc", component_id="component_xyz", ) @@ -140,7 +139,7 @@ def test_response_factory_with_only_item_id(self): task_id="task_789", content='{"data": "test"}', component_type="test_component", - item_id="item_custom", + component_id="item_custom", ) # item_id should be used @@ -157,7 +156,6 @@ def test_response_factory_priority_order(self): task_id="task", content="test", component_type="type", - item_id="item_id", component_id="component_id", ) assert r1.data.item_id == "component_id" @@ -169,10 +167,9 @@ def test_response_factory_priority_order(self): task_id="task", content="test", component_type="type", - item_id="item_id", component_id=None, ) - assert r2.data.item_id == "item_id" + assert r2.data.item_id != "item_id" # Priority 3: auto-generated (lowest) r3 = factory.component_generator( @@ -181,7 +178,6 @@ def test_response_factory_priority_order(self): task_id="task", content="test", component_type="type", - item_id=None, component_id=None, ) assert r3.data.item_id.startswith("item-")