Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 23 additions & 30 deletions python/valuecell/core/agent/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from valuecell.core.types import (
BaseAgent,
NotifyResponse,
NotifyResponseEvent,
StreamResponse,
StreamResponseEvent,
)
Expand All @@ -39,6 +38,7 @@
get_next_available_port,
parse_host_port,
)
from .responses import EventPredicates

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -184,9 +184,14 @@ async def _add_chunk(
if not response.content:
return

response_event = response.event
parts = [Part(root=TextPart(text=response.content))]
metadata = {"response_event": response_event.value}
response_event = response.event
metadata = {
"response_event": response_event.value,
"subtask_id": response.subtask_id,
}
if response_event == StreamResponseEvent.COMPONENT_GENERATOR:
metadata["component_type"] = response.metadata.get("component_type")
await updater.add_artifact(
parts=parts,
artifact_id=artifact_id,
Expand All @@ -213,21 +218,32 @@ async def _add_chunk(
)

response_event = response.event
if is_task_failed(response_event):
if EventPredicates.is_task_failed(response_event):
raise RuntimeError(
f"Agent {agent_name} reported failure: {response.content}"
)

is_complete = is_task_completed(response_event)
if is_tool_call(response_event):
is_complete = EventPredicates.is_task_completed(response_event)
if EventPredicates.is_tool_call(response_event):
await updater.update_status(
TaskState.working,
message=message,
message=new_agent_text_message(response.content or ""),
metadata={
"event": response_event.value,
"tool_call_id": response.metadata.get("tool_call_id"),
"tool_name": response.metadata.get("tool_name"),
"tool_result": response.metadata.get("content"),
"subtask_id": response.subtask_id,
},
)
continue
if EventPredicates.is_reasoning(response_event):
await updater.update_status(
TaskState.working,
message=new_agent_text_message(response.content or ""),
metadata={
"event": response_event.value,
"subtask_id": response.subtask_id,
},
)
continue
Expand All @@ -251,29 +267,6 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None
raise ServerError(error=UnsupportedOperationError())


def is_task_completed(response_type: str) -> bool:
return response_type in {
StreamResponseEvent.TASK_DONE,
StreamResponseEvent.TASK_FAILED,
NotifyResponseEvent.TASK_DONE,
NotifyResponseEvent.TASK_FAILED,
}


def is_task_failed(response_type: str) -> bool:
return response_type in {
StreamResponseEvent.TASK_FAILED,
NotifyResponseEvent.TASK_FAILED,
}


def is_tool_call(response_type: str) -> bool:
return response_type in {
StreamResponseEvent.TOOL_CALL_STARTED,
StreamResponseEvent.TOOL_CALL_COMPLETED,
}


def _create_agent_executor(agent_instance):
return GenericAgentExecutor(agent_instance)

Expand Down
129 changes: 92 additions & 37 deletions python/valuecell/core/agent/responses.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,3 @@
"""User-facing response constructors under valuecell.core.agent.

Prefer importing from here if you're already working inside the core.agent
namespace. For a stable top-level import, you can also use
`valuecell.responses` which provides the same API.

Example:
from valuecell.core.agent.responses import stream, notify
# Or explicit aliases for clarity:
from valuecell.core.agent.responses import streaming, notification

yield stream.message_chunk("Thinking…")
yield stream.reasoning("Plan: 1) fetch 2) analyze")
yield stream.tool_call_start("call_1", "search")
yield stream.tool_call_result('{"items": 12}', "call_1", "search")
yield stream.done()

send(notify.message("Task submitted"))
send(notify.done("OK"))
"""

from __future__ import annotations

from typing import Optional
Expand All @@ -28,50 +7,92 @@
NotifyResponseEvent,
StreamResponse,
StreamResponseEvent,
ToolCallContent,
SystemResponseEvent,
ToolCallPayload,
_TaskResponseEvent,
)


class _StreamResponseNamespace:
"""Factory methods for streaming responses."""

def message_chunk(self, content: str) -> StreamResponse:
return StreamResponse(event=StreamResponseEvent.MESSAGE_CHUNK, content=content)
def message_chunk(
self, content: str, subtask_id: str | None = None
) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.MESSAGE_CHUNK,
content=content,
subtask_id=subtask_id,
)

