From b55c1a49d9ddc9cb498c1871cff49a72724d3a5c Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 29 Nov 2025 13:49:11 +0800 Subject: [PATCH 1/5] feat: enhance tool call handling and frontend result formatting in orchestrator and event factory --- .../valuecell/core/coordinate/orchestrator.py | 31 ++++++++++++- python/valuecell/core/event/factory.py | 43 +++++++++++++++++- python/valuecell/core/plan/planner.py | 4 +- python/valuecell/core/task/executor.py | 44 ++++++++++++++++++- 4 files changed, 115 insertions(+), 7 deletions(-) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 9ac929d12..a789a6715 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -7,6 +7,7 @@ from valuecell.core.conversation import ConversationService, ConversationStatus from valuecell.core.event import EventResponseService from valuecell.core.plan import PlanService +from valuecell.core.plan.models import ExecutionPlan from valuecell.core.super_agent import ( SuperAgentDecision, SuperAgentOutcome, @@ -18,7 +19,7 @@ StreamResponseEvent, UserInput, ) -from valuecell.utils.uuid import generate_task_id, generate_thread_id +from valuecell.utils.uuid import generate_task_id, generate_thread_id, generate_uuid from .services import AgentServiceBundle @@ -361,6 +362,20 @@ async def _monitor_planning_task( conversation_id = user_input.meta.conversation_id user_id = user_input.meta.user_id + plan_task_id = generate_task_id() + plan_tool_call_id = generate_uuid("tool_call") + plan_tool_name = "generate_execution_plan" + yield await self.event_service.emit( + self.event_service.factory.tool_call( + conversation_id, + thread_id, + task_id=plan_task_id, + event=StreamResponseEvent.TOOL_CALL_STARTED, + tool_call_id=plan_tool_call_id, + tool_name=plan_tool_name, + ) + ) + # Wait for planning completion or user input request while 
not planning_task.done(): if self.plan_service.has_pending_request(conversation_id): @@ -389,7 +404,19 @@ async def _monitor_planning_task( await asyncio.sleep(ASYNC_SLEEP_INTERVAL) # Planning completed, execute plan - plan = await planning_task + plan: "ExecutionPlan" = await planning_task + + yield await self.event_service.emit( + self.event_service.factory.tool_call( + conversation_id, + thread_id, + task_id=plan_task_id, + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + tool_call_id=plan_tool_call_id, + tool_name=plan_tool_name, + tool_result="success", + ) + ) # Set conversation title once if not set yet and a task title is available if getattr(plan, "tasks", None): diff --git a/python/valuecell/core/event/factory.py b/python/valuecell/core/event/factory.py index 5ab4af648..72708d956 100644 --- a/python/valuecell/core/event/factory.py +++ b/python/valuecell/core/event/factory.py @@ -34,6 +34,41 @@ from valuecell.utils.uuid import generate_item_id, generate_uuid +def _format_tool_result_for_frontend(result: str | None) -> str | None: + """Format tool result as JSON array for frontend rendering. + + The frontend tool-call-renderer expects tool_result to be a JSON array + of objects with 'content' field, e.g., '[{"content": "result text"}]'. + + If the result is already in this format, return it unchanged. + + Args: + result: Raw tool result string. + + Returns: + JSON-formatted string compatible with frontend renderer, or None if empty. 
+ """ + import json + + if not result: + return result + + # Check if already in expected format: [{"content": ...}] + try: + parsed = json.loads(result) + if ( + isinstance(parsed, list) + and len(parsed) > 0 + and isinstance(parsed[0], dict) + and "content" in parsed[0] + ): + return result + except (json.JSONDecodeError, TypeError): + pass + + return json.dumps([{"content": result}]) + + class ResponseFactory: def from_conversation_item(self, item: ConversationItem): """Reconstruct a BaseResponse from a persisted ConversationItem. @@ -398,6 +433,12 @@ def tool_call( Returns: ToolCallResponse containing a ToolCallPayload. """ + # Format tool_result for frontend when event is TOOL_CALL_COMPLETED + formatted_result = ( + _format_tool_result_for_frontend(tool_result) + if event == StreamResponseEvent.TOOL_CALL_COMPLETED + else tool_result + ) return ToolCallResponse( event=event, data=UnifiedResponseData( @@ -407,7 +448,7 @@ def tool_call( payload=ToolCallPayload( tool_call_id=tool_call_id, tool_name=tool_name, - tool_result=tool_result, + tool_result=formatted_result, ), role=Role.AGENT, agent_name=agent_name, diff --git a/python/valuecell/core/plan/planner.py b/python/valuecell/core/plan/planner.py index b6eb28cbc..39f29b03c 100644 --- a/python/valuecell/core/plan/planner.py +++ b/python/valuecell/core/plan/planner.py @@ -263,9 +263,9 @@ async def _analyze_input_and_create_tasks( logger.info(f"Planner produced plan: {plan_raw}") # Check if plan is inadequate or has no tasks + guidance_message = plan_raw.guidance_message or plan_raw.reason if not plan_raw.adequate or not plan_raw.tasks: # Use guidance_message from planner, or fall back to reason - guidance_message = plan_raw.guidance_message or plan_raw.reason logger.info(f"Planner needs user guidance: {guidance_message}") return [], guidance_message # Return empty task list with guidance @@ -308,7 +308,7 @@ async def _analyze_input_and_create_tasks( ) ) - return tasks, None # Return tasks with no guidance message 
+ return tasks, guidance_message # Return tasks along with any guidance message def _create_task( self, diff --git a/python/valuecell/core/task/executor.py b/python/valuecell/core/task/executor.py index 0aa32e339..8e168f256 100644 --- a/python/valuecell/core/task/executor.py +++ b/python/valuecell/core/task/executor.py @@ -33,7 +33,7 @@ ) from valuecell.utils.i18n_utils import get_current_language, get_current_timezone from valuecell.utils.user_profile_utils import get_user_profile_metadata -from valuecell.utils.uuid import generate_item_id, generate_task_id +from valuecell.utils.uuid import generate_item_id, generate_task_id, generate_uuid class ScheduledTaskResultAccumulator: @@ -125,7 +125,6 @@ async def execute_plan( agent_name="Planner", ) yield await self._event_service.emit(response) - return for task in plan.tasks: subagent_component_id = generate_item_id() @@ -328,10 +327,51 @@ async def _execute_single_task_run( metadata: dict, ) -> AsyncGenerator[BaseResponse, None]: agent_name = task.agent_name + + # Emit a tool-call STARTED event for invoking the agent (get_client) + tool_call_id = generate_uuid("toolcall") + tool_task_id = generate_task_id() + tool_name = "connect_agent" + yield await self._event_service.emit( + self._event_service.factory.tool_call( + task.conversation_id, + thread_id, + task_id=tool_task_id, + event=StreamResponseEvent.TOOL_CALL_STARTED, + tool_call_id=tool_call_id, + tool_name=tool_name, + ) + ) + client = await self._agent_connections.get_client(agent_name) if not client: + # Emit a TOOL_CALL_COMPLETED with a failure message (no client) + yield await self._event_service.emit( + self._event_service.factory.tool_call( + task.conversation_id, + thread_id, + task_id=tool_task_id, + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + tool_call_id=tool_call_id, + tool_name=tool_name, + tool_result=f"ERROR: could not connect to agent {agent_name}", + ) + ) raise RuntimeError(f"Could not connect to agent {agent_name}") + # Emit a 
TOOL_CALL_COMPLETED indicating successful client acquisition + yield await self._event_service.emit( + self._event_service.factory.tool_call( + task.conversation_id, + thread_id, + task_id=tool_task_id, + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + tool_call_id=tool_call_id, + tool_name="invoke_agent", + tool_result="connected", + ) + ) + remote_response = await client.send_message( task.query, conversation_id=task.conversation_id, From 12fe576df94a5b9c45ca91270a321661bbe6412f Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 29 Nov 2025 13:54:43 +0800 Subject: [PATCH 2/5] test: improve assertions in agent connection error test and add tests for tool call result formatting --- .../coordinate/tests/test_orchestrator.py | 6 +- .../core/event/tests/test_response_factory.py | 121 +++++++++++++++++- 2 files changed, 125 insertions(+), 2 deletions(-) diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index 0b303b27e..3c35532b3 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -494,7 +494,11 @@ async def test_agent_connection_error( async for chunk in orchestrator.process_user_input(sample_user_input): out.append(chunk) - assert any("(Error)" in c.data.payload.content for c in out if c.data.payload) + assert any( + hasattr(c.data.payload, "content") and "(Error)" in c.data.payload.content + for c in out + if c.data.payload + ) @pytest.mark.asyncio diff --git a/python/valuecell/core/event/tests/test_response_factory.py b/python/valuecell/core/event/tests/test_response_factory.py index 7c29e0060..6c006976f 100644 --- a/python/valuecell/core/event/tests/test_response_factory.py +++ b/python/valuecell/core/event/tests/test_response_factory.py @@ -1,7 +1,11 @@ import json import pytest -from valuecell.core.event.factory import ResponseFactory + +from 
valuecell.core.event.factory import ( + ResponseFactory, + _format_tool_result_for_frontend, +) from valuecell.core.task.models import Task from valuecell.core.types import ( BaseResponseDataPayload, @@ -161,3 +165,118 @@ def test_schedule_task_result_component(factory: ResponseFactory): assert resp.data.agent_name == "agent" assert resp.data.metadata == {"task_title": "Daily summary"} assert resp.data.payload.content == '{"result":1}' # type: ignore[attr-defined] + + +# ============================================================ +# Tests for _format_tool_result_for_frontend +# ============================================================ + + +class TestFormatToolResultForFrontend: + """Tests for the _format_tool_result_for_frontend helper function.""" + + def test_none_returns_none(self): + assert _format_tool_result_for_frontend(None) is None + + def test_empty_string_returns_empty(self): + assert _format_tool_result_for_frontend("") == "" + + def test_plain_string_wrapped_in_json_array(self): + result = _format_tool_result_for_frontend("SUCCESS") + parsed = json.loads(result) + assert parsed == [{"content": "SUCCESS"}] + + def test_already_formatted_unchanged(self): + already_formatted = '[{"content": "some result"}]' + result = _format_tool_result_for_frontend(already_formatted) + assert result == already_formatted + + def test_already_formatted_multiple_items_unchanged(self): + already_formatted = '[{"content": "item1"}, {"content": "item2"}]' + result = _format_tool_result_for_frontend(already_formatted) + assert result == already_formatted + + def test_json_object_without_content_wrapped(self): + # JSON object without 'content' key should be wrapped + input_str = '{"key": "value"}' + result = _format_tool_result_for_frontend(input_str) + parsed = json.loads(result) + assert parsed == [{"content": '{"key": "value"}'}] + + def test_json_array_without_content_wrapped(self): + # JSON array without 'content' in first element should be wrapped + input_str = 
'[{"other": "field"}]' + result = _format_tool_result_for_frontend(input_str) + parsed = json.loads(result) + assert parsed == [{"content": '[{"other": "field"}]'}] + + def test_error_message_wrapped(self): + result = _format_tool_result_for_frontend("ERROR: connection failed") + parsed = json.loads(result) + assert parsed == [{"content": "ERROR: connection failed"}] + + +# ============================================================ +# Tests for tool_call method with formatting +# ============================================================ + + +class TestToolCallMethodFormatting: + """Tests for ResponseFactory.tool_call with tool_result formatting.""" + + def test_tool_call_started_no_formatting(self, factory: ResponseFactory): + resp = factory.tool_call( + conversation_id="conv-1", + thread_id="th-1", + task_id="tk-1", + event=StreamResponseEvent.TOOL_CALL_STARTED, + tool_call_id="tc-1", + tool_name="search", + tool_result=None, + ) + assert resp.event == StreamResponseEvent.TOOL_CALL_STARTED + assert resp.data.payload.tool_result is None # type: ignore[attr-defined] + + def test_tool_call_completed_formats_plain_string(self, factory: ResponseFactory): + resp = factory.tool_call( + conversation_id="conv-1", + thread_id="th-1", + task_id="tk-1", + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + tool_call_id="tc-1", + tool_name="search", + tool_result="Success", + ) + assert resp.event == StreamResponseEvent.TOOL_CALL_COMPLETED + result = resp.data.payload.tool_result # type: ignore[attr-defined] + parsed = json.loads(result) + assert parsed == [{"content": "Success"}] + + def test_tool_call_completed_preserves_already_formatted( + self, factory: ResponseFactory + ): + already_formatted = '[{"content": "formatted result"}]' + resp = factory.tool_call( + conversation_id="conv-1", + thread_id="th-1", + task_id="tk-1", + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + tool_call_id="tc-1", + tool_name="search", + tool_result=already_formatted, + ) + assert 
resp.event == StreamResponseEvent.TOOL_CALL_COMPLETED + assert resp.data.payload.tool_result == already_formatted # type: ignore[attr-defined] + + def test_tool_call_completed_with_none_result(self, factory: ResponseFactory): + resp = factory.tool_call( + conversation_id="conv-1", + thread_id="th-1", + task_id="tk-1", + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + tool_call_id="tc-1", + tool_name="search", + tool_result=None, + ) + assert resp.event == StreamResponseEvent.TOOL_CALL_COMPLETED + assert resp.data.payload.tool_result is None # type: ignore[attr-defined] From 92cfb966f2eb2044eb459c701f35f628bf2902e6 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 29 Nov 2025 14:00:33 +0800 Subject: [PATCH 3/5] feat: implement tool call events for super agent triage process --- .../valuecell/core/coordinate/orchestrator.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index a789a6715..bb42b7b1b 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -296,9 +296,37 @@ async def _handle_new_request( # 1) Super Agent triage phase (pre-planning) - skip if target agent is specified if user_input.target_agent_name == self.super_agent_service.name: + # Emit tool-call STARTED for super agent triage + sa_task_id = generate_task_id() + sa_tool_call_id = generate_uuid("toolcall") + sa_tool_name = "super_agent_triage" + yield await self.event_service.emit( + self.event_service.factory.tool_call( + conversation_id, + thread_id, + task_id=sa_task_id, + event=StreamResponseEvent.TOOL_CALL_STARTED, + tool_call_id=sa_tool_call_id, + tool_name=sa_tool_name, + ) + ) + super_outcome: SuperAgentOutcome = await self.super_agent_service.run( user_input ) + + yield await self.event_service.emit( + self.event_service.factory.tool_call( + conversation_id, + 
thread_id, + task_id=sa_task_id, + event=StreamResponseEvent.TOOL_CALL_COMPLETED, + tool_call_id=sa_tool_call_id, + tool_name=sa_tool_name, + tool_result=f"Decision: {super_outcome.decision.value}", + ) + ) + if super_outcome.answer_content: ans = self.event_service.factory.message_response_general( StreamResponseEvent.MESSAGE_CHUNK, From 8d4c2fc9f14b26624129b5451a5b785a47bd612f Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 29 Nov 2025 14:03:10 +0800 Subject: [PATCH 4/5] feat: update tool call result messaging to include guidance details --- python/valuecell/core/coordinate/orchestrator.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index bb42b7b1b..8568d757a 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -442,7 +442,11 @@ async def _monitor_planning_task( event=StreamResponseEvent.TOOL_CALL_COMPLETED, tool_call_id=plan_tool_call_id, tool_name=plan_tool_name, - tool_result="success", + tool_result=( + f"Reason: {plan.guidance_message}" + if plan.guidance_message + else "Completed" + ), ) ) From ad4f0555db8212cef0ddcb4bdfe9d3c1193f7fcd Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 29 Nov 2025 14:14:08 +0800 Subject: [PATCH 5/5] feat: update tool name in tool call completion event --- python/valuecell/core/task/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/valuecell/core/task/executor.py b/python/valuecell/core/task/executor.py index 8e168f256..c2e1beda4 100644 --- a/python/valuecell/core/task/executor.py +++ b/python/valuecell/core/task/executor.py @@ -367,7 +367,7 @@ async def _execute_single_task_run( task_id=tool_task_id, event=StreamResponseEvent.TOOL_CALL_COMPLETED, tool_call_id=tool_call_id, - tool_name="invoke_agent", + 
tool_name=tool_name, tool_result="connected", ) )