15 changes: 0 additions & 15 deletions python/valuecell/agents/hello_world.py

This file was deleted.

18 changes: 11 additions & 7 deletions python/valuecell/agents/sec_agent.py
@@ -248,12 +248,16 @@ async def _process_fund_holdings_query(
return
logger.info(f"Retrieved {len(filings)} 13F filings")

-# Parse filing data
-current_filing = filings.iloc[0].obj()
-current_data = current_filing.infotable.to_json()
+# %%
+o = filings[1].obj()
+current_filing = o.infotable.to_json()

+# %%
+o = filings[2].obj()
+previous_filing = o.infotable.to_json()
+
+
+
-previous_filing = filings.iloc[1].obj()
-previous_data = previous_filing.infotable.to_json()

logger.info("Successfully parsed current and historical holdings data")

@@ -262,10 +266,10 @@ async def _process_fund_holdings_query(
As a professional investment analyst, please conduct an in-depth analysis of the following 13F holdings data:

## Historical holdings data (earlier period):
-{previous_data}
+{previous_filing}

## Current holdings data (latest period):
-{current_data}
+{current_filing}

## Analysis requirements:
Please provide professional analysis from the following perspectives:
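For reference, a minimal standalone sketch of the parsing step above, assuming the same filings[i].obj().infotable interface the agent already uses and a newest-first ordering of the filings collection. The helper name and the [0]/[1] indices are illustrative only (the merged change itself reads filings[1] and filings[2]).

def holdings_pair(filings):
    """Return (previous_json, current_json) for the two most recent 13F filings."""
    # Assumes filings is ordered newest-first, as the original iloc-based code implied.
    current_json = filings[0].obj().infotable.to_json()
    previous_json = filings[1].obj().infotable.to_json()
    return previous_json, current_json
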
4 changes: 2 additions & 2 deletions python/valuecell/agents/sec_client.py
@@ -17,9 +17,9 @@ async def main():
# Get client and send message
client = await connections.get_client("SecAgent")
async for task, event in await client.send_message(
"伯克希尔最近持仓变化", streaming=True
"Recent Berkshire Hathaway holdings changes", streaming=True
):
print(f"接收到Task: {task}")
print(f"Received Task: {task}")

print(f"Calculation result: {task.status}")

25 changes: 0 additions & 25 deletions python/valuecell/agents/tests/test_import.py

This file was deleted.

4 changes: 2 additions & 2 deletions python/valuecell/core/coordinate/models.py
@@ -1,4 +1,4 @@
-from typing import List
+from typing import List, Optional
from pydantic import BaseModel, Field

from valuecell.core.task import Task
@@ -8,7 +8,7 @@ class ExecutionPlan(BaseModel):
"""Execution plan containing multiple tasks"""

plan_id: str = Field(..., description="Unique plan identifier")
-session_id: str = Field(..., description="Session ID this plan belongs to")
+session_id: Optional[str] = Field(..., description="Session ID this plan belongs to")
user_id: str = Field(..., description="User ID who requested this plan")
query: str = Field(..., description="Original user input")
tasks: List[Task] = Field(default_factory=list, description="Tasks to execute")
4 changes: 2 additions & 2 deletions python/valuecell/examples/asset_adapter_example.py
@@ -114,7 +114,7 @@ def demonstrate_asset_search():
results_zh = search_assets("00700.HK", language="zh-Hans", limit=5)

if results_zh["success"]:
logger.info(f"找到 {results_zh['count']} 个结果:")
logger.info(f"Found {results_zh['count']} results:")
for result in results_zh["results"]:
logger.info(
f" - {result['ticker']}: {result['display_name']} "
@@ -192,7 +192,7 @@ def demonstrate_asset_info():
info_zh = get_asset_info(ticker, language="zh-Hans")
if info_zh["success"]:
logger.info(
f" 中文: {info_zh['display_name']} ({info_zh['asset_type_display']})"
f" Chinese: {info_zh['display_name']} ({info_zh['asset_type_display']})"
)


3 changes: 3 additions & 0 deletions python/valuecell/server/api/app.py
@@ -16,6 +16,7 @@
from .routers.system import create_system_router
from .routers.websocket import create_websocket_router
from .routers.watchlist import create_watchlist_router
+from .routers.agent_stream import create_agent_stream_router
from .schemas import SuccessResponse, AppInfoData
from ...adapters.assets import get_adapter_manager

@@ -130,6 +131,8 @@ async def root():
app.include_router(create_websocket_router())
# Include watchlist router
app.include_router(create_watchlist_router())
+# Include agent stream router
+app.include_router(create_agent_stream_router(), prefix="/api/v1")


# For uvicorn
54 changes: 54 additions & 0 deletions python/valuecell/server/api/routers/agent_stream.py
@@ -0,0 +1,54 @@
"""
Agent stream router for handling streaming agent queries.
"""

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from valuecell.server.api.schemas.agent_stream import AgentStreamRequest
from valuecell.server.services.agent_stream_service import AgentStreamService
import json


def create_agent_stream_router() -> APIRouter:
"""Create and configure the agent stream router."""

router = APIRouter(prefix="/agents", tags=["Agent Stream"])
agent_service = AgentStreamService()

@router.post("/stream")
async def stream_query_agent(request: AgentStreamRequest):
"""
Stream agent query responses in real-time.

This endpoint accepts a user query and returns a streaming response
with agent-generated content in Server-Sent Events (SSE) format.
"""
try:

async def generate_stream():
"""Generate SSE formatted stream chunks."""
async for chunk in agent_service.stream_query_agent(
query=request.query, agent_name=request.agent_name
):
# Format as SSE (Server-Sent Events)
data = json.dumps({"content": chunk, "is_final": False})
yield f"data: {data}\n\n"

# Send final chunk
final_data = json.dumps({"content": "", "is_final": True})
yield f"data: {final_data}\n\n"

return StreamingResponse(
generate_stream(),
media_type="text/plain",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
},
)

except Exception as e:
raise HTTPException(status_code=500, detail=f"Agent query failed: {str(e)}")

return router
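
A hedged client-side sketch for consuming this endpoint. Given the /api/v1 prefix added in app.py, the route resolves to POST /api/v1/agents/stream; the host, port, agent name, and the use of httpx below are assumptions for illustration, not part of the PR.

import asyncio
import json

import httpx  # assumption: any async HTTP client with response streaming would work


async def consume_stream() -> None:
    """Read SSE chunks from the streaming endpoint and print agent content as it arrives."""
    url = "http://localhost:8000/api/v1/agents/stream"  # host and port are illustrative
    payload = {"query": "Recent Berkshire Hathaway holdings changes", "agent_name": "SecAgent"}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip blank separator lines between SSE events
                chunk = json.loads(line[len("data: "):])
                if chunk["is_final"]:
                    break
                print(chunk["content"], end="", flush=True)


if __name__ == "__main__":
    asyncio.run(consume_stream())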
32 changes: 32 additions & 0 deletions python/valuecell/server/api/schemas/agent_stream.py
@@ -0,0 +1,32 @@
"""
Agent stream API schemas for handling streaming agent queries.
"""

from pydantic import BaseModel, Field


class AgentStreamRequest(BaseModel):
"""Request model for agent streaming queries."""

query: str = Field(..., description="User query to send to the agent")
agent_name: str = Field(None, description="Specific agent name to use for the query")

class Config:
json_schema_extra = {
"example": {
"query": "What is the current market trend?",
"agent_name": "MarketAnalystAgent",
}
}


class StreamChunk(BaseModel):
"""Response chunk model for streaming data."""

content: str = Field(..., description="Content chunk from the agent response")
is_final: bool = Field(False, description="Whether this is the final chunk")

class Config:
json_schema_extra = {
"example": {"content": "The current market shows...", "is_final": False}
}
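
A small sketch of how a consumer might turn one received SSE data line back into a StreamChunk, assuming Pydantic v2 (consistent with the json_schema_extra style above); the raw line value is illustrative.

from valuecell.server.api.schemas.agent_stream import StreamChunk

raw_line = 'data: {"content": "The current market shows...", "is_final": false}'

# Strip the SSE "data: " prefix, then validate the JSON payload against the schema.
chunk = StreamChunk.model_validate_json(raw_line.removeprefix("data: "))
assert chunk.content.startswith("The current market")
assert chunk.is_final is False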
5 changes: 2 additions & 3 deletions python/valuecell/server/main.py
@@ -1,8 +1,8 @@
"""Main entry point for ValueCell Server Backend."""

import uvicorn
-from .api.app import create_app
-from .config.settings import get_settings
+from valuecell.server.api.app import create_app
+from valuecell.server.config.settings import get_settings

# Create app instance for uvicorn
app = create_app()
@@ -16,7 +16,6 @@ def main():
app,
host=settings.API_HOST,
port=settings.API_PORT,
-# reload=settings.API_DEBUG,
)


61 changes: 61 additions & 0 deletions python/valuecell/server/services/agent_stream_service.py
@@ -0,0 +1,61 @@
"""
Agent stream service for handling streaming agent interactions.
"""

from typing import AsyncGenerator, Optional
from valuecell.core.coordinate.orchestrator import get_default_orchestrator
from valuecell.core.coordinate.tests.test_orchestrator import session_id

Check failure on line 7 in python/valuecell/server/services/agent_stream_service.py (GitHub Actions / lint), Ruff (F401): python/valuecell/server/services/agent_stream_service.py:7:63: `valuecell.core.coordinate.tests.test_orchestrator.session_id` imported but unused
from valuecell.core.types import UserInput, UserInputMetadata
import logging

logger = logging.getLogger(__name__)


class AgentStreamService:
"""Service for handling streaming agent queries."""

def __init__(self):
"""Initialize the agent stream service."""
self.orchestrator = get_default_orchestrator()
logger.info("Agent stream service initialized")

async def stream_query_agent(
self, query: str, agent_name: Optional[str] = None
) -> AsyncGenerator[str, None]:
"""
Stream agent responses for a given query.

Args:
query: User query to process
agent_name: Optional specific agent name to use. If provided, takes precedence over query parsing.

Yields:
str: Content chunks from the agent response
"""
try:
logger.info(f"Processing streaming query: {query[:100]}...")

user_id = "default_user"
desired_agent_name = agent_name
session_id = agent_name + "_session_" + user_id

user_input_meta = UserInputMetadata(user_id=user_id, session_id=session_id)

user_input = UserInput(
query=query, desired_agent_name=desired_agent_name, meta=user_input_meta
)

# Use the orchestrator's process_user_input method for streaming
async for response_chunk in self.orchestrator.process_user_input(
user_input
):
if (
response_chunk
and response_chunk.content
and response_chunk.content.strip()
):
yield response_chunk.content

except Exception as e:
logger.error(f"Error in stream_query_agent: {str(e)}")
yield f"Error processing query: {str(e)}"