def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse:
def tool_call_started(
self, tool_call_id: str, tool_name: str, subtask_id: str | None = None
) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.TOOL_CALL_STARTED,
metadata=ToolCallContent(
tool_call_id=tool_call_id, tool_name=tool_name
metadata=ToolCallPayload(
tool_call_id=tool_call_id,
tool_name=tool_name,
).model_dump(),
subtask_id=subtask_id,
)

def tool_call_completed(
self, tool_result: str, tool_call_id: str, tool_name: str
self,
tool_result: str,
tool_call_id: str,
tool_name: str,
subtask_id: str | None = None,
) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.TOOL_CALL_COMPLETED,
metadata=ToolCallContent(
tool_call_id=tool_call_id, tool_name=tool_name, tool_result=tool_result
metadata=ToolCallPayload(
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_result=tool_result,
).model_dump(),
subtask_id=subtask_id,
)

def reasoning(self, content: str) -> StreamResponse:
def reasoning_started(self, subtask_id: str | None = None) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.REASONING_STARTED,
subtask_id=subtask_id,
)

def reasoning(self, content: str, subtask_id: str | None = None) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.REASONING,
content=content,
subtask_id=subtask_id,
)

def reasoning_completed(self, subtask_id: str | None = None) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.REASONING_COMPLETED,
subtask_id=subtask_id,
)

def component_generator(
self, content: str, component_type: str, subtask_id: str | None = None
) -> StreamResponse:
return StreamResponse(
event=StreamResponseEvent.COMPONENT_GENERATOR,
content=content,
metadata={"component_type": component_type},
subtask_id=subtask_id,
)

def done(self, content: Optional[str] = None) -> StreamResponse:
return StreamResponse(
content=content,
event=StreamResponseEvent.TASK_DONE,
event=_TaskResponseEvent.TASK_COMPLETED,
)

def failed(self, content: Optional[str] = None) -> StreamResponse:
return StreamResponse(
content=content,
event=StreamResponseEvent.TASK_FAILED,
event=SystemResponseEvent.TASK_FAILED,
)


Expand All @@ -90,22 +111,56 @@ def message(self, content: str) -> NotifyResponse:
def done(self, content: Optional[str] = None) -> NotifyResponse:
return NotifyResponse(
content=content,
event=NotifyResponseEvent.TASK_DONE,
event=_TaskResponseEvent.TASK_COMPLETED,
)

def failed(self, content: Optional[str] = None) -> NotifyResponse:
return NotifyResponse(
content=content,
event=NotifyResponse.TASK_FAILED,
event=SystemResponseEvent.TASK_FAILED,
)


notification = _NotifyResponseNamespace()


class EventPredicates:
"""Utilities to classify response event types.

These mirror the helper predicates previously defined in decorator.py
and centralize them next to response event definitions.
"""

@staticmethod
def is_task_completed(response_type) -> bool:
return response_type in {
_TaskResponseEvent.TASK_COMPLETED,
}

@staticmethod
def is_task_failed(response_type) -> bool:
return response_type in {
SystemResponseEvent.TASK_FAILED,
}

@staticmethod
def is_tool_call(response_type) -> bool:
return response_type in {
StreamResponseEvent.TOOL_CALL_STARTED,
StreamResponseEvent.TOOL_CALL_COMPLETED,
}

@staticmethod
def is_reasoning(response_type) -> bool:
return response_type in {
StreamResponseEvent.REASONING_STARTED,
StreamResponseEvent.REASONING,
StreamResponseEvent.REASONING_COMPLETED,
}


__all__ = [
"streaming",
"notification",
"StreamResponse",
"NotifyResponse",
"EventPredicates",
]
Loading