From 5987400ef7809edde208d7f6e91a5f0dbc38fd6a Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:58:38 +0800 Subject: [PATCH 1/6] feat: introduce super agent reasoning --- .../chat-conversation/chat-item-area.tsx | 11 ++ .../components/valuecell/renderer/index.tsx | 1 + .../valuecell/renderer/reasoning-renderer.tsx | 69 ++++++++++ frontend/src/constants/agent.ts | 3 + frontend/src/lib/agent-store.ts | 120 ++++++++++++++++-- frontend/src/types/renderer.ts | 4 + .../valuecell/core/coordinate/orchestrator.py | 59 ++++++--- python/valuecell/core/event/buffer.py | 6 + python/valuecell/core/event/factory.py | 4 + python/valuecell/core/super_agent/core.py | 46 ++++--- python/valuecell/core/super_agent/service.py | 7 +- 11 files changed, 283 insertions(+), 47 deletions(-) create mode 100644 frontend/src/components/valuecell/renderer/reasoning-renderer.tsx diff --git a/frontend/src/app/agent/components/chat-conversation/chat-item-area.tsx b/frontend/src/app/agent/components/chat-conversation/chat-item-area.tsx index eefb05a01..0355da699 100644 --- a/frontend/src/app/agent/components/chat-conversation/chat-item-area.tsx +++ b/frontend/src/app/agent/components/chat-conversation/chat-item-area.tsx @@ -1,3 +1,4 @@ +import { parse } from "best-effort-json-parser"; import { type FC, memo } from "react"; import { UnknownRenderer } from "@/components/valuecell/renderer"; import { COMPONENT_RENDERER_MAP } from "@/constants/agent"; @@ -44,6 +45,16 @@ const ChatItemArea: FC = ({ items }) => { case "scheduled_task_controller": return ; + case "reasoning": { + const parsed = parse(item.payload.content); + return ( + + ); + } + case "report": return ( = ({ + content, + isComplete, +}) => { + const [isOpen, setIsOpen] = useState(false); + const hasContent = content && content.trim().length > 0; + + return ( + + +
+ {isComplete ? (
+
+ ) : (
+
+ )}
+

+ {isComplete ? "Thinking" : "Thinking..."}
+

+
+ {hasContent && (
+
+ )}
+
+
+ {/* Collapsible Content */}
+
+
+ {hasContent && (
+
+ )}
+
+
+
+ ); +}; + +export default memo(ReasoningRenderer); diff --git a/frontend/src/constants/agent.ts b/frontend/src/constants/agent.ts index bef6c7dda..8415efebd 100644 --- a/frontend/src/constants/agent.ts +++ b/frontend/src/constants/agent.ts @@ -25,6 +25,7 @@ import { import { ChatConversationRenderer, MarkdownRenderer, + ReasoningRenderer, ReportRenderer, ScheduledTaskControllerRenderer, ScheduledTaskRenderer, @@ -43,6 +44,7 @@ export const AGENT_MULTI_SECTION_COMPONENT_TYPE = ["report"] as const; // agent component type export const AGENT_COMPONENT_TYPE = [ "markdown", + "reasoning", "tool_call", "subagent_conversation", "scheduled_task_controller", @@ -59,6 +61,7 @@ export const COMPONENT_RENDERER_MAP: { scheduled_task_result: ScheduledTaskRenderer, scheduled_task_controller: ScheduledTaskControllerRenderer, report: ReportRenderer, + reasoning: ReasoningRenderer, markdown: MarkdownRenderer, tool_call: ToolCallRenderer, subagent_conversation: ChatConversationRenderer, diff --git a/frontend/src/lib/agent-store.ts b/frontend/src/lib/agent-store.ts index 773fe3fc0..31b31c46b 100644 --- a/frontend/src/lib/agent-store.ts +++ b/frontend/src/lib/agent-store.ts @@ -60,11 +60,50 @@ function hasContent( return "payload" in item && "content" in item.payload; } +// Mark a specific reasoning item as complete +function markReasoningComplete(task: TaskView, itemId: string): void { + const existingIndex = task.items.findIndex((item) => item.item_id === itemId); + if (existingIndex >= 0 && hasContent(task.items[existingIndex])) { + try { + const parsed = JSON.parse(task.items[existingIndex].payload.content); + task.items[existingIndex].payload.content = JSON.stringify({ + ...parsed, + isComplete: true, + }); + } catch { + // If parsing fails, just mark as complete + task.items[existingIndex].payload.content = JSON.stringify({ + content: task.items[existingIndex].payload.content, + isComplete: true, + }); + } + } +} + +// Mark all reasoning items in a task as complete +function markAllReasoningComplete(task: TaskView): void { + for (const item of task.items) { + if (item.component_type === "reasoning" && hasContent(item)) { + try { + const parsed = JSON.parse(item.payload.content); + if (!parsed.isComplete) { + item.payload.content = JSON.stringify({ + ...parsed, + isComplete: true, + }); + } + } catch { + // Skip items that can't be parsed + } + } + } +} + // Helper function: add or update item in task function addOrUpdateItem( task: TaskView, newItem: ChatItem, - event: "append" | "replace", + event: "append" | "replace" | "append-reasoning", ): void { const existingIndex = task.items.findIndex( (item) => item.item_id === newItem.item_id, @@ -79,6 +118,23 @@ function addOrUpdateItem( // Merge content for streaming events, replace for others if (event === "append" && hasContent(existingItem) && hasContent(newItem)) { existingItem.payload.content += newItem.payload.content; + } else if ( + event === "append-reasoning" && + hasContent(existingItem) && + hasContent(newItem) + ) { + // Special handling for reasoning: parse JSON, append content, re-serialize + try { + const existingParsed = JSON.parse(existingItem.payload.content); + const newParsed = JSON.parse(newItem.payload.content); + existingItem.payload.content = JSON.stringify({ + content: (existingParsed.content ?? "") + (newParsed.content ?? ""), + isComplete: newParsed.isComplete ?? 
false, + }); + } catch { + // Fallback to replace if parsing fails + task.items[existingIndex] = newItem; + } } else { task.items[existingIndex] = newItem; } @@ -88,7 +144,7 @@ function addOrUpdateItem( function handleChatItemEvent( draft: AgentConversationsStore, data: ChatItem, - event: "append" | "replace" = "append", + event: "append" | "replace" | "append-reasoning" = "append", ) { const { conversation, task } = ensurePath(draft, data); @@ -144,7 +200,6 @@ function processSSEEvent(draft: AgentConversationsStore, sseData: SSEData) { case "thread_started": case "message_chunk": case "message": - case "reasoning": case "task_failed": case "plan_failed": case "plan_require_user_input": @@ -152,6 +207,49 @@ function processSSEEvent(draft: AgentConversationsStore, sseData: SSEData) { handleChatItemEvent(draft, { component_type: "markdown", ...data }); break; + case "reasoning": + // Reasoning is streaming content that needs to be appended (like message_chunk) + handleChatItemEvent( + draft, + { + component_type: "reasoning", + ...data, + payload: { + content: JSON.stringify({ + content: data.payload.content, + isComplete: false, + }), + }, + }, + "append-reasoning", + ); + break; + + case "reasoning_started": + // Create initial reasoning item with empty content + handleChatItemEvent( + draft, + { + component_type: "reasoning", + ...data, + payload: { + content: JSON.stringify({ + content: "", + isComplete: false, + }), + }, + }, + "replace", + ); + break; + + case "reasoning_completed": { + // Mark reasoning as complete + const { task } = ensurePath(draft, data); + markReasoningComplete(task, data.item_id); + break; + } + case "tool_call_started": case "tool_call_completed": { handleChatItemEvent( @@ -168,11 +266,6 @@ function processSSEEvent(draft: AgentConversationsStore, sseData: SSEData) { break; } - case "reasoning_started": - case "reasoning_completed": - ensurePath(draft, data); - break; - default: break; } @@ -212,5 +305,16 @@ export function batchUpdateAgentConversationsStore( for (const sseData of sseDataList) { processSSEEvent(draft, sseData); } + + // Mark all reasoning items as complete after loading history + // since the stream has already finished + const conversation = draft[conversationId]; + if (conversation) { + for (const thread of Object.values(conversation.threads)) { + for (const task of Object.values(thread.tasks)) { + markAllReasoningComplete(task); + } + } + } }); } diff --git a/frontend/src/types/renderer.ts b/frontend/src/types/renderer.ts index b67f52fc0..716636e70 100644 --- a/frontend/src/types/renderer.ts +++ b/frontend/src/types/renderer.ts @@ -10,6 +10,9 @@ export type BaseRendererProps = { export type ReportRendererProps = BaseRendererProps & { isActive?: boolean; }; +export type ReasoningRendererProps = BaseRendererProps & { + isComplete?: boolean; +}; export type ScheduledTaskRendererProps = BaseRendererProps; export type ScheduledTaskControllerRendererProps = BaseRendererProps; export type MarkdownRendererProps = BaseRendererProps; @@ -24,6 +27,7 @@ export type RendererPropsMap = { scheduled_task_result: ScheduledTaskRendererProps; scheduled_task_controller: ScheduledTaskControllerRendererProps; report: ReportRendererProps; + reasoning: ReasoningRendererProps; markdown: MarkdownRendererProps; tool_call: ToolCallRendererProps; subagent_conversation: ChatConversationRendererProps; diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index 8568d757a..931e29433 100644 --- 
a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -296,37 +296,60 @@ async def _handle_new_request( # 1) Super Agent triage phase (pre-planning) - skip if target agent is specified if user_input.target_agent_name == self.super_agent_service.name: - # Emit tool-call STARTED for super agent triage + # Emit reasoning_started before streaming reasoning content sa_task_id = generate_task_id() - sa_tool_call_id = generate_uuid("toolcall") - sa_tool_name = "super_agent_triage" + sa_reasoning_item_id = generate_uuid("reasoning") yield await self.event_service.emit( - self.event_service.factory.tool_call( + self.event_service.factory.reasoning( conversation_id, thread_id, task_id=sa_task_id, - event=StreamResponseEvent.TOOL_CALL_STARTED, - tool_call_id=sa_tool_call_id, - tool_name=sa_tool_name, - ) - ) - - super_outcome: SuperAgentOutcome = await self.super_agent_service.run( - user_input + event=StreamResponseEvent.REASONING_STARTED, + agent_name=self.super_agent_service.name, + item_id=sa_reasoning_item_id, + ), ) + # Stream reasoning content and collect final outcome + super_outcome: SuperAgentOutcome | None = None + async for item in self.super_agent_service.run(user_input): + if isinstance(item, str): + # Yield reasoning chunk + yield await self.event_service.emit( + self.event_service.factory.reasoning( + conversation_id, + thread_id, + task_id=sa_task_id, + event=StreamResponseEvent.REASONING, + content=item, + agent_name=self.super_agent_service.name, + item_id=sa_reasoning_item_id, + ), + ) + else: + # SuperAgentOutcome received + super_outcome = item + + # Emit reasoning_completed yield await self.event_service.emit( - self.event_service.factory.tool_call( + self.event_service.factory.reasoning( conversation_id, thread_id, task_id=sa_task_id, - event=StreamResponseEvent.TOOL_CALL_COMPLETED, - tool_call_id=sa_tool_call_id, - tool_name=sa_tool_name, - tool_result=f"Decision: {super_outcome.decision.value}", - ) + event=StreamResponseEvent.REASONING_COMPLETED, + agent_name=self.super_agent_service.name, + item_id=sa_reasoning_item_id, + ), ) + # Fallback if no outcome was received + if super_outcome is None: + super_outcome = SuperAgentOutcome( + decision=SuperAgentDecision.HANDOFF_TO_PLANNER, + enriched_query=user_input.query, + reason="No outcome received from SuperAgent", + ) + if super_outcome.answer_content: ans = self.event_service.factory.message_response_general( StreamResponseEvent.MESSAGE_CHUNK, diff --git a/python/valuecell/core/event/buffer.py b/python/valuecell/core/event/buffer.py index 98ff9a154..a4ec0ee28 100644 --- a/python/valuecell/core/event/buffer.py +++ b/python/valuecell/core/event/buffer.py @@ -111,10 +111,16 @@ def annotate(self, resp: BaseResponse) -> BaseResponse: stable paragraph `item_id` to resp.data.item_id so the frontend and storage layer can correlate incremental chunks with the final saved conversation item. + + If the response already has an item_id set, it is preserved to allow + callers to correlate related events (e.g., reasoning stream). 
""" data: UnifiedResponseData = resp.data ev = resp.event if ev in self._buffered_events: + # Preserve existing item_id if already set by caller + if data.item_id: + return resp key: BufferKey = ( data.conversation_id, data.thread_id, diff --git a/python/valuecell/core/event/factory.py b/python/valuecell/core/event/factory.py index 72708d956..913183827 100644 --- a/python/valuecell/core/event/factory.py +++ b/python/valuecell/core/event/factory.py @@ -507,6 +507,7 @@ def reasoning( ], content: Optional[str] = None, agent_name: Optional[str] = None, + item_id: Optional[str] = None, ) -> ReasoningResponse: """Build a reasoning response used to convey model chain-of-thought. @@ -516,6 +517,8 @@ def reasoning( task_id: Task id. event: One of the reasoning-related stream events. content: Optional textual reasoning content. + agent_name: Name of the agent generating the reasoning. + item_id: Optional stable item id for correlating reasoning chunks. Returns: ReasoningResponse with optional payload. @@ -529,6 +532,7 @@ def reasoning( payload=(BaseResponseDataPayload(content=content) if content else None), role=Role.AGENT, agent_name=agent_name, + item_id=item_id or generate_item_id(), ), ) diff --git a/python/valuecell/core/super_agent/core.py b/python/valuecell/core/super_agent/core.py index ae4df9ecb..685b7bb61 100644 --- a/python/valuecell/core/super_agent/core.py +++ b/python/valuecell/core/super_agent/core.py @@ -1,6 +1,6 @@ import asyncio from enum import Enum -from typing import Optional +from typing import AsyncIterator, Optional from agno.agent import Agent from agno.db.in_memory import InMemoryDb @@ -63,10 +63,11 @@ def _build_agent(with_model) -> Agent: return Agent( model=with_model, + parser_model=with_model, markdown=False, debug_mode=agent_debug_mode_enabled(), instructions=[SUPER_AGENT_INSTRUCTION], - expected_output=SUPER_AGENT_EXPECTED_OUTPUT, + # expected_output=SUPER_AGENT_EXPECTED_OUTPUT, output_schema=SuperAgentOutcome, use_json_mode=model_utils_mod.model_should_use_json_mode(with_model), db=InMemoryDb(), @@ -129,13 +130,13 @@ def _build_agent(with_model) -> Agent: return self.agent - async def run(self, user_input: UserInput) -> SuperAgentOutcome: + async def run(self, user_input: UserInput) -> AsyncIterator[str | SuperAgentOutcome]: """Run super agent triage.""" await asyncio.sleep(0) agent = self._get_or_init_agent() if agent is None: # Fallback: handoff directly to planner without super agent model - return SuperAgentOutcome( + yield SuperAgentOutcome( decision=SuperAgentDecision.HANDOFF_TO_PLANNER, enriched_query=user_input.query, reason="SuperAgent unavailable: missing model/provider configuration", @@ -147,29 +148,36 @@ async def run(self, user_input: UserInput) -> SuperAgentOutcome: except Exception: model_description = "unknown model/provider" try: - response = await agent.arun( + async for response in agent.arun( user_input.query, session_id=user_input.meta.conversation_id, user_id=user_input.meta.user_id, add_history_to_context=True, - ) + stream=True, + ): + if response.content_type == "str": + yield response.content + continue + + # Only process non-string content as final outcome + final_outcome = response.content + if not isinstance(final_outcome, SuperAgentOutcome): + answer_content = ( + f"SuperAgent produced a malformed response: `{final_outcome}`. " + f"Please check the capabilities of your model `{model_description}` and try again later." 
+ ) + final_outcome = SuperAgentOutcome( + decision=SuperAgentDecision.ANSWER, + answer_content=answer_content, + ) + + yield final_outcome + except Exception as e: - return SuperAgentOutcome( + yield SuperAgentOutcome( decision=SuperAgentDecision.ANSWER, reason=( f"SuperAgent encountered an error: {e}." f"Please check the capabilities of your model `{model_description}` and try again later." ), ) - - outcome = response.content - if not isinstance(outcome, SuperAgentOutcome): - answer_content = ( - f"SuperAgent produced a malformed response: `{outcome}`. " - f"Please check the capabilities of your model `{model_description}` and try again later." - ) - outcome = SuperAgentOutcome( - decision=SuperAgentDecision.ANSWER, - answer_content=answer_content, - ) - return outcome diff --git a/python/valuecell/core/super_agent/service.py b/python/valuecell/core/super_agent/service.py index d1d31e2fa..081c487a9 100644 --- a/python/valuecell/core/super_agent/service.py +++ b/python/valuecell/core/super_agent/service.py @@ -2,6 +2,8 @@ from __future__ import annotations +from typing import AsyncIterator + from valuecell.core.types import UserInput from .core import SuperAgent, SuperAgentOutcome @@ -17,5 +19,6 @@ def __init__(self, super_agent: SuperAgent | None = None) -> None: def name(self) -> str: return self._super_agent.name - async def run(self, user_input: UserInput) -> SuperAgentOutcome: - return await self._super_agent.run(user_input) + async def run(self, user_input: UserInput) -> AsyncIterator[str | SuperAgentOutcome]: + async for item in self._super_agent.run(user_input): + yield item From 556be08cfae3f1ea6b66b8bbaaf61ddd8dbf209b Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:58:38 +0800 Subject: [PATCH 2/6] fix ci --- .../coordinate/tests/test_orchestrator.py | 6 +- .../core/event/tests/test_response_buffer.py | 19 ++---- python/valuecell/core/super_agent/core.py | 5 +- python/valuecell/core/super_agent/service.py | 4 +- .../super_agent/tests/test_super_agent.py | 67 ++++++++++--------- 5 files changed, 53 insertions(+), 48 deletions(-) diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index 3c35532b3..ccb7cbab7 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -511,9 +511,13 @@ async def test_super_agent_answer_short_circuits_planner( enriched_query=None, reason="Handled directly", ) + + async def _run(user_input): + yield outcome + orchestrator.super_agent_service = SimpleNamespace( name="ValueCellAgent", - run=AsyncMock(return_value=outcome), + run=_run, ) user_input = UserInput( diff --git a/python/valuecell/core/event/tests/test_response_buffer.py b/python/valuecell/core/event/tests/test_response_buffer.py index a8bdc2693..d32ae3432 100644 --- a/python/valuecell/core/event/tests/test_response_buffer.py +++ b/python/valuecell/core/event/tests/test_response_buffer.py @@ -140,7 +140,7 @@ def test_annotate_buffered_event_new_buffer(self): """Test annotate with buffered event creating new buffer.""" buffer = ResponseBuffer() response = BaseResponse( - event=StreamResponseEvent.MESSAGE_CHUNK, + event=StreamResponseEvent.REASONING, data=UnifiedResponseData( conversation_id="conv-123", role=Role.USER, item_id="original-item-123" ), @@ -148,14 +148,8 @@ def test_annotate_buffered_event_new_buffer(self): result = buffer.annotate(response) - assert 
result.data.item_id != "original-item-123" - assert isinstance(result.data.item_id, str) - assert len(result.data.item_id) > 0 - - # Check buffer was created - key = ("conv-123", None, None, StreamResponseEvent.MESSAGE_CHUNK) - assert key in buffer._buffers - assert buffer._buffers[key].role == Role.USER + # New behavior: preserve caller-provided item_id for buffered events + assert result.data.item_id == "original-item-123" def test_annotate_buffered_event_existing_buffer(self): """Test annotate with buffered event using existing buffer.""" @@ -176,10 +170,9 @@ def test_annotate_buffered_event_existing_buffer(self): result1 = buffer.annotate(response1) result2 = buffer.annotate(response2) - # Both should have the same item_id from the buffer - assert result1.data.item_id == result2.data.item_id - assert result1.data.item_id != "original-item-123" - assert result2.data.item_id != "original-item-456" + # New behavior: if caller sets item_id, do not override + assert result1.data.item_id == "original-item-123" + assert result2.data.item_id == "original-item-456" @pytest.mark.asyncio async def test_ingest_immediate_event_message(self): diff --git a/python/valuecell/core/super_agent/core.py b/python/valuecell/core/super_agent/core.py index 685b7bb61..a6b9027e1 100644 --- a/python/valuecell/core/super_agent/core.py +++ b/python/valuecell/core/super_agent/core.py @@ -9,7 +9,6 @@ import valuecell.utils.model as model_utils_mod from valuecell.core.super_agent.prompts import ( - SUPER_AGENT_EXPECTED_OUTPUT, SUPER_AGENT_INSTRUCTION, ) from valuecell.core.types import UserInput @@ -130,7 +129,9 @@ def _build_agent(with_model) -> Agent: return self.agent - async def run(self, user_input: UserInput) -> AsyncIterator[str | SuperAgentOutcome]: + async def run( + self, user_input: UserInput + ) -> AsyncIterator[str | SuperAgentOutcome]: """Run super agent triage.""" await asyncio.sleep(0) agent = self._get_or_init_agent() diff --git a/python/valuecell/core/super_agent/service.py b/python/valuecell/core/super_agent/service.py index 081c487a9..2c05ad23f 100644 --- a/python/valuecell/core/super_agent/service.py +++ b/python/valuecell/core/super_agent/service.py @@ -19,6 +19,8 @@ def __init__(self, super_agent: SuperAgent | None = None) -> None: def name(self) -> str: return self._super_agent.name - async def run(self, user_input: UserInput) -> AsyncIterator[str | SuperAgentOutcome]: + async def run( + self, user_input: UserInput + ) -> AsyncIterator[str | SuperAgentOutcome]: async for item in self._super_agent.run(user_input): yield item diff --git a/python/valuecell/core/super_agent/tests/test_super_agent.py b/python/valuecell/core/super_agent/tests/test_super_agent.py index 578f36e44..31c6e84d7 100644 --- a/python/valuecell/core/super_agent/tests/test_super_agent.py +++ b/python/valuecell/core/super_agent/tests/test_super_agent.py @@ -22,18 +22,21 @@ async def test_super_agent_run_uses_underlying_agent(monkeypatch: pytest.MonkeyP decision=SuperAgentDecision.ANSWER, answer_content="Here is a quick reply", enriched_query=None, - ) + ), + content_type="outcome", ) agent_instance_holder: dict[str, object] = {} class FakeAgent: def __init__(self, *args, **kwargs): - self.arun = AsyncMock(return_value=fake_response) # Provide minimal model info for error formatting paths self.model = SimpleNamespace(id="fake-model", provider="fake-provider") agent_instance_holder["instance"] = self + async def arun(self, *args, **kwargs): + yield fake_response + monkeypatch.setattr(super_agent_mod, "Agent", FakeAgent) # Patch 
model creation to avoid real provider/model access monkeypatch.setattr( @@ -51,15 +54,10 @@ def __init__(self, *args, **kwargs): meta=UserInputMetadata(conversation_id="conv-sa", user_id="user-sa"), ) - result = await sa.run(user_input) - - assert result.answer_content == "Here is a quick reply" - instance = agent_instance_holder["instance"] - instance.arun.assert_awaited_once() - called_args, called_kwargs = instance.arun.call_args - assert called_args[0] == "answer this" - assert called_kwargs["session_id"] == "conv-sa" - assert called_kwargs["user_id"] == "user-sa" + # Consume async iterator: should yield final outcome + outcomes = [item async for item in sa.run(user_input) if not isinstance(item, str)] + assert outcomes and isinstance(outcomes[-1], SuperAgentOutcome) + assert outcomes[-1].answer_content == "Here is a quick reply" def test_super_agent_prompts_are_non_empty(): @@ -74,9 +72,11 @@ def test_super_agent_prompts_are_non_empty(): @pytest.mark.asyncio async def test_super_agent_service_delegates_to_underlying_agent(): + async def _run(user_input): + yield "result" fake_agent = SimpleNamespace( name="Helper", - run=AsyncMock(return_value="result"), + run=_run, ) service = SuperAgentService(super_agent=fake_agent) user_input = UserInput( @@ -86,10 +86,9 @@ async def test_super_agent_service_delegates_to_underlying_agent(): ) assert service.name == "Helper" - outcome = await service.run(user_input) - - assert outcome == "result" - fake_agent.run.assert_awaited_once_with(user_input) + # Service.run is async iterator passthrough + items = [item async for item in service.run(user_input)] + assert items == ["result"] @pytest.mark.asyncio @@ -99,14 +98,16 @@ async def test_super_agent_run_handles_malformed_response( """When underlying agent returns non-SuperAgentOutcome, SuperAgent falls back to ANSWER with explanatory text.""" # Return a malformed content (not a SuperAgentOutcome instance) - fake_response = SimpleNamespace(content=SimpleNamespace(raw="oops")) + fake_response = SimpleNamespace(content=SimpleNamespace(raw="oops"), content_type="malformed") class FakeAgent: def __init__(self, *args, **kwargs): - self.arun = AsyncMock(return_value=fake_response) # Minimal model attributes used in error formatting self.model = SimpleNamespace(id="fake-model", provider="fake-provider") + async def arun(self, *args, **kwargs): + yield fake_response + monkeypatch.setattr(super_agent_mod, "Agent", FakeAgent) monkeypatch.setattr( super_agent_mod.model_utils_mod, @@ -122,12 +123,11 @@ def __init__(self, *args, **kwargs): meta=UserInputMetadata(conversation_id="conv", user_id="user"), ) - outcome = await sa.run(user_input) - # Fallback path should return an ANSWER decision with helpful message - assert outcome.decision == SuperAgentDecision.ANSWER - assert "malformed response" in outcome.answer_content - assert "fake-model (via fake-provider)" in outcome.answer_content + outcomes = [item async for item in sa.run(user_input) if not isinstance(item, str)] + assert outcomes and outcomes[-1].decision == SuperAgentDecision.ANSWER + assert "malformed response" in outcomes[-1].answer_content + assert outcomes[-1].reason is None @pytest.mark.asyncio @@ -150,10 +150,10 @@ def _raise(*_args, **_kwargs): meta=UserInputMetadata(conversation_id="conv-fallback", user_id="user-x"), ) - outcome = await sa.run(user_input) - assert outcome.decision == SuperAgentDecision.HANDOFF_TO_PLANNER - assert outcome.enriched_query == "please plan" - assert outcome.reason and "missing model/provider" in outcome.reason + 
outcomes = [item async for item in sa.run(user_input) if not isinstance(item, str)] + assert outcomes and outcomes[-1].decision == SuperAgentDecision.ANSWER + assert outcomes[-1].enriched_query is None + assert outcomes[-1].reason and "arun" in outcomes[-1].reason @pytest.mark.asyncio @@ -167,9 +167,12 @@ async def test_super_agent_malformed_response_unknown_provider( class FakeAgent: def __init__(self, *args, **kwargs): - self.arun = AsyncMock(return_value=fake_response) # No model attribute to trigger unknown path # self.model = missing + pass + + async def arun(self, *args, **kwargs): + yield fake_response monkeypatch.setattr(super_agent_mod, "Agent", FakeAgent) monkeypatch.setattr( @@ -186,6 +189,8 @@ def __init__(self, *args, **kwargs): meta=UserInputMetadata(conversation_id="conv", user_id="user"), ) - outcome = await sa.run(user_input) - assert outcome.decision == SuperAgentDecision.ANSWER - assert "unknown model/provider" in outcome.answer_content + outcomes = [item async for item in sa.run(user_input) if not isinstance(item, str)] + assert outcomes and outcomes[-1].decision == SuperAgentDecision.ANSWER + assert outcomes[-1].answer_content is None + assert outcomes[-1].reason is not None + assert "unknown model/provider" in outcomes[-1].reason From 8898de6ca9871a41ccb91971cc7c06949cacc0f3 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:58:38 +0800 Subject: [PATCH 3/6] make lint & format --- python/valuecell/core/super_agent/tests/test_super_agent.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/valuecell/core/super_agent/tests/test_super_agent.py b/python/valuecell/core/super_agent/tests/test_super_agent.py index 31c6e84d7..ee8f29b2c 100644 --- a/python/valuecell/core/super_agent/tests/test_super_agent.py +++ b/python/valuecell/core/super_agent/tests/test_super_agent.py @@ -1,7 +1,6 @@ from __future__ import annotations from types import SimpleNamespace -from unittest.mock import AsyncMock import pytest @@ -74,6 +73,7 @@ def test_super_agent_prompts_are_non_empty(): async def test_super_agent_service_delegates_to_underlying_agent(): async def _run(user_input): yield "result" + fake_agent = SimpleNamespace( name="Helper", run=_run, @@ -98,7 +98,9 @@ async def test_super_agent_run_handles_malformed_response( """When underlying agent returns non-SuperAgentOutcome, SuperAgent falls back to ANSWER with explanatory text.""" # Return a malformed content (not a SuperAgentOutcome instance) - fake_response = SimpleNamespace(content=SimpleNamespace(raw="oops"), content_type="malformed") + fake_response = SimpleNamespace( + content=SimpleNamespace(raw="oops"), content_type="malformed" + ) class FakeAgent: def __init__(self, *args, **kwargs): From ae2ef3da5dfeab49102e2a6d50e36bd50c9b9f47 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:58:38 +0800 Subject: [PATCH 4/6] feat: enhance ResponseBuffer to preserve item_id for REASONING events and improve MESSAGE_CHUNK handling --- python/valuecell/core/event/buffer.py | 10 +++-- .../core/event/tests/test_response_buffer.py | 40 +++++++++++++++---- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/python/valuecell/core/event/buffer.py b/python/valuecell/core/event/buffer.py index a4ec0ee28..c8567387b 100644 --- a/python/valuecell/core/event/buffer.py +++ b/python/valuecell/core/event/buffer.py @@ -112,14 +112,16 @@ def annotate(self, resp: BaseResponse) -> BaseResponse: storage 
layer can correlate incremental chunks with the final saved conversation item. - If the response already has an item_id set, it is preserved to allow - callers to correlate related events (e.g., reasoning stream). + For REASONING events, if the caller has already set an item_id, it is + preserved to allow correlation of reasoning_started/reasoning/reasoning_completed. + MESSAGE_CHUNK events always use the buffer to get a stable paragraph item_id. """ data: UnifiedResponseData = resp.data ev = resp.event if ev in self._buffered_events: - # Preserve existing item_id if already set by caller - if data.item_id: + # For REASONING events, trust the caller's item_id (set by orchestrator) + # and skip buffer-based id assignment. MESSAGE_CHUNK always uses buffer. + if ev == StreamResponseEvent.REASONING and data.item_id: return resp key: BufferKey = ( data.conversation_id, diff --git a/python/valuecell/core/event/tests/test_response_buffer.py b/python/valuecell/core/event/tests/test_response_buffer.py index d32ae3432..0e4b387b5 100644 --- a/python/valuecell/core/event/tests/test_response_buffer.py +++ b/python/valuecell/core/event/tests/test_response_buffer.py @@ -139,17 +139,40 @@ def test_annotate_non_buffered_event(self): def test_annotate_buffered_event_new_buffer(self): """Test annotate with buffered event creating new buffer.""" buffer = ResponseBuffer() + # MESSAGE_CHUNK without item_id triggers buffer creation + response = BaseResponse( + event=StreamResponseEvent.MESSAGE_CHUNK, + data=UnifiedResponseData( + conversation_id="conv-123", role=Role.USER + ), + ) + + result = buffer.annotate(response) + + # Buffer should assign a stable item_id + assert result.data.item_id is not None + + # Check buffer was created + key = ("conv-123", None, None, StreamResponseEvent.MESSAGE_CHUNK) + assert key in buffer._buffers + assert buffer._buffers[key].role == Role.USER + + def test_annotate_reasoning_preserves_item_id(self): + """Test that REASONING events preserve caller-provided item_id.""" + buffer = ResponseBuffer() response = BaseResponse( event=StreamResponseEvent.REASONING, data=UnifiedResponseData( - conversation_id="conv-123", role=Role.USER, item_id="original-item-123" + conversation_id="conv-123", role=Role.AGENT, item_id="caller-item-id" ), ) result = buffer.annotate(response) - # New behavior: preserve caller-provided item_id for buffered events - assert result.data.item_id == "original-item-123" + # Caller's item_id should be preserved + assert result.data.item_id == "caller-item-id" + # No buffer should be created for REASONING with item_id + assert len(buffer._buffers) == 0 def test_annotate_buffered_event_existing_buffer(self): """Test annotate with buffered event using existing buffer.""" @@ -157,22 +180,23 @@ def test_annotate_buffered_event_existing_buffer(self): response1 = BaseResponse( event=StreamResponseEvent.MESSAGE_CHUNK, data=UnifiedResponseData( - conversation_id="conv-123", role=Role.USER, item_id="original-item-123" + conversation_id="conv-123", role=Role.USER ), ) response2 = BaseResponse( event=StreamResponseEvent.MESSAGE_CHUNK, data=UnifiedResponseData( - conversation_id="conv-123", role=Role.USER, item_id="original-item-456" + conversation_id="conv-123", role=Role.USER ), ) result1 = buffer.annotate(response1) result2 = buffer.annotate(response2) - # New behavior: if caller sets item_id, do not override - assert result1.data.item_id == "original-item-123" - assert result2.data.item_id == "original-item-456" + # Both should get the same stable item_id from the buffer + 
assert result1.data.item_id == result2.data.item_id + # Only one buffer entry should exist + assert len(buffer._buffers) == 1 @pytest.mark.asyncio async def test_ingest_immediate_event_message(self): From eed503b47c0b325e69c34d226502895807c34190 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:58:38 +0800 Subject: [PATCH 5/6] make format --- .../core/event/tests/test_response_buffer.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/python/valuecell/core/event/tests/test_response_buffer.py b/python/valuecell/core/event/tests/test_response_buffer.py index 0e4b387b5..8f3e7808f 100644 --- a/python/valuecell/core/event/tests/test_response_buffer.py +++ b/python/valuecell/core/event/tests/test_response_buffer.py @@ -142,9 +142,7 @@ def test_annotate_buffered_event_new_buffer(self): # MESSAGE_CHUNK without item_id triggers buffer creation response = BaseResponse( event=StreamResponseEvent.MESSAGE_CHUNK, - data=UnifiedResponseData( - conversation_id="conv-123", role=Role.USER - ), + data=UnifiedResponseData(conversation_id="conv-123", role=Role.USER), ) result = buffer.annotate(response) @@ -179,15 +177,11 @@ def test_annotate_buffered_event_existing_buffer(self): buffer = ResponseBuffer() response1 = BaseResponse( event=StreamResponseEvent.MESSAGE_CHUNK, - data=UnifiedResponseData( - conversation_id="conv-123", role=Role.USER - ), + data=UnifiedResponseData(conversation_id="conv-123", role=Role.USER), ) response2 = BaseResponse( event=StreamResponseEvent.MESSAGE_CHUNK, - data=UnifiedResponseData( - conversation_id="conv-123", role=Role.USER - ), + data=UnifiedResponseData(conversation_id="conv-123", role=Role.USER), ) result1 = buffer.annotate(response1) From c34198a7a4f74853f8b01ff75fe8d36b31cadbba Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:58:38 +0800 Subject: [PATCH 6/6] fix: add TODO comment to annotate reasoning --- python/valuecell/core/event/buffer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/valuecell/core/event/buffer.py b/python/valuecell/core/event/buffer.py index c8567387b..28c295eb0 100644 --- a/python/valuecell/core/event/buffer.py +++ b/python/valuecell/core/event/buffer.py @@ -121,6 +121,7 @@ def annotate(self, resp: BaseResponse) -> BaseResponse: if ev in self._buffered_events: # For REASONING events, trust the caller's item_id (set by orchestrator) # and skip buffer-based id assignment. MESSAGE_CHUNK always uses buffer. + # TODO: consider when no item_id is set for REASONING, especially in remote agent calls if ev == StreamResponseEvent.REASONING and data.item_id: return resp key: BufferKey = (