From 37abe950188700ed4d7d42e3319eacaee70b97e8 Mon Sep 17 00:00:00 2001 From: zhonghao lu Date: Thu, 18 Sep 2025 18:05:20 +0800 Subject: [PATCH 1/3] refactor: remove Chinese text from code and make agent_name optional --- python/valuecell/server/api/app.py | 3 + .../server/api/routers/agent_stream.py | 54 +++++++++++++++++ .../server/api/schemas/agent_stream.py | 32 ++++++++++ .../server/services/agent_stream_service.py | 59 +++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 python/valuecell/server/api/routers/agent_stream.py create mode 100644 python/valuecell/server/api/schemas/agent_stream.py create mode 100644 python/valuecell/server/services/agent_stream_service.py diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index 525d9f1a1..a55a7f08e 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -16,6 +16,7 @@ from .routers.system import create_system_router from .routers.websocket import create_websocket_router from .routers.watchlist import create_watchlist_router +from .routers.agent_stream import create_agent_stream_router from .schemas import SuccessResponse, AppInfoData from ...adapters.assets import get_adapter_manager @@ -130,6 +131,8 @@ async def root(): app.include_router(create_websocket_router()) # Include watchlist router app.include_router(create_watchlist_router()) + # Include agent stream router + app.include_router(create_agent_stream_router(), prefix="/api/v1") # For uvicorn diff --git a/python/valuecell/server/api/routers/agent_stream.py b/python/valuecell/server/api/routers/agent_stream.py new file mode 100644 index 000000000..52210afad --- /dev/null +++ b/python/valuecell/server/api/routers/agent_stream.py @@ -0,0 +1,54 @@ +""" +Agent stream router for handling streaming agent queries. +""" + +from fastapi import APIRouter, HTTPException +from fastapi.responses import StreamingResponse +from valuecell.server.api.schemas.agent_stream import AgentStreamRequest +from valuecell.server.services.agent_stream_service import AgentStreamService +import json + + +def create_agent_stream_router() -> APIRouter: + """Create and configure the agent stream router.""" + + router = APIRouter(prefix="/agents", tags=["Agent Stream"]) + agent_service = AgentStreamService() + + @router.post("/stream") + async def stream_query_agent(request: AgentStreamRequest): + """ + Stream agent query responses in real-time. + + This endpoint accepts a user query and returns a streaming response + with agent-generated content in Server-Sent Events (SSE) format. + """ + try: + + async def generate_stream(): + """Generate SSE formatted stream chunks.""" + async for chunk in agent_service.stream_query_agent( + query=request.query, agent_name=request.agent_name + ): + # Format as SSE (Server-Sent Events) + data = json.dumps({"content": chunk, "is_final": False}) + yield f"data: {data}\n\n" + + # Send final chunk + final_data = json.dumps({"content": "", "is_final": True}) + yield f"data: {final_data}\n\n" + + return StreamingResponse( + generate_stream(), + media_type="text/plain", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Content-Type": "text/event-stream", + }, + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Agent query failed: {str(e)}") + + return router diff --git a/python/valuecell/server/api/schemas/agent_stream.py b/python/valuecell/server/api/schemas/agent_stream.py new file mode 100644 index 000000000..132bef5a7 --- /dev/null +++ b/python/valuecell/server/api/schemas/agent_stream.py @@ -0,0 +1,32 @@ +""" +Agent stream API schemas for handling streaming agent queries. +""" + +from pydantic import BaseModel, Field + + +class AgentStreamRequest(BaseModel): + """Request model for agent streaming queries.""" + + query: str = Field(..., description="User query to send to the agent") + agent_name: str = Field(..., description="Specific agent name to use for the query") + + class Config: + json_schema_extra = { + "example": { + "query": "What is the current market trend?", + "agent_name": "HelloWorldAgent", + } + } + + +class StreamChunk(BaseModel): + """Response chunk model for streaming data.""" + + content: str = Field(..., description="Content chunk from the agent response") + is_final: bool = Field(False, description="Whether this is the final chunk") + + class Config: + json_schema_extra = { + "example": {"content": "The current market shows...", "is_final": False} + } diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py new file mode 100644 index 000000000..259961a9d --- /dev/null +++ b/python/valuecell/server/services/agent_stream_service.py @@ -0,0 +1,59 @@ +""" +Agent stream service for handling streaming agent interactions. +""" + +from typing import AsyncGenerator, Optional +from valuecell.core.coordinate.orchestrator import get_default_orchestrator +from valuecell.core.types import UserInput, UserInputMetadata +import logging + +logger = logging.getLogger(__name__) + + +class AgentStreamService: + """Service for handling streaming agent queries.""" + + def __init__(self): + """Initialize the agent stream service.""" + self.orchestrator = get_default_orchestrator() + logger.info("Agent stream service initialized") + + async def stream_query_agent( + self, query: str, agent_name: Optional[str] = None + ) -> AsyncGenerator[str, None]: + """ + Stream agent responses for a given query. + + Args: + query: User query to process + agent_name: Optional specific agent name to use. If provided, takes precedence over query parsing. + + Yields: + str: Content chunks from the agent response + """ + try: + logger.info(f"Processing streaming query: {query[:100]}...") + + user_id = "default_user" + desired_agent_name = agent_name + + user_input_meta = UserInputMetadata(user_id=user_id) + + user_input = UserInput( + query=query, desired_agent_name=desired_agent_name, meta=user_input_meta + ) + + # Use the orchestrator's process_user_input method for streaming + async for response_chunk in self.orchestrator.process_user_input( + user_input + ): + if ( + response_chunk + and response_chunk.content + and response_chunk.content.strip() + ): + yield response_chunk.content + + except Exception as e: + logger.error(f"Error in stream_query_agent: {str(e)}") + yield f"Error processing query: {str(e)}" From 2a48ed947dd065ccf026e853a33d9a83c99b31b1 Mon Sep 17 00:00:00 2001 From: zhonghao lu Date: Fri, 19 Sep 2025 09:57:06 +0800 Subject: [PATCH 2/3] fix sec agent --- python/valuecell/agents/hello_world.py | 15 ----------- python/valuecell/agents/sec_agent.py | 18 +++++++------ python/valuecell/agents/sec_client.py | 4 +-- python/valuecell/agents/tests/test_import.py | 25 ------------------- python/valuecell/core/coordinate/models.py | 4 +-- .../valuecell/examples/agent_i18n_example.py | 4 +-- .../examples/asset_adapter_example.py | 4 +-- .../server/api/schemas/agent_stream.py | 6 ++--- python/valuecell/server/main.py | 7 +++--- .../server/services/agent_stream_service.py | 4 ++- 10 files changed, 28 insertions(+), 63 deletions(-) delete mode 100644 python/valuecell/agents/hello_world.py delete mode 100644 python/valuecell/agents/tests/test_import.py diff --git a/python/valuecell/agents/hello_world.py b/python/valuecell/agents/hello_world.py deleted file mode 100644 index 15ca80e1e..000000000 --- a/python/valuecell/agents/hello_world.py +++ /dev/null @@ -1,15 +0,0 @@ -from valuecell.core.agent.decorator import serve -from valuecell.core.types import BaseAgent - - -@serve() -class HelloWorldAgent(BaseAgent): - """ - A simple Hello World Agent that responds with a greeting message. - """ - - async def stream(self, query, session_id, task_id): - yield { - "content": f"Hello! You said: {query}", - "is_task_complete": True, - } diff --git a/python/valuecell/agents/sec_agent.py b/python/valuecell/agents/sec_agent.py index a25340ea7..c11600773 100644 --- a/python/valuecell/agents/sec_agent.py +++ b/python/valuecell/agents/sec_agent.py @@ -248,12 +248,16 @@ async def _process_fund_holdings_query( return logger.info(f"Retrieved {len(filings)} 13F filings") - # Parse filing data - current_filing = filings.iloc[0].obj() - current_data = current_filing.infotable.to_json() + # %% + o = filings[1].obj() + current_filing = o.infotable.to_json() + + # %% + o = filings[2].obj() + previous_filing = o.infotable.to_json() + + - previous_filing = filings.iloc[1].obj() - previous_data = previous_filing.infotable.to_json() logger.info("Successfully parsed current and historical holdings data") @@ -262,10 +266,10 @@ async def _process_fund_holdings_query( As a professional investment analyst, please conduct an in-depth analysis of the following 13F holdings data: ## Historical holdings data (earlier period): - {previous_data} + {previous_filing} ## Current holdings data (latest period): - {current_data} + {current_filing} ## Analysis requirements: Please provide professional analysis from the following perspectives: diff --git a/python/valuecell/agents/sec_client.py b/python/valuecell/agents/sec_client.py index e11573891..572e90e7e 100644 --- a/python/valuecell/agents/sec_client.py +++ b/python/valuecell/agents/sec_client.py @@ -17,9 +17,9 @@ async def main(): # Get client and send message client = await connections.get_client("SecAgent") async for task, event in await client.send_message( - "伯克希尔最近持仓变化", streaming=True + "Recent Berkshire Hathaway holdings changes", streaming=True ): - print(f"接收到Task: {task}") + print(f"Received Task: {task}") print(f"Calculation result: {task.status}") diff --git a/python/valuecell/agents/tests/test_import.py b/python/valuecell/agents/tests/test_import.py deleted file mode 100644 index 399257c64..000000000 --- a/python/valuecell/agents/tests/test_import.py +++ /dev/null @@ -1,25 +0,0 @@ -import pytest -from a2a.types import AgentCard -from valuecell.core.agent.connect import RemoteConnections - - -@pytest.mark.asyncio -async def test_run_hello_world(): - connections = RemoteConnections() - name = "HelloWorldAgent" - try: - available = connections.list_available_agents() - assert name in available - - agent_card = await connections.start_agent("HelloWorldAgent") - assert isinstance(agent_card, AgentCard) and agent_card - - client = await connections.get_client("HelloWorldAgent") - turns = 0 - async for task, event in await client.send_message("Hi there!"): - assert task is not None - assert event is None - turns += 1 - assert turns == 1 - finally: - await connections.stop_all() diff --git a/python/valuecell/core/coordinate/models.py b/python/valuecell/core/coordinate/models.py index 6155f4333..c4cacb88f 100644 --- a/python/valuecell/core/coordinate/models.py +++ b/python/valuecell/core/coordinate/models.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Optional from pydantic import BaseModel, Field from valuecell.core.task import Task @@ -8,7 +8,7 @@ class ExecutionPlan(BaseModel): """Execution plan containing multiple tasks""" plan_id: str = Field(..., description="Unique plan identifier") - session_id: str = Field(..., description="Session ID this plan belongs to") + session_id: Optional[str] = Field(..., description="Session ID this plan belongs to") user_id: str = Field(..., description="User ID who requested this plan") query: str = Field(..., description="Original user input") tasks: List[Task] = Field(default_factory=list, description="Tasks to execute") diff --git a/python/valuecell/examples/agent_i18n_example.py b/python/valuecell/examples/agent_i18n_example.py index d4ac6e1ba..c47503e4c 100644 --- a/python/valuecell/examples/agent_i18n_example.py +++ b/python/valuecell/examples/agent_i18n_example.py @@ -154,8 +154,8 @@ def main(): # Batch process multiple users user_requests = { "user1": "Calculate my returns", - "user2": "分析我的投资组合", - "user3": "顯示我的資產配置", + "user2": "Analyze my investment portfolio", + "user3": "Show my asset allocation", "user4": "Update my risk profile", } diff --git a/python/valuecell/examples/asset_adapter_example.py b/python/valuecell/examples/asset_adapter_example.py index 09bd010cb..852000cb6 100644 --- a/python/valuecell/examples/asset_adapter_example.py +++ b/python/valuecell/examples/asset_adapter_example.py @@ -114,7 +114,7 @@ def demonstrate_asset_search(): results_zh = search_assets("00700.HK", language="zh-Hans", limit=5) if results_zh["success"]: - logger.info(f"找到 {results_zh['count']} 个结果:") + logger.info(f"Found {results_zh['count']} results:") for result in results_zh["results"]: logger.info( f" - {result['ticker']}: {result['display_name']} " @@ -192,7 +192,7 @@ def demonstrate_asset_info(): info_zh = get_asset_info(ticker, language="zh-Hans") if info_zh["success"]: logger.info( - f" 中文: {info_zh['display_name']} ({info_zh['asset_type_display']})" + f" Chinese: {info_zh['display_name']} ({info_zh['asset_type_display']})" ) diff --git a/python/valuecell/server/api/schemas/agent_stream.py b/python/valuecell/server/api/schemas/agent_stream.py index 132bef5a7..7470d51cb 100644 --- a/python/valuecell/server/api/schemas/agent_stream.py +++ b/python/valuecell/server/api/schemas/agent_stream.py @@ -9,13 +9,13 @@ class AgentStreamRequest(BaseModel): """Request model for agent streaming queries.""" query: str = Field(..., description="User query to send to the agent") - agent_name: str = Field(..., description="Specific agent name to use for the query") + agent_name: str = Field(None, description="Specific agent name to use for the query") class Config: json_schema_extra = { "example": { "query": "What is the current market trend?", - "agent_name": "HelloWorldAgent", + "agent_name": "MarketAnalystAgent", } } @@ -29,4 +29,4 @@ class StreamChunk(BaseModel): class Config: json_schema_extra = { "example": {"content": "The current market shows...", "is_final": False} - } + } \ No newline at end of file diff --git a/python/valuecell/server/main.py b/python/valuecell/server/main.py index 4fc764b67..0ad460345 100644 --- a/python/valuecell/server/main.py +++ b/python/valuecell/server/main.py @@ -1,8 +1,8 @@ """Main entry point for ValueCell Server Backend.""" import uvicorn -from .api.app import create_app -from .config.settings import get_settings +from valuecell.server.api.app import create_app +from valuecell.server.config.settings import get_settings # Create app instance for uvicorn app = create_app() @@ -15,8 +15,7 @@ def main(): uvicorn.run( app, host=settings.API_HOST, - port=settings.API_PORT, - reload=settings.API_DEBUG, + port=settings.API_PORT ) diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 259961a9d..db699c0e7 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -4,6 +4,7 @@ from typing import AsyncGenerator, Optional from valuecell.core.coordinate.orchestrator import get_default_orchestrator +from valuecell.core.coordinate.tests.test_orchestrator import session_id from valuecell.core.types import UserInput, UserInputMetadata import logging @@ -36,8 +37,9 @@ async def stream_query_agent( user_id = "default_user" desired_agent_name = agent_name + session_id = agent_name + "_session_" + user_id - user_input_meta = UserInputMetadata(user_id=user_id) + user_input_meta = UserInputMetadata(user_id=user_id, session_id=session_id) user_input = UserInput( query=query, desired_agent_name=desired_agent_name, meta=user_input_meta From 9ef337fdf4fa0398293ee13cac49b4b693e16d57 Mon Sep 17 00:00:00 2001 From: zhonghao lu Date: Fri, 19 Sep 2025 10:02:26 +0800 Subject: [PATCH 3/3] lint --- python/valuecell/examples/agent_i18n_example.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/valuecell/examples/agent_i18n_example.py b/python/valuecell/examples/agent_i18n_example.py index c47503e4c..d4ac6e1ba 100644 --- a/python/valuecell/examples/agent_i18n_example.py +++ b/python/valuecell/examples/agent_i18n_example.py @@ -154,8 +154,8 @@ def main(): # Batch process multiple users user_requests = { "user1": "Calculate my returns", - "user2": "Analyze my investment portfolio", - "user3": "Show my asset allocation", + "user2": "分析我的投资组合", + "user3": "顯示我的資產配置", "user4": "Update my risk profile", }