diff --git a/python/configs/agent_cards/sec_agent.json b/python/configs/agent_cards/sec_agent.json index 08f0c192e..ab42d52c7 100644 --- a/python/configs/agent_cards/sec_agent.json +++ b/python/configs/agent_cards/sec_agent.json @@ -1,5 +1,5 @@ { "name": "Sec13FundAgent", - "url": "http://localhost:10001/", + "url": "http://localhost:10003/", "enabled": true } \ No newline at end of file diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index c043b3fd2..525d9f1a1 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -14,6 +14,7 @@ from ..config.settings import get_settings from .routers.i18n import create_i18n_router from .routers.system import create_system_router +from .routers.websocket import create_websocket_router from .routers.watchlist import create_watchlist_router from .schemas import SuccessResponse, AppInfoData from ...adapters.assets import get_adapter_manager @@ -125,6 +126,8 @@ async def root(): # Include system router app.include_router(create_system_router()) + # Include websocket router + app.include_router(create_websocket_router()) # Include watchlist router app.include_router(create_watchlist_router()) diff --git a/python/valuecell/server/api/routers/websocket.py b/python/valuecell/server/api/routers/websocket.py new file mode 100644 index 000000000..52ab02965 --- /dev/null +++ b/python/valuecell/server/api/routers/websocket.py @@ -0,0 +1,133 @@ +"""WebSocket router for real-time stock analysis.""" + +import json +import logging +from typing import Optional +from uuid import uuid4 + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from pydantic import BaseModel, Field + +from valuecell.core.coordinate.orchestrator import get_default_orchestrator +from valuecell.core.types import UserInput, UserInputMetadata + +logger = logging.getLogger(__name__) + +# Agent analyst mapping from the example +AGENT_ANALYST_MAP = {"Sec13FundAgent": ("Sec13FundAgent")} + + +class AnalysisRequest(BaseModel): + """Request model for analysis.""" + + agent_name: str = Field(..., description="The name of the agent to use") + query: str = Field(..., description="The user's query for the agent") + session_id: Optional[str] = Field( + None, description="Session ID, will be auto-generated if not provided" + ) + user_id: str = Field("default_user", description="User ID") + + +def _parse_user_input(request: AnalysisRequest) -> UserInput: + """Parse analysis request into UserInput.""" + session_id = request.session_id or str(uuid4()) + + return UserInput( + query=request.query, + desired_agent_name=request.agent_name, + meta=UserInputMetadata( + session_id=session_id, + user_id=request.user_id, + ), + ) + + +def create_websocket_router() -> APIRouter: + """Create and configure WebSocket router.""" + router = APIRouter() + + @router.websocket("/ws") + async def websocket_endpoint(websocket: WebSocket): + """WebSocket endpoint for real-time stock analysis.""" + await websocket.accept() + logger.info("WebSocket connection established") + + try: + orchestrator = get_default_orchestrator() + + while True: + # Receive message from client + data = await websocket.receive_text() + logger.info(f"Received message: {data}") + + try: + # Parse the incoming message + message_data = json.loads(data) + + # Validate agent name + agent_name = message_data.get("agent_name") + if agent_name not in AGENT_ANALYST_MAP: + await websocket.send_text( + json.dumps( + { + "type": "error", + "message": f"Unsupported agent: {agent_name}. Available agents: {list(AGENT_ANALYST_MAP.keys())}", + } + ) + ) + continue + + # Create analysis request + request = AnalysisRequest(**message_data) + user_input = _parse_user_input(request) + + # Send analysis start notification + await websocket.send_text( + json.dumps( + { + "type": "analysis_started", + "agent_name": request.agent_name, + } + ) + ) + + # Stream analysis results + async for message_chunk in orchestrator.process_user_input( + user_input + ): + response = { + "type": "analysis_chunk", + "message": str(message_chunk), + "agent_name": request.agent_name, + } + await websocket.send_text(json.dumps(response)) + logger.info(f"Sent message chunk: {message_chunk}") + + # Send completion notification + await websocket.send_text( + json.dumps( + { + "type": "analysis_completed", + "agent_name": request.agent_name, + } + ) + ) + + except json.JSONDecodeError: + await websocket.send_text( + json.dumps({"type": "error", "message": "Invalid JSON format"}) + ) + except Exception as e: + logger.error(f"Error processing request: {e}") + await websocket.send_text( + json.dumps( + {"type": "error", "message": f"Analysis failed: {str(e)}"} + ) + ) + + except WebSocketDisconnect: + logger.info("WebSocket connection closed") + except Exception as e: + logger.error(f"WebSocket error: {e}") + + return router diff --git a/python/valuecell/server/main.py b/python/valuecell/server/main.py index 826a100cd..4fc764b67 100644 --- a/python/valuecell/server/main.py +++ b/python/valuecell/server/main.py @@ -4,11 +4,13 @@ from .api.app import create_app from .config.settings import get_settings +# Create app instance for uvicorn +app = create_app() + def main(): """Start the server.""" settings = get_settings() - app = create_app() uvicorn.run( app,