Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions python/valuecell/core/coordinate/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from valuecell.core.conversation import ConversationService, ConversationStatus
from valuecell.core.event import EventResponseService
from valuecell.core.plan import PlanService
from valuecell.core.plan.models import ExecutionPlan
from valuecell.core.super_agent import (
SuperAgentDecision,
SuperAgentOutcome,
Expand All @@ -18,7 +19,7 @@
StreamResponseEvent,
UserInput,
)
from valuecell.utils.uuid import generate_task_id, generate_thread_id
from valuecell.utils.uuid import generate_task_id, generate_thread_id, generate_uuid

from .services import AgentServiceBundle

Expand Down Expand Up @@ -295,9 +296,37 @@ async def _handle_new_request(

# 1) Super Agent triage phase (pre-planning) - skip if target agent is specified
if user_input.target_agent_name == self.super_agent_service.name:
# Emit tool-call STARTED for super agent triage
sa_task_id = generate_task_id()
sa_tool_call_id = generate_uuid("toolcall")
sa_tool_name = "super_agent_triage"
yield await self.event_service.emit(
self.event_service.factory.tool_call(
conversation_id,
thread_id,
task_id=sa_task_id,
event=StreamResponseEvent.TOOL_CALL_STARTED,
tool_call_id=sa_tool_call_id,
tool_name=sa_tool_name,
)
)

super_outcome: SuperAgentOutcome = await self.super_agent_service.run(
user_input
)

yield await self.event_service.emit(
self.event_service.factory.tool_call(
conversation_id,
thread_id,
task_id=sa_task_id,
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
tool_call_id=sa_tool_call_id,
tool_name=sa_tool_name,
tool_result=f"Decision: {super_outcome.decision.value}",
)
)

if super_outcome.answer_content:
ans = self.event_service.factory.message_response_general(
StreamResponseEvent.MESSAGE_CHUNK,
Expand Down Expand Up @@ -361,6 +390,20 @@ async def _monitor_planning_task(
conversation_id = user_input.meta.conversation_id
user_id = user_input.meta.user_id

plan_task_id = generate_task_id()
plan_tool_call_id = generate_uuid("tool_call")
plan_tool_name = "generate_execution_plan"
yield await self.event_service.emit(
self.event_service.factory.tool_call(
conversation_id,
thread_id,
task_id=plan_task_id,
event=StreamResponseEvent.TOOL_CALL_STARTED,
tool_call_id=plan_tool_call_id,
tool_name=plan_tool_name,
)
)

# Wait for planning completion or user input request
while not planning_task.done():
if self.plan_service.has_pending_request(conversation_id):
Expand Down Expand Up @@ -389,7 +432,23 @@ async def _monitor_planning_task(
await asyncio.sleep(ASYNC_SLEEP_INTERVAL)

# Planning completed, execute plan
plan = await planning_task
plan: "ExecutionPlan" = await planning_task

yield await self.event_service.emit(
self.event_service.factory.tool_call(
conversation_id,
thread_id,
task_id=plan_task_id,
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
tool_call_id=plan_tool_call_id,
tool_name=plan_tool_name,
tool_result=(
f"Reason: {plan.guidance_message}"
if plan.guidance_message
else "Completed"
),
)
)

# Set conversation title once if not set yet and a task title is available
if getattr(plan, "tasks", None):
Expand Down
6 changes: 5 additions & 1 deletion python/valuecell/core/coordinate/tests/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,11 @@ async def test_agent_connection_error(
async for chunk in orchestrator.process_user_input(sample_user_input):
out.append(chunk)

assert any("(Error)" in c.data.payload.content for c in out if c.data.payload)
assert any(
hasattr(c.data.payload, "content") and "(Error)" in c.data.payload.content
for c in out
if c.data.payload
)


@pytest.mark.asyncio
Expand Down
43 changes: 42 additions & 1 deletion python/valuecell/core/event/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,41 @@
from valuecell.utils.uuid import generate_item_id, generate_uuid


def _format_tool_result_for_frontend(result: str | None) -> str | None:
"""Format tool result as JSON array for frontend rendering.

The frontend tool-call-renderer expects tool_result to be a JSON array
of objects with 'content' field, e.g., '[{"content": "result text"}]'.

If the result is already in this format, return it unchanged.

Args:
result: Raw tool result string.

Returns:
JSON-formatted string compatible with frontend renderer, or None if empty.
"""
import json

if not result:
return result

# Check if already in expected format: [{"content": ...}]
try:
parsed = json.loads(result)
if (
isinstance(parsed, list)
and len(parsed) > 0
and isinstance(parsed[0], dict)
and "content" in parsed[0]
):
return result
except (json.JSONDecodeError, TypeError):
pass

return json.dumps([{"content": result}])


class ResponseFactory:
def from_conversation_item(self, item: ConversationItem):
"""Reconstruct a BaseResponse from a persisted ConversationItem.
Expand Down Expand Up @@ -398,6 +433,12 @@ def tool_call(
Returns:
ToolCallResponse containing a ToolCallPayload.
"""
# Format tool_result for frontend when event is TOOL_CALL_COMPLETED
formatted_result = (
_format_tool_result_for_frontend(tool_result)
if event == StreamResponseEvent.TOOL_CALL_COMPLETED
else tool_result
)
return ToolCallResponse(
event=event,
data=UnifiedResponseData(
Expand All @@ -407,7 +448,7 @@ def tool_call(
payload=ToolCallPayload(
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_result=tool_result,
tool_result=formatted_result,
),
role=Role.AGENT,
agent_name=agent_name,
Expand Down
121 changes: 120 additions & 1 deletion python/valuecell/core/event/tests/test_response_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import json

import pytest
from valuecell.core.event.factory import ResponseFactory

from valuecell.core.event.factory import (
ResponseFactory,
_format_tool_result_for_frontend,
)
from valuecell.core.task.models import Task
from valuecell.core.types import (
BaseResponseDataPayload,
Expand Down Expand Up @@ -161,3 +165,118 @@ def test_schedule_task_result_component(factory: ResponseFactory):
assert resp.data.agent_name == "agent"
assert resp.data.metadata == {"task_title": "Daily summary"}
assert resp.data.payload.content == '{"result":1}' # type: ignore[attr-defined]


# ============================================================
# Tests for _format_tool_result_for_frontend
# ============================================================


class TestFormatToolResultForFrontend:
"""Tests for the _format_tool_result_for_frontend helper function."""

def test_none_returns_none(self):
assert _format_tool_result_for_frontend(None) is None

def test_empty_string_returns_empty(self):
assert _format_tool_result_for_frontend("") == ""

def test_plain_string_wrapped_in_json_array(self):
result = _format_tool_result_for_frontend("SUCCESS")
parsed = json.loads(result)
assert parsed == [{"content": "SUCCESS"}]

def test_already_formatted_unchanged(self):
already_formatted = '[{"content": "some result"}]'
result = _format_tool_result_for_frontend(already_formatted)
assert result == already_formatted

def test_already_formatted_multiple_items_unchanged(self):
already_formatted = '[{"content": "item1"}, {"content": "item2"}]'
result = _format_tool_result_for_frontend(already_formatted)
assert result == already_formatted

def test_json_object_without_content_wrapped(self):
# JSON object without 'content' key should be wrapped
input_str = '{"key": "value"}'
result = _format_tool_result_for_frontend(input_str)
parsed = json.loads(result)
assert parsed == [{"content": '{"key": "value"}'}]

def test_json_array_without_content_wrapped(self):
# JSON array without 'content' in first element should be wrapped
input_str = '[{"other": "field"}]'
result = _format_tool_result_for_frontend(input_str)
parsed = json.loads(result)
assert parsed == [{"content": '[{"other": "field"}]'}]

def test_error_message_wrapped(self):
result = _format_tool_result_for_frontend("ERROR: connection failed")
parsed = json.loads(result)
assert parsed == [{"content": "ERROR: connection failed"}]


# ============================================================
# Tests for tool_call method with formatting
# ============================================================


class TestToolCallMethodFormatting:
"""Tests for ResponseFactory.tool_call with tool_result formatting."""

def test_tool_call_started_no_formatting(self, factory: ResponseFactory):
resp = factory.tool_call(
conversation_id="conv-1",
thread_id="th-1",
task_id="tk-1",
event=StreamResponseEvent.TOOL_CALL_STARTED,
tool_call_id="tc-1",
tool_name="search",
tool_result=None,
)
assert resp.event == StreamResponseEvent.TOOL_CALL_STARTED
assert resp.data.payload.tool_result is None # type: ignore[attr-defined]

def test_tool_call_completed_formats_plain_string(self, factory: ResponseFactory):
resp = factory.tool_call(
conversation_id="conv-1",
thread_id="th-1",
task_id="tk-1",
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
tool_call_id="tc-1",
tool_name="search",
tool_result="Success",
)
assert resp.event == StreamResponseEvent.TOOL_CALL_COMPLETED
result = resp.data.payload.tool_result # type: ignore[attr-defined]
parsed = json.loads(result)
assert parsed == [{"content": "Success"}]

def test_tool_call_completed_preserves_already_formatted(
self, factory: ResponseFactory
):
already_formatted = '[{"content": "formatted result"}]'
resp = factory.tool_call(
conversation_id="conv-1",
thread_id="th-1",
task_id="tk-1",
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
tool_call_id="tc-1",
tool_name="search",
tool_result=already_formatted,
)
assert resp.event == StreamResponseEvent.TOOL_CALL_COMPLETED
assert resp.data.payload.tool_result == already_formatted # type: ignore[attr-defined]

def test_tool_call_completed_with_none_result(self, factory: ResponseFactory):
resp = factory.tool_call(
conversation_id="conv-1",
thread_id="th-1",
task_id="tk-1",
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
tool_call_id="tc-1",
tool_name="search",
tool_result=None,
)
assert resp.event == StreamResponseEvent.TOOL_CALL_COMPLETED
assert resp.data.payload.tool_result is None # type: ignore[attr-defined]
4 changes: 2 additions & 2 deletions python/valuecell/core/plan/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ async def _analyze_input_and_create_tasks(
logger.info(f"Planner produced plan: {plan_raw}")

# Check if plan is inadequate or has no tasks
guidance_message = plan_raw.guidance_message or plan_raw.reason
if not plan_raw.adequate or not plan_raw.tasks:
# Use guidance_message from planner, or fall back to reason
guidance_message = plan_raw.guidance_message or plan_raw.reason
logger.info(f"Planner needs user guidance: {guidance_message}")
return [], guidance_message # Return empty task list with guidance

Expand Down Expand Up @@ -308,7 +308,7 @@ async def _analyze_input_and_create_tasks(
)
)

return tasks, None # Return tasks with no guidance message
return tasks, guidance_message # Return tasks with no guidance message

def _create_task(
self,
Expand Down
44 changes: 42 additions & 2 deletions python/valuecell/core/task/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)
from valuecell.utils.i18n_utils import get_current_language, get_current_timezone
from valuecell.utils.user_profile_utils import get_user_profile_metadata
from valuecell.utils.uuid import generate_item_id, generate_task_id
from valuecell.utils.uuid import generate_item_id, generate_task_id, generate_uuid


class ScheduledTaskResultAccumulator:
Expand Down Expand Up @@ -125,7 +125,6 @@ async def execute_plan(
agent_name="Planner",
)
yield await self._event_service.emit(response)
return

for task in plan.tasks:
subagent_component_id = generate_item_id()
Expand Down Expand Up @@ -328,10 +327,51 @@ async def _execute_single_task_run(
metadata: dict,
) -> AsyncGenerator[BaseResponse, None]:
agent_name = task.agent_name

# Emit a tool-call STARTED event for invoking the agent (get_client)
tool_call_id = generate_uuid("toolcall")
tool_task_id = generate_task_id()
tool_name = "connect_agent"
yield await self._event_service.emit(
self._event_service.factory.tool_call(
task.conversation_id,
thread_id,
task_id=tool_task_id,
event=StreamResponseEvent.TOOL_CALL_STARTED,
tool_call_id=tool_call_id,
tool_name=tool_name,
)
)

client = await self._agent_connections.get_client(agent_name)
if not client:
# Emit a TOOL_CALL_COMPLETED with a failure message (no client)
yield await self._event_service.emit(
self._event_service.factory.tool_call(
task.conversation_id,
thread_id,
task_id=tool_task_id,
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_result=f"ERROR: could not connect to agent {agent_name}",
)
)
raise RuntimeError(f"Could not connect to agent {agent_name}")

# Emit a TOOL_CALL_COMPLETED indicating successful client acquisition
yield await self._event_service.emit(
self._event_service.factory.tool_call(
task.conversation_id,
thread_id,
task_id=tool_task_id,
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_result="connected",
)
)

remote_response = await client.send_message(
task.query,
conversation_id=task.conversation_id,
Expand Down