Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/configs/agent_cards/sec_agent.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "Sec13FundAgent",
"url": "http://localhost:10001/",
"url": "http://localhost:10003/",
"enabled": true
}
3 changes: 3 additions & 0 deletions python/valuecell/server/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ..config.settings import get_settings
from .routers.i18n import create_i18n_router
from .routers.system import create_system_router
from .routers.websocket import create_websocket_router
from .routers.watchlist import create_watchlist_router
from .schemas import SuccessResponse, AppInfoData
from ...adapters.assets import get_adapter_manager
Expand Down Expand Up @@ -125,6 +126,8 @@ async def root():
# Include system router
app.include_router(create_system_router())

# Include websocket router
app.include_router(create_websocket_router())
# Include watchlist router
app.include_router(create_watchlist_router())

Expand Down
133 changes: 133 additions & 0 deletions python/valuecell/server/api/routers/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""WebSocket router for real-time stock analysis."""

import json
import logging
from typing import Optional
from uuid import uuid4

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, Field

from valuecell.core.coordinate.orchestrator import get_default_orchestrator
from valuecell.core.types import UserInput, UserInputMetadata

logger = logging.getLogger(__name__)

# Agent analyst mapping from the example
AGENT_ANALYST_MAP = {"Sec13FundAgent": ("Sec13FundAgent")}


class AnalysisRequest(BaseModel):
"""Request model for analysis."""

agent_name: str = Field(..., description="The name of the agent to use")
query: str = Field(..., description="The user's query for the agent")
session_id: Optional[str] = Field(
None, description="Session ID, will be auto-generated if not provided"
)
user_id: str = Field("default_user", description="User ID")


def _parse_user_input(request: AnalysisRequest) -> UserInput:
"""Parse analysis request into UserInput."""
session_id = request.session_id or str(uuid4())

return UserInput(
query=request.query,
desired_agent_name=request.agent_name,
meta=UserInputMetadata(
session_id=session_id,
user_id=request.user_id,
),
)


def create_websocket_router() -> APIRouter:
"""Create and configure WebSocket router."""
router = APIRouter()

@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time stock analysis."""
await websocket.accept()
logger.info("WebSocket connection established")

try:
orchestrator = get_default_orchestrator()

while True:
# Receive message from client
data = await websocket.receive_text()
logger.info(f"Received message: {data}")

try:
# Parse the incoming message
message_data = json.loads(data)

# Validate agent name
agent_name = message_data.get("agent_name")
if agent_name not in AGENT_ANALYST_MAP:
await websocket.send_text(
json.dumps(
{
"type": "error",
"message": f"Unsupported agent: {agent_name}. Available agents: {list(AGENT_ANALYST_MAP.keys())}",
}
)
)
continue

# Create analysis request
request = AnalysisRequest(**message_data)
user_input = _parse_user_input(request)

# Send analysis start notification
await websocket.send_text(
json.dumps(
{
"type": "analysis_started",
"agent_name": request.agent_name,
}
)
)

# Stream analysis results
async for message_chunk in orchestrator.process_user_input(
user_input
):
response = {
"type": "analysis_chunk",
"message": str(message_chunk),
"agent_name": request.agent_name,
}
await websocket.send_text(json.dumps(response))
logger.info(f"Sent message chunk: {message_chunk}")

# Send completion notification
await websocket.send_text(
json.dumps(
{
"type": "analysis_completed",
"agent_name": request.agent_name,
}
)
)

except json.JSONDecodeError:
await websocket.send_text(
json.dumps({"type": "error", "message": "Invalid JSON format"})
)
except Exception as e:
logger.error(f"Error processing request: {e}")
await websocket.send_text(
json.dumps(
{"type": "error", "message": f"Analysis failed: {str(e)}"}
)
)

except WebSocketDisconnect:
logger.info("WebSocket connection closed")
except Exception as e:
logger.error(f"WebSocket error: {e}")

return router
4 changes: 3 additions & 1 deletion python/valuecell/server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from .api.app import create_app
from .config.settings import get_settings

# Create app instance for uvicorn
app = create_app()


def main():
"""Start the server."""
settings = get_settings()
app = create_app()

uvicorn.run(
app,
Expand Down