From cd230d83b36395849049eccbea2ce4dd0723e5b6 Mon Sep 17 00:00:00 2001 From: zhonghao lu Date: Thu, 18 Sep 2025 09:56:42 +0800 Subject: [PATCH 1/2] feat: add WebSocket endpoint for real-time stock analysis --- python/valuecell/server/api/app.py | 4 + .../valuecell/server/api/routers/websocket.py | 133 ++++++++++++++++++ python/valuecell/server/main.py | 4 +- 3 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 python/valuecell/server/api/routers/websocket.py diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index e879f5cf1..6aef59573 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -14,6 +14,7 @@ from ..config.settings import get_settings from .routers.i18n import create_i18n_router from .routers.system import create_system_router +from .routers.websocket import create_websocket_router from .schemas import SuccessResponse, AppInfoData @@ -98,6 +99,9 @@ async def root(): # Include system router app.include_router(create_system_router()) + # Include websocket router + app.include_router(create_websocket_router()) + # For uvicorn app = create_app() diff --git a/python/valuecell/server/api/routers/websocket.py b/python/valuecell/server/api/routers/websocket.py new file mode 100644 index 000000000..52ab02965 --- /dev/null +++ b/python/valuecell/server/api/routers/websocket.py @@ -0,0 +1,133 @@ +"""WebSocket router for real-time stock analysis.""" + +import json +import logging +from typing import Optional +from uuid import uuid4 + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from pydantic import BaseModel, Field + +from valuecell.core.coordinate.orchestrator import get_default_orchestrator +from valuecell.core.types import UserInput, UserInputMetadata + +logger = logging.getLogger(__name__) + +# Agent analyst mapping from the example +AGENT_ANALYST_MAP = {"Sec13FundAgent": ("Sec13FundAgent")} + + +class AnalysisRequest(BaseModel): + """Request model for analysis.""" + + agent_name: str = Field(..., description="The name of the agent to use") + query: str = Field(..., description="The user's query for the agent") + session_id: Optional[str] = Field( + None, description="Session ID, will be auto-generated if not provided" + ) + user_id: str = Field("default_user", description="User ID") + + +def _parse_user_input(request: AnalysisRequest) -> UserInput: + """Parse analysis request into UserInput.""" + session_id = request.session_id or str(uuid4()) + + return UserInput( + query=request.query, + desired_agent_name=request.agent_name, + meta=UserInputMetadata( + session_id=session_id, + user_id=request.user_id, + ), + ) + + +def create_websocket_router() -> APIRouter: + """Create and configure WebSocket router.""" + router = APIRouter() + + @router.websocket("/ws") + async def websocket_endpoint(websocket: WebSocket): + """WebSocket endpoint for real-time stock analysis.""" + await websocket.accept() + logger.info("WebSocket connection established") + + try: + orchestrator = get_default_orchestrator() + + while True: + # Receive message from client + data = await websocket.receive_text() + logger.info(f"Received message: {data}") + + try: + # Parse the incoming message + message_data = json.loads(data) + + # Validate agent name + agent_name = message_data.get("agent_name") + if agent_name not in AGENT_ANALYST_MAP: + await websocket.send_text( + json.dumps( + { + "type": "error", + "message": f"Unsupported agent: {agent_name}. Available agents: {list(AGENT_ANALYST_MAP.keys())}", + } + ) + ) + continue + + # Create analysis request + request = AnalysisRequest(**message_data) + user_input = _parse_user_input(request) + + # Send analysis start notification + await websocket.send_text( + json.dumps( + { + "type": "analysis_started", + "agent_name": request.agent_name, + } + ) + ) + + # Stream analysis results + async for message_chunk in orchestrator.process_user_input( + user_input + ): + response = { + "type": "analysis_chunk", + "message": str(message_chunk), + "agent_name": request.agent_name, + } + await websocket.send_text(json.dumps(response)) + logger.info(f"Sent message chunk: {message_chunk}") + + # Send completion notification + await websocket.send_text( + json.dumps( + { + "type": "analysis_completed", + "agent_name": request.agent_name, + } + ) + ) + + except json.JSONDecodeError: + await websocket.send_text( + json.dumps({"type": "error", "message": "Invalid JSON format"}) + ) + except Exception as e: + logger.error(f"Error processing request: {e}") + await websocket.send_text( + json.dumps( + {"type": "error", "message": f"Analysis failed: {str(e)}"} + ) + ) + + except WebSocketDisconnect: + logger.info("WebSocket connection closed") + except Exception as e: + logger.error(f"WebSocket error: {e}") + + return router diff --git a/python/valuecell/server/main.py b/python/valuecell/server/main.py index 826a100cd..4fc764b67 100644 --- a/python/valuecell/server/main.py +++ b/python/valuecell/server/main.py @@ -4,11 +4,13 @@ from .api.app import create_app from .config.settings import get_settings +# Create app instance for uvicorn +app = create_app() + def main(): """Start the server.""" settings = get_settings() - app = create_app() uvicorn.run( app, From ef5f89e6a8395ee47714ae2acb2b6c5e093a2db8 Mon Sep 17 00:00:00 2001 From: zhonghao lu Date: Thu, 18 Sep 2025 10:16:12 +0800 Subject: [PATCH 2/2] fix sec agent port --- python/configs/agent_cards/sec_agent.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/configs/agent_cards/sec_agent.json b/python/configs/agent_cards/sec_agent.json index 08f0c192e..ab42d52c7 100644 --- a/python/configs/agent_cards/sec_agent.json +++ b/python/configs/agent_cards/sec_agent.json @@ -1,5 +1,5 @@ { "name": "Sec13FundAgent", - "url": "http://localhost:10001/", + "url": "http://localhost:10003/", "enabled": true } \ No newline at end of file