Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ AGENT_DEBUG_MODE=false
SEC_EMAIL=your.name@example.com

# Model IDs for OpenRouter
PLANNER_MODEL_ID=openai/gpt-4o-mini
SEC_PARSER_MODEL_ID=openai/gpt-4o-mini
SEC_ANALYSIS_MODEL_ID=deepseek/deepseek-chat-v3-0324
AI_HEDGE_FUND_PARSER_MODEL_ID=openai/gpt-4o-mini
Expand Down
2 changes: 1 addition & 1 deletion python/configs/agent_cards/hedge_fund_agent.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "AIHedgeFundAgent",
"url": "http://localhost:10001/",
"description": "AI Hedge Fund Agent provides multi-strategy analysis and investment insights. It can act like various famous analysts and specialists (valuation, technical, sentiment) to analyze stocks, portfolios, and market trends. Use its skills to get tailored perspectives ranging from deep fundamental valuation to technical trading signals.",
"description": "AI Hedge Fund Agent provides multi-strategy analysis and investment insights. It can act like various famous analysts and specialists (valuation, technical, sentiment) to analyze stocks, portfolios, and market trends. **CAUTION**: Only stock symbols in {AAPL, GOOGL, MSFT, NVDA, TSLA} is/are supported and the agent will refuse to answer for other symbols. The input should contain one or more stock symbol(s) from this list. ",
"skills": [
{
"id": "aswath_damodaran_agent",
Expand Down
19 changes: 7 additions & 12 deletions python/third_party/ai-hedge-fund/adapter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import logging
import os
from datetime import datetime
from typing import List
from typing import AsyncGenerator, List

from agno.agent import Agent
from agno.models.openrouter import OpenRouter
from dateutil.relativedelta import relativedelta
from langchain_core.messages import HumanMessage
from pydantic import BaseModel, Field, field_validator
from valuecell.core.agent.decorator import create_wrapped_agent
from valuecell.core.types import BaseAgent
from valuecell.core import BaseAgent, StreamResponse, streaming

from src.main import create_workflow
from src.utils.analysts import ANALYST_ORDER
Expand Down Expand Up @@ -69,7 +69,9 @@ def __init__(self):
markdown=True,
)

async def stream(self, query, session_id, task_id):
async def stream(
self, query, session_id, task_id
) -> AsyncGenerator[StreamResponse, None]:
logger.info(
f"Parsing query: {query}. Task ID: {task_id}, Session ID: {session_id}"
)
Expand Down Expand Up @@ -123,15 +125,8 @@ async def stream(self, query, session_id, task_id):
):
if not isinstance(chunk, str):
continue
yield {
"content": chunk,
"is_task_complete": False,
}

yield {
"content": "",
"is_task_complete": True,
}
yield streaming.message_chunk(chunk)
yield streaming.done()


def run_hedge_fund_stream(
Expand Down
9 changes: 9 additions & 0 deletions python/valuecell/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
RemoteAgentResponse,
)

from .agent.decorator import serve, create_wrapped_agent
from .agent.responses import streaming, notification

__all__ = [
# Session exports
"Message",
Expand All @@ -54,4 +57,10 @@
"BaseAgent",
"StreamResponse",
"RemoteAgentResponse",
# Agent utilities
"serve",
"create_wrapped_agent",
# Response utilities
"streaming",
"notification",
]
7 changes: 0 additions & 7 deletions python/valuecell/core/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,10 @@
from .decorator import serve
from .registry import AgentRegistry

# Import types from the unified types module
from ..types import BaseAgent, RemoteAgentResponse, StreamResponse


__all__ = [
# Core agent exports
"AgentClient",
"RemoteConnections",
"serve",
"AgentRegistry",
"BaseAgent",
"RemoteAgentResponse",
"StreamResponse",
]
4 changes: 2 additions & 2 deletions python/valuecell/core/agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def _setup_client(self):
async def send_message(
self,
query: str,
context_id: str = None,
session_id: str = None,
metadata: dict = None,
streaming: bool = False,
) -> AsyncIterator[RemoteAgentResponse]:
Expand All @@ -65,7 +65,7 @@ async def send_message(
role=Role.user,
parts=[Part(root=TextPart(text=query))],
message_id=generate_uuid("msg"),
context_id=context_id or generate_uuid("ctx"),
context_id=session_id or generate_uuid("ctx"),
metadata=metadata if metadata else None,
)

Expand Down
110 changes: 87 additions & 23 deletions python/valuecell/core/agent/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
from a2a.utils import new_agent_text_message, new_task
from a2a.utils.errors import ServerError
from valuecell.core.agent import registry
from valuecell.core.types import BaseAgent
from valuecell.core.types import (
BaseAgent,
NotifyResponse,
NotifyResponseEvent,
StreamResponse,
StreamResponseEvent,
)
from valuecell.utils import (
get_agent_card_path,
get_next_available_port,
Expand Down Expand Up @@ -153,55 +159,90 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
# Prepare query and ensure a task exists in the system
query = context.get_user_input()
task = context.current_task
metadata = context.metadata
task_meta = context.metadata
agent_name = self.agent.__class__.__name__
if not task:
message = context.message
task = new_task(message)
task.metadata = metadata
task.metadata = task_meta
await event_queue.enqueue_event(task)

# Helper state
updater = TaskUpdater(event_queue, task.id, task.context_id)
artifact_id = f"{self.agent.__class__.__name__}-artifact"
chunk_idx = 0
task_id = task.id
session_id = task.context_id
updater = TaskUpdater(event_queue, task_id, session_id)
artifact_id = f"artifact-{agent_name}-{session_id}-{task_id}"
chunk_idx = -1

# Local helper to add a chunk
async def _add_chunk(content: str, last: bool = False):
async def _add_chunk(
response: StreamResponse | NotifyResponse, is_complete: bool
):
nonlocal chunk_idx
parts = [Part(root=TextPart(text=content))]

chunk_idx += 1
if not response.content:
return

response_event = response.event
parts = [Part(root=TextPart(text=response.content))]
metadata = {"response_event": response_event.value}
await updater.add_artifact(
parts=parts,
artifact_id=artifact_id,
append=chunk_idx > 0,
last_chunk=last,
last_chunk=is_complete,
metadata=metadata,
)
if not last:
chunk_idx += 1

# Stream from the user agent and update task incrementally
await updater.update_status(TaskState.working)
await updater.update_status(
TaskState.working,
message=new_agent_text_message(
f"Task received by {agent_name}", session_id, task_id
),
)
try:
query_handler = (
self.agent.notify if metadata.get("notify") else self.agent.stream
self.agent.notify if task_meta.get("notify") else self.agent.stream
)
async for item in query_handler(query, task.context_id, task.id):
content = item.get("content", "")
is_complete = item.get("is_task_complete", True)

await _add_chunk(content, last=is_complete)

async for response in query_handler(query, session_id, task_id):
if not isinstance(response, (StreamResponse, NotifyResponse)):
raise ValueError(
f"Agent {agent_name} yielded invalid response type: {type(response)}"
)

response_event = response.event
if is_task_failed(response_event):
raise RuntimeError(
f"Agent {agent_name} reported failure: {response.content}"
)

is_complete = is_task_completed(response_event)
if is_tool_call(response_event):
await updater.update_status(
TaskState.working,
message=message,
metadata={
"event": response_event.value,
"tool_call_id": response.metadata.get("tool_call_id"),
"tool_name": response.metadata.get("tool_name"),
"tool_result": response.metadata.get("content"),
},
)
continue

await _add_chunk(response, is_complete=is_complete)
if is_complete:
await updater.complete()
break

except Exception as e:
message = (
f"Error during {self.agent.__class__.__name__} agent execution: {e}"
)
message = f"Error during {agent_name} agent execution: {e}"
logger.error(message)
await updater.update_status(
TaskState.failed,
message=new_agent_text_message(message, task.context_id, task.id),
message=new_agent_text_message(message, session_id, task_id),
final=True,
)

Expand All @@ -210,6 +251,29 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None
raise ServerError(error=UnsupportedOperationError())


def is_task_completed(response_type: StreamResponseEvent | NotifyResponseEvent) -> bool:
    """Return True when the event marks the task as finished.

    Both successful completion (TASK_DONE) and failure (TASK_FAILED) count as
    "completed" here; callers that need to distinguish failure should also
    check `is_task_failed`.

    Args:
        response_type: The event taken from a StreamResponse/NotifyResponse.
            (Previously annotated as `str`, but callers pass enum members.)
    """
    return response_type in {
        StreamResponseEvent.TASK_DONE,
        StreamResponseEvent.TASK_FAILED,
        NotifyResponseEvent.TASK_DONE,
        NotifyResponseEvent.TASK_FAILED,
    }


def is_task_failed(response_type: StreamResponseEvent | NotifyResponseEvent) -> bool:
    """Return True when the event reports a task failure.

    Args:
        response_type: The event taken from a StreamResponse/NotifyResponse.
            (Previously annotated as `str`, but callers pass enum members.)
    """
    return response_type in {
        StreamResponseEvent.TASK_FAILED,
        NotifyResponseEvent.TASK_FAILED,
    }


def is_tool_call(response_type: StreamResponseEvent | NotifyResponseEvent) -> bool:
    """Return True when the event describes a tool-call lifecycle update.

    Covers both the start and the completion of a tool call; these events are
    surfaced as status updates rather than message-content chunks.

    Args:
        response_type: The event taken from a StreamResponse/NotifyResponse.
            (Previously annotated as `str`, but callers pass enum members.)
    """
    return response_type in {
        StreamResponseEvent.TOOL_CALL_STARTED,
        StreamResponseEvent.TOOL_CALL_COMPLETED,
    }


def _create_agent_executor(agent_instance):
    """Wrap the given agent instance in a GenericAgentExecutor."""
    return GenericAgentExecutor(agent_instance)

Expand Down
105 changes: 105 additions & 0 deletions python/valuecell/core/agent/responses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""User-facing response constructors under valuecell.core.agent.

Prefer importing from here if you're already working inside the core.agent
namespace. For a stable top-level import, you can also use
`valuecell.core`, which re-exports the same `streaming` and `notification`
namespaces.

Example:
    from valuecell.core.agent.responses import streaming, notification

    yield streaming.message_chunk("Thinking…")
    yield streaming.reasoning("Plan: 1) fetch 2) analyze")
    yield streaming.tool_call_started("call_1", "search")
    yield streaming.tool_call_completed('{"items": 12}', "call_1", "search")
    yield streaming.done()

    send(notification.message("Task submitted"))
    send(notification.done("OK"))
"""

from __future__ import annotations

from typing import Optional

from valuecell.core.types import (
NotifyResponse,
NotifyResponseEvent,
StreamResponse,
StreamResponseEvent,
ToolCallContent,
)


class _StreamResponseNamespace:
    """Builders for the `StreamResponse` objects a streaming agent yields."""

    def message_chunk(self, content: str) -> StreamResponse:
        """Wrap one chunk of assistant-visible text in a MESSAGE_CHUNK response."""
        return StreamResponse(
            event=StreamResponseEvent.MESSAGE_CHUNK,
            content=content,
        )

    def tool_call_started(self, tool_call_id: str, tool_name: str) -> StreamResponse:
        """Build a TOOL_CALL_STARTED response identifying the tool call."""
        call = ToolCallContent(tool_call_id=tool_call_id, tool_name=tool_name)
        return StreamResponse(
            event=StreamResponseEvent.TOOL_CALL_STARTED,
            metadata=call.model_dump(),
        )

    def tool_call_completed(
        self, tool_result: str, tool_call_id: str, tool_name: str
    ) -> StreamResponse:
        """Build a TOOL_CALL_COMPLETED response carrying the tool's result."""
        call = ToolCallContent(
            tool_call_id=tool_call_id,
            tool_name=tool_name,
            tool_result=tool_result,
        )
        return StreamResponse(
            event=StreamResponseEvent.TOOL_CALL_COMPLETED,
            metadata=call.model_dump(),
        )

    def reasoning(self, content: str) -> StreamResponse:
        """Wrap intermediate reasoning text in a REASONING response."""
        return StreamResponse(
            event=StreamResponseEvent.REASONING,
            content=content,
        )

    def done(self, content: Optional[str] = None) -> StreamResponse:
        """Build a TASK_DONE response, optionally with final content."""
        return StreamResponse(
            event=StreamResponseEvent.TASK_DONE,
            content=content,
        )

    def failed(self, content: Optional[str] = None) -> StreamResponse:
        """Build a TASK_FAILED response, optionally with an error description."""
        return StreamResponse(
            event=StreamResponseEvent.TASK_FAILED,
            content=content,
        )


streaming = _StreamResponseNamespace()


class _NotifyResponseNamespace:
    """Builders for the `NotifyResponse` objects a notify-style agent yields."""

    def message(self, content: str) -> NotifyResponse:
        """Wrap *content* in a MESSAGE-event notification."""
        return NotifyResponse(
            event=NotifyResponseEvent.MESSAGE,
            content=content,
        )

    def done(self, content: Optional[str] = None) -> NotifyResponse:
        """Build a TASK_DONE notification, optionally carrying final content."""
        return NotifyResponse(
            event=NotifyResponseEvent.TASK_DONE,
            content=content,
        )


notification = _NotifyResponseNamespace()


__all__ = [
"streaming",
"notification",
"StreamResponse",
"NotifyResponse",
]
4 changes: 3 additions & 1 deletion python/valuecell/core/coordinate/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ class ExecutionPlan(BaseModel):
..., description="Session ID this plan belongs to"
)
user_id: str = Field(..., description="User ID who requested this plan")
query: str = Field(..., description="Original user query that generated this plan")
orig_query: str = Field(
..., description="Original user query that generated this plan"
)
tasks: List[Task] = Field(default_factory=list, description="Tasks to execute")
created_at: str = Field(..., description="Plan creation timestamp")

Expand Down
Loading