diff --git a/python/valuecell/core/__init__.py b/python/valuecell/core/__init__.py index 4a78bda58..d05ef617e 100644 --- a/python/valuecell/core/__init__.py +++ b/python/valuecell/core/__init__.py @@ -3,8 +3,6 @@ from .agent.responses import notification, streaming from .session import ( InMemorySessionStore, - Message, - Role, Session, SessionManager, SessionStatus, @@ -30,8 +28,6 @@ __all__ = [ # Session exports - "Message", - "Role", "Session", "SessionStatus", "SessionManager", diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 77cb56e3b..41d6c8120 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -277,13 +277,3 @@ def get_agent_card(self, agent_name: str) -> Optional[AgentCard]: if ctx.local_agent_card: return ctx.local_agent_card return None - - -# Global default instance for backward compatibility and ease of use -_default_remote_connections = RemoteConnections() - - -# Convenience functions that delegate to the default instance -def get_default_remote_connections() -> RemoteConnections: - """Get the default RemoteConnections instance""" - return _default_remote_connections diff --git a/python/valuecell/core/coordinate/__init__.py b/python/valuecell/core/coordinate/__init__.py index 9c1c53bcb..b989c686d 100644 --- a/python/valuecell/core/coordinate/__init__.py +++ b/python/valuecell/core/coordinate/__init__.py @@ -1,11 +1,10 @@ from .models import ExecutionPlan -from .orchestrator import AgentOrchestrator, get_default_orchestrator +from .orchestrator import AgentOrchestrator from .planner import ExecutionPlanner __all__ = [ "AgentOrchestrator", - "get_default_orchestrator", "ExecutionPlanner", "ExecutionPlan", ] diff --git a/python/valuecell/core/coordinate/orchestrator.py b/python/valuecell/core/coordinate/orchestrator.py index a7a0f8089..7e4a61510 100644 --- a/python/valuecell/core/coordinate/orchestrator.py +++ b/python/valuecell/core/coordinate/orchestrator.py @@ -3,7 +3,7 @@ from typing import AsyncGenerator, Dict, Optional from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent -from valuecell.core.agent.connect import get_default_remote_connections +from valuecell.core.agent.connect import RemoteConnections from valuecell.core.coordinate.response import ResponseFactory from valuecell.core.coordinate.response_buffer import ResponseBuffer, SaveItem from valuecell.core.coordinate.response_router import ( @@ -11,10 +11,11 @@ SideEffectKind, handle_status_update, ) -from valuecell.core.session import SessionStatus, get_default_session_manager -from valuecell.core.task import Task, get_default_task_manager +from valuecell.core.session import SessionManager, SessionStatus, SQLiteMessageStore +from valuecell.core.task import Task, TaskManager from valuecell.core.task.models import TaskPattern from valuecell.core.types import BaseResponse, UserInput +from valuecell.utils import resolve_db_path from valuecell.utils.uuid import generate_thread_id from .models import ExecutionPlan @@ -104,9 +105,11 @@ class AgentOrchestrator: """ def __init__(self): - self.session_manager = get_default_session_manager() - self.task_manager = get_default_task_manager() - self.agent_connections = get_default_remote_connections() + self.session_manager = SessionManager( + message_store=SQLiteMessageStore(resolve_db_path()) + ) + self.task_manager = TaskManager() + self.agent_connections = RemoteConnections() # Initialize user input management self.user_input_manager = UserInputManager() @@ -605,13 +608,3 @@ async def _persist_items(self, items: list[SaveItem]): payload=it.payload, item_id=it.item_id, ) - - -# ==================== Module-level Factory Function ==================== - -_orchestrator = AgentOrchestrator() - - -def get_default_orchestrator() -> AgentOrchestrator: - """Get the default singleton instance of AgentOrchestrator""" - return _orchestrator diff --git a/python/valuecell/core/session/__init__.py b/python/valuecell/core/session/__init__.py index 16bd38c75..11cea3f45 100644 --- a/python/valuecell/core/session/__init__.py +++ b/python/valuecell/core/session/__init__.py @@ -1,24 +1,21 @@ """Session module initialization""" -from .manager import ( - SessionManager, - get_default_session_manager, -) -from valuecell.core.types import ConversationItem as Message, Role +from .manager import SessionManager +from .message_store import InMemoryMessageStore, MessageStore, SQLiteMessageStore from .models import Session, SessionStatus from .store import InMemorySessionStore, SessionStore __all__ = [ # Models - "Message", - "Role", "Session", "SessionStatus", # Session management "SessionManager", - "get_default_session_manager", # Session storage "SessionStore", "InMemorySessionStore", - # Message storage (re-exported from core.__init__) + # Message storage + "MessageStore", + "InMemoryMessageStore", + "SQLiteMessageStore", ] diff --git a/python/valuecell/core/session/manager.py b/python/valuecell/core/session/manager.py index 4aa66d664..9b06f7502 100644 --- a/python/valuecell/core/session/manager.py +++ b/python/valuecell/core/session/manager.py @@ -1,4 +1,3 @@ -import os from datetime import datetime from typing import List, Optional @@ -10,7 +9,7 @@ ) from valuecell.utils import generate_uuid -from .message_store import InMemoryMessageStore, MessageStore, SQLiteMessageStore +from .message_store import InMemoryMessageStore, MessageStore from .models import Session, SessionStatus from .store import InMemorySessionStore, SessionStore @@ -203,28 +202,3 @@ async def get_sessions_by_status( user_id, limit * 2, offset ) return [session for session in all_sessions if session.status == status][:limit] - - -# Default session manager instance -def _default_db_path() -> str: - """Resolve repository root and return default DB path valuecell.db. - - Layout assumption: this file is at repo_root/python/valuecell/core/session/manager.py - We walk up 4 levels to reach repo_root. - """ - here = os.path.dirname(__file__) - repo_root = os.path.abspath(os.path.join(here, "..", "..", "..", "..")) - return os.path.join(repo_root, "valuecell.db") - - -def _resolve_db_path() -> str: - return os.environ.get("VALUECELL_SQLITE_DB") or _default_db_path() - - -# Default: use SQLite at repo root valuecell.db (env VALUECELL_SQLITE_DB overrides) -_session_manager = SessionManager(message_store=SQLiteMessageStore(_resolve_db_path())) - - -def get_default_session_manager() -> SessionManager: - """Get the default session manager instance""" - return _session_manager diff --git a/python/valuecell/core/task/__init__.py b/python/valuecell/core/task/__init__.py index ea768f605..2eae9f9ac 100644 --- a/python/valuecell/core/task/__init__.py +++ b/python/valuecell/core/task/__init__.py @@ -1,6 +1,6 @@ """Task module initialization""" -from .manager import TaskManager, get_default_task_manager +from .manager import TaskManager from .models import Task, TaskStatus, TaskPattern from .store import InMemoryTaskStore, TaskStore @@ -11,5 +11,4 @@ "TaskManager", "TaskStore", "InMemoryTaskStore", - "get_default_task_manager", ] diff --git a/python/valuecell/core/task/manager.py b/python/valuecell/core/task/manager.py index 6cab35770..a81a383fb 100644 --- a/python/valuecell/core/task/manager.py +++ b/python/valuecell/core/task/manager.py @@ -170,10 +170,3 @@ async def cancel_agent_tasks(self, agent_name: str) -> int: cancelled_count += 1 return cancelled_count - - -_task_manager = TaskManager() - - -def get_default_task_manager() -> TaskManager: - return _task_manager diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index 169399fbd..b276307e5 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -14,7 +14,6 @@ from ..config.settings import get_settings from .routers.i18n import create_i18n_router from .routers.system import create_system_router -from .routers.websocket import create_websocket_router from .routers.watchlist import create_watchlist_router from .routers.agent_stream import create_agent_stream_router from .routers.agent import create_agent_router @@ -128,8 +127,6 @@ async def root(): # Include system router app.include_router(create_system_router()) - # Include websocket router - app.include_router(create_websocket_router()) # Include watchlist router app.include_router(create_watchlist_router()) # Include agent stream router diff --git a/python/valuecell/server/api/routers/websocket.py b/python/valuecell/server/api/routers/websocket.py deleted file mode 100644 index 67e1f6acf..000000000 --- a/python/valuecell/server/api/routers/websocket.py +++ /dev/null @@ -1,133 +0,0 @@ -"""WebSocket router for real-time stock analysis.""" - -import json -import logging -from typing import Optional -from uuid import uuid4 - -from fastapi import APIRouter, WebSocket, WebSocketDisconnect -from pydantic import BaseModel, Field - -from valuecell.core.coordinate.orchestrator import get_default_orchestrator -from valuecell.core.types import UserInput, UserInputMetadata - -logger = logging.getLogger(__name__) - -# Agent analyst mapping from the example -AGENT_ANALYST_MAP = {"SecAgent": ("SecAgent")} - - -class AnalysisRequest(BaseModel): - """Request model for analysis.""" - - agent_name: str = Field(..., description="The name of the agent to use") - query: str = Field(..., description="The user's query for the agent") - session_id: Optional[str] = Field( - None, description="Session ID, will be auto-generated if not provided" - ) - user_id: str = Field("default_user", description="User ID") - - -def _parse_user_input(request: AnalysisRequest) -> UserInput: - """Parse analysis request into UserInput.""" - session_id = request.session_id or str(uuid4()) - - return UserInput( - query=request.query, - desired_agent_name=request.agent_name, - meta=UserInputMetadata( - session_id=session_id, - user_id=request.user_id, - ), - ) - - -def create_websocket_router() -> APIRouter: - """Create and configure WebSocket router.""" - router = APIRouter() - - @router.websocket("/ws") - async def websocket_endpoint(websocket: WebSocket): - """WebSocket endpoint for real-time stock analysis.""" - await websocket.accept() - logger.info("WebSocket connection established") - - try: - orchestrator = get_default_orchestrator() - - while True: - # Receive message from client - data = await websocket.receive_text() - logger.info(f"Received message: {data}") - - try: - # Parse the incoming message - message_data = json.loads(data) - - # Validate agent name - agent_name = message_data.get("agent_name") - if agent_name not in AGENT_ANALYST_MAP: - await websocket.send_text( - json.dumps( - { - "type": "error", - "message": f"Unsupported agent: {agent_name}. Available agents: {list(AGENT_ANALYST_MAP.keys())}", - } - ) - ) - continue - - # Create analysis request - request = AnalysisRequest(**message_data) - user_input = _parse_user_input(request) - - # Send analysis start notification - await websocket.send_text( - json.dumps( - { - "type": "analysis_started", - "agent_name": request.agent_name, - } - ) - ) - - # Stream analysis results - async for message_chunk in orchestrator.process_user_input( - user_input - ): - response = { - "type": "analysis_chunk", - "message": str(message_chunk), - "agent_name": request.agent_name, - } - await websocket.send_text(json.dumps(response)) - logger.info(f"Sent message chunk: {message_chunk}") - - # Send completion notification - await websocket.send_text( - json.dumps( - { - "type": "analysis_completed", - "agent_name": request.agent_name, - } - ) - ) - - except json.JSONDecodeError: - await websocket.send_text( - json.dumps({"type": "error", "message": "Invalid JSON format"}) - ) - except Exception as e: - logger.error(f"Error processing request: {e}") - await websocket.send_text( - json.dumps( - {"type": "error", "message": f"Analysis failed: {str(e)}"} - ) - ) - - except WebSocketDisconnect: - logger.info("WebSocket connection closed") - except Exception as e: - logger.error(f"WebSocket error: {e}") - - return router diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index 209f5968d..350b94dd9 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -2,10 +2,11 @@ Agent stream service for handling streaming agent interactions. """ +import logging from typing import AsyncGenerator, Optional -from valuecell.core.coordinate.orchestrator import get_default_orchestrator + +from valuecell.core.coordinate.orchestrator import AgentOrchestrator from valuecell.core.types import UserInput, UserInputMetadata -import logging logger = logging.getLogger(__name__) @@ -15,7 +16,7 @@ class AgentStreamService: def __init__(self): """Initialize the agent stream service.""" - self.orchestrator = get_default_orchestrator() + self.orchestrator = AgentOrchestrator() logger.info("Agent stream service initialized") async def stream_query_agent( diff --git a/python/valuecell/utils/__init__.py b/python/valuecell/utils/__init__.py index 189591d91..1716c6adf 100644 --- a/python/valuecell/utils/__init__.py +++ b/python/valuecell/utils/__init__.py @@ -1,10 +1,12 @@ from .path import get_agent_card_path from .port import get_next_available_port, parse_host_port from .uuid import generate_uuid +from .db import resolve_db_path __all__ = [ "get_next_available_port", "generate_uuid", "get_agent_card_path", "parse_host_port", + "resolve_db_path", ] diff --git a/python/valuecell/utils/db.py b/python/valuecell/utils/db.py new file mode 100644 index 000000000..07481293a --- /dev/null +++ b/python/valuecell/utils/db.py @@ -0,0 +1,16 @@ +import os + + +def default_db_path() -> str: + """Resolve repository root and return default DB path valuecell.db. + + Layout assumption: this file is at repo_root/python/valuecell/utils/db.py + We walk up 3 levels to reach repo_root. + """ + here = os.path.dirname(__file__) + repo_root = os.path.abspath(os.path.join(here, "..", "..", "..")) + return os.path.join(repo_root, "valuecell.db") + + +def resolve_db_path() -> str: + return os.environ.get("VALUECELL_SQLITE_DB") or default_db_path()