From 2b6c9a5e2d3115bd86487a13a4b7b997a982d530 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:33:05 +0800 Subject: [PATCH 1/6] feat: implement strategy auto-resume logic and enhance runtime management --- .../common/trading/_internal/runtime.py | 3 +- .../trading/_internal/stream_controller.py | 79 +++++++++- .../agents/common/trading/base_agent.py | 50 +++++-- .../agents/common/trading/data/market.py | 2 +- .../valuecell/agents/common/trading/models.py | 24 ++- python/valuecell/server/api/app.py | 29 ++-- .../server/api/routers/strategy_agent.py | 9 ++ .../db/repositories/strategy_repository.py | 25 ++++ .../server/services/agent_stream_service.py | 5 +- .../server/services/strategy_autoresume.py | 141 ++++++++++++++++++ .../server/services/strategy_persistence.py | 8 +- 11 files changed, 333 insertions(+), 42 deletions(-) create mode 100644 python/valuecell/server/services/strategy_autoresume.py diff --git a/python/valuecell/agents/common/trading/_internal/runtime.py b/python/valuecell/agents/common/trading/_internal/runtime.py index 56da88894..d556d16ba 100644 --- a/python/valuecell/agents/common/trading/_internal/runtime.py +++ b/python/valuecell/agents/common/trading/_internal/runtime.py @@ -64,6 +64,7 @@ async def create_strategy_runtime( request: UserRequest, composer: Optional[BaseComposer] = None, features_pipeline: Optional[BaseFeaturesPipeline] = None, + strategy_id_override: Optional[str] = None, ) -> StrategyRuntime: """Create a strategy runtime with async initialization (supports both paper and live trading). @@ -108,7 +109,7 @@ async def create_strategy_runtime( execution_gateway = await _create_execution_gateway(request) # Create strategy runtime components - strategy_id = generate_uuid("strategy") + strategy_id = strategy_id_override or generate_uuid("strategy") initial_capital = request.trading_config.initial_capital or 0.0 constraints = Constraints( max_positions=request.trading_config.max_positions, diff --git a/python/valuecell/agents/common/trading/_internal/stream_controller.py b/python/valuecell/agents/common/trading/_internal/stream_controller.py index 0fed8b5ef..7c866f660 100644 --- a/python/valuecell/agents/common/trading/_internal/stream_controller.py +++ b/python/valuecell/agents/common/trading/_internal/stream_controller.py @@ -16,6 +16,7 @@ from valuecell.agents.common.trading import models as agent_models from valuecell.agents.common.trading.utils import get_current_timestamp_ms +from valuecell.server.db.repositories.strategy_repository import get_strategy_repository from valuecell.server.services import strategy_persistence if TYPE_CHECKING: @@ -124,6 +125,21 @@ def persist_initial_state(self, runtime: StrategyRuntime) -> None: "Failed to persist initial portfolio/summary for {}", self.strategy_id ) + def has_initial_state(self) -> bool: + """Return True if an initial portfolio snapshot already exists. + + This allows idempotent strategy restarts without duplicating the first snapshot. + """ + try: + repo = get_strategy_repository() + snap = repo.get_latest_portfolio_snapshot(self.strategy_id) + return snap is not None + except Exception: + logger.warning( + "has_initial_state check failed for strategy {}", self.strategy_id + ) + return False + def persist_cycle_results(self, result: DecisionCycleResult) -> None: """Persist trades, portfolio view, and strategy summary for a cycle. @@ -184,8 +200,33 @@ def persist_cycle_results(self, result: DecisionCycleResult) -> None: except Exception: logger.exception("Error persisting cycle results for {}", self.strategy_id) + def persist_portfolio_snapshot(self, runtime: StrategyRuntime) -> None: + """Persist a final portfolio snapshot (used at shutdown). + + Mirrors portfolio part of cycle persistence but without trades or summary refresh. + Errors are logged and swallowed. + """ + try: + view = runtime.coordinator.portfolio_service.get_view() + try: + view.strategy_id = self.strategy_id + except Exception: + pass + ok = strategy_persistence.persist_portfolio_view(view) + if ok: + logger.info( + "Persisted final portfolio snapshot for strategy={}", + self.strategy_id, + ) + except Exception: + logger.exception( + "Failed to persist final portfolio snapshot for {}", self.strategy_id + ) + async def finalize( - self, runtime: StrategyRuntime, reason: str = "normal_exit" + self, + runtime: StrategyRuntime, + reason: agent_models.StopReason | str = agent_models.StopReason.NORMAL_EXIT, ) -> None: """Finalize strategy: close resources and mark as stopped. @@ -207,28 +248,28 @@ async def finalize( "Failed to close runtime resources for strategy {}", self.strategy_id ) - if reason == "error_closing_positions": - # Special case: we failed to close positions, so mark as ERROR to alert user - final_status = agent_models.StrategyStatus.ERROR.value - else: - final_status = agent_models.StrategyStatus.STOPPED.value + # With simplified statuses, all terminal states map to STOPPED. + # Preserve the detailed stop reason in strategy metadata for resume logic. + final_status = agent_models.StrategyStatus.STOPPED.value # Mark strategy as stopped/error in persistence try: strategy_persistence.set_strategy_status(self.strategy_id, final_status) + reason_value = getattr(reason, "value", reason) logger.info( "Marked strategy {} as {} (reason: {})", self.strategy_id, final_status, - reason, + reason_value, ) except Exception: logger.exception( "Failed to mark strategy {} for {} (reason: {})", final_status, self.strategy_id, - reason, + getattr(reason, "value", reason), ) + self._record_stop_reason(reason) def is_running(self) -> bool: """Check if strategy is still running according to persistence layer.""" @@ -259,3 +300,25 @@ def persist_trades(self, trades: list) -> None: logger.exception( "Error persisting ad-hoc trades for strategy {}", self.strategy_id ) + + def _record_stop_reason(self, reason: agent_models.StopReason | str) -> None: + """Persist last stop reason inside strategy metadata for resume decisions. + + Accept either a StopReason enum or a raw string; store the normalized + string value in the DB metadata. + """ + try: + repo = get_strategy_repository() + strategy = repo.get_strategy_by_strategy_id(self.strategy_id) + if strategy is None: + return + metadata = dict(strategy.strategy_metadata or {}) + normalized = getattr(reason, "value", reason) + if metadata.get("stop_reason") == normalized: + return + metadata["stop_reason"] = normalized + repo.upsert_strategy(strategy_id=self.strategy_id, metadata=metadata) + except Exception: + logger.warning( + "Failed to record stop reason for strategy %s", self.strategy_id + ) diff --git a/python/valuecell/agents/common/trading/base_agent.py b/python/valuecell/agents/common/trading/base_agent.py index 1df081787..1308bc591 100644 --- a/python/valuecell/agents/common/trading/base_agent.py +++ b/python/valuecell/agents/common/trading/base_agent.py @@ -10,6 +10,7 @@ from valuecell.agents.common.trading._internal.stream_controller import StreamController from valuecell.agents.common.trading.models import ( ComponentType, + StopReason, StrategyStatus, StrategyStatusContent, UserRequest, @@ -104,7 +105,7 @@ def _on_cycle_result( pass def _on_stop( - self, runtime: StrategyRuntime, request: UserRequest, reason: str + self, runtime: StrategyRuntime, request: UserRequest, reason: StopReason | str ) -> None: """Hook called before finalization when strategy stops. @@ -144,7 +145,11 @@ async def stream( return # Create runtime (calls _build_decision, _build_features_pipeline internally) - runtime = await self._create_runtime(request) + # Reuse externally supplied strategy_id if present for continuation semantics. + strategy_id_override = request.trading_config.strategy_id + runtime = await self._create_runtime( + request, strategy_id_override=strategy_id_override + ) strategy_id = runtime.strategy_id logger.info( "Created runtime for strategy_id={} conversation={} task={}", @@ -175,10 +180,17 @@ async def stream( except Exception: logger.exception("Error in _on_start hook for strategy {}", strategy_id) + stop_reason = StopReason.NORMAL_EXIT try: logger.info("Starting decision loop for strategy_id={}", strategy_id) - # Persist initial portfolio snapshot and strategy summary - controller.persist_initial_state(runtime) + # Idempotent initial state persistence: skip if already exists (resume). + if not controller.has_initial_state(): + controller.persist_initial_state(runtime) + else: + logger.info( + "Detected existing initial state; skipping duplicate persist for strategy_id={}", + strategy_id, + ) # Main decision loop while controller.is_running(): @@ -211,19 +223,19 @@ async def stream( "Strategy_id={} is no longer running, exiting decision loop", strategy_id, ) - stop_reason = "normal_exit" + stop_reason = StopReason.NORMAL_EXIT except asyncio.CancelledError: - stop_reason = "cancelled" + stop_reason = StopReason.CANCELLED logger.info("Strategy {} cancelled", strategy_id) raise except Exception as err: # noqa: BLE001 - stop_reason = "error" + stop_reason = StopReason.ERROR logger.exception("StrategyAgent stream failed: {}", err) yield streaming.message_chunk(f"StrategyAgent error: {err}") finally: # Enforce position closure on normal stop (e.g., user clicked stop) - if stop_reason == "normal_exit": + if stop_reason == StopReason.NORMAL_EXIT: try: trades = await runtime.coordinator.close_all_positions() if trades: @@ -237,7 +249,7 @@ async def stream( # However, the user intent was to stop. # Let's log it and proceed, but maybe mark status as ERROR instead of STOPPED? # For now, we stick to STOPPED but log the error clearly. - stop_reason = "error_closing_positions" + stop_reason = StopReason.ERROR_CLOSING_POSITIONS # Call user hook before finalization try: @@ -245,11 +257,22 @@ async def stream( except Exception: logger.exception("Error in _on_stop hook for strategy {}", strategy_id) - # Finalize: close resources and mark stopped + # Persist a final portfolio snapshot regardless of stop reason (best-effort) + try: + controller.persist_portfolio_snapshot(runtime) + except Exception: + logger.exception( + "Failed to persist final portfolio snapshot for strategy {}", + strategy_id, + ) + + # Finalize: close resources and mark stopped/paused/error await controller.finalize(runtime, reason=stop_reason) yield streaming.done() - async def _create_runtime(self, request: UserRequest) -> StrategyRuntime: + async def _create_runtime( + self, request: UserRequest, strategy_id_override: str | None = None + ) -> StrategyRuntime: """Create strategy runtime with custom components. Calls user hooks to build custom decision composer and features pipeline. @@ -272,5 +295,8 @@ async def _create_runtime(self, request: UserRequest) -> StrategyRuntime: # Create runtime with custom components # The runtime factory will use defaults if composer/features are None return await create_strategy_runtime( - request, composer=composer, features_pipeline=features_pipeline + request, + composer=composer, + features_pipeline=features_pipeline, + strategy_id_override=strategy_id_override, ) diff --git a/python/valuecell/agents/common/trading/data/market.py b/python/valuecell/agents/common/trading/data/market.py index ace266183..68484892c 100644 --- a/python/valuecell/agents/common/trading/data/market.py +++ b/python/valuecell/agents/common/trading/data/market.py @@ -120,7 +120,7 @@ async def _fetch(symbol: str, normalized_symbol: str) -> List[List]: exc, ) logger.debug( - f"Fetch candles for {len(candles)} symbols: {symbols}, interval: {interval}, lookback: {lookback}" + f"Fetch {len(candles)} candles symbols: {symbols}, interval: {interval}, lookback: {lookback}" ) return candles diff --git a/python/valuecell/agents/common/trading/models.py b/python/valuecell/agents/common/trading/models.py index 80cb34412..1da107f94 100644 --- a/python/valuecell/agents/common/trading/models.py +++ b/python/valuecell/agents/common/trading/models.py @@ -202,6 +202,10 @@ class TradingConfig(BaseModel): default=StrategyType.PROMPT, description="Strategy type: 'prompt based strategy' or 'grid strategy'", ) + strategy_id: Optional[str] = Field( + default=None, + description="Reuse existing strategy id to continue execution (resume semantics without extra flags)", + ) initial_capital: Optional[float] = Field( default=DEFAULT_INITIAL_CAPITAL, description="Initial capital for trading in USD", @@ -360,12 +364,28 @@ class FeatureVector(BaseModel): class StrategyStatus(str, Enum): - """High-level runtime status for strategies (for UI health dot).""" + """High-level runtime status for strategies (simplified). + + Removed legacy PAUSED and ERROR states; cancellation or errors now finalize + to STOPPED with error context stored separately (e.g., strategy_metadata). + """ RUNNING = "running" - PAUSED = "paused" STOPPED = "stopped" + + +class StopReason(str, Enum): + """Canonical stop reasons recorded in strategy metadata. + + Stored values are the enum `.value` strings so other services (DB, repos) + can compare without importing the enum if necessary, but code should + prefer using the enum when available. + """ + + NORMAL_EXIT = "normal_exit" + CANCELLED = "cancelled" ERROR = "error" + ERROR_CLOSING_POSITIONS = "error_closing_positions" class Constraints(BaseModel): diff --git a/python/valuecell/server/api/app.py b/python/valuecell/server/api/app.py index 0332c68fd..61b9c0957 100644 --- a/python/valuecell/server/api/app.py +++ b/python/valuecell/server/api/app.py @@ -7,6 +7,7 @@ from fastapi import FastAPI from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware +from loguru import logger from ...adapters.assets import get_adapter_manager from ...utils.env import ensure_system_env_dir, get_system_env_path @@ -96,48 +97,48 @@ def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): # Startup - print( + logger.info( f"ValueCell Server starting up on {settings.API_HOST}:{settings.API_PORT}..." ) # Initialize database tables try: - print("Initializing database tables...") + logger.info("Initializing database tables...") success = init_database(force=False) if success: - print("✓ Database initialized") + logger.info("✓ Database initialized") else: - print("✗ Database initialization reported failure") + logger.info("✗ Database initialization reported failure") except Exception as e: - print(f"✗ Database initialization error: {e}") + logger.info(f"✗ Database initialization error: {e}") # Initialize and configure adapters try: - print("Configuring data adapters...") + logger.info("Configuring data adapters...") manager = get_adapter_manager() # Configure Yahoo Finance (free, no API key required) try: manager.configure_yfinance() - print("✓ Yahoo Finance adapter configured") + logger.info("✓ Yahoo Finance adapter configured") except Exception as e: - print(f"✗ Yahoo Finance adapter failed: {e}") + logger.info(f"✗ Yahoo Finance adapter failed: {e}") # Configure AKShare (free, no API key required, optimized) try: manager.configure_akshare() - print("✓ AKShare adapter configured (optimized)") + logger.info("✓ AKShare adapter configured (optimized)") except Exception as e: - print(f"✗ AKShare adapter failed: {e}") + logger.info(f"✗ AKShare adapter failed: {e}") - print("Data adapters configuration completed") + logger.info("Data adapters configuration completed") except Exception as e: - print(f"Error configuring adapters: {e}") + logger.info(f"Error configuring adapters: {e}") yield # Shutdown - print("ValueCell Server shutting down...") + logger.info("ValueCell Server shutting down...") app = FastAPI( title="ValueCell Server API", @@ -239,7 +240,7 @@ async def health_check(): app.include_router(create_trading_router(), prefix=API_PREFIX) except Exception as e: - print(f"Skip trading router because of import error: {e}") + logger.info(f"Skip trading router because of import error: {e}") # For uvicorn diff --git a/python/valuecell/server/api/routers/strategy_agent.py b/python/valuecell/server/api/routers/strategy_agent.py index 8bb8ffb13..e995b8d3a 100644 --- a/python/valuecell/server/api/routers/strategy_agent.py +++ b/python/valuecell/server/api/routers/strategy_agent.py @@ -23,6 +23,7 @@ # Note: Strategy type is now part of TradingConfig in the request body. from valuecell.server.db.connection import get_db from valuecell.server.db.repositories import get_strategy_repository +from valuecell.server.services.strategy_autoresume import auto_resume_strategies from valuecell.utils.uuid import generate_conversation_id, generate_uuid @@ -32,6 +33,14 @@ def create_strategy_agent_router() -> APIRouter: router = APIRouter(prefix="/strategies", tags=["strategies"]) orchestrator = AgentOrchestrator() + @router.on_event("startup") + async def _startup_auto_resume() -> None: + """Schedule strategy auto-resume on FastAPI startup.""" + try: + await auto_resume_strategies(orchestrator) + except Exception: + logger.warning("Failed to schedule strategy auto-resume startup task") + @router.post("/create") async def create_strategy_agent( request: UserRequest, diff --git a/python/valuecell/server/db/repositories/strategy_repository.py b/python/valuecell/server/db/repositories/strategy_repository.py index 970b0694b..f212fa766 100644 --- a/python/valuecell/server/db/repositories/strategy_repository.py +++ b/python/valuecell/server/db/repositories/strategy_repository.py @@ -48,6 +48,31 @@ def get_strategy_by_strategy_id(self, strategy_id: str) -> Optional[Strategy]: if not self.db_session: session.close() + def list_strategies_by_status( + self, statuses: list[str], limit: Optional[int] = None + ) -> list[Strategy]: + """Return strategies whose status is in the provided list. + + Used by auto-resume logic to identify strategies that should be resumed + after a process restart. Best-effort: errors return empty list. + """ + if not statuses: + return [] + session = self._get_session() + try: + q = session.query(Strategy).filter(Strategy.status.in_(statuses)) + if limit: + q = q.limit(limit) + items = q.all() + for item in items: + session.expunge(item) + return items + except Exception: + return [] + finally: + if not self.db_session: + session.close() + def upsert_strategy( self, strategy_id: str, diff --git a/python/valuecell/server/services/agent_stream_service.py b/python/valuecell/server/services/agent_stream_service.py index b79419134..2e2e3976c 100644 --- a/python/valuecell/server/services/agent_stream_service.py +++ b/python/valuecell/server/services/agent_stream_service.py @@ -2,15 +2,14 @@ Agent stream service for handling streaming agent interactions. """ -import logging from typing import AsyncGenerator, Optional +from loguru import logger + from valuecell.core.coordinate.orchestrator import AgentOrchestrator from valuecell.core.types import UserInput, UserInputMetadata from valuecell.utils.uuid import generate_conversation_id -logger = logging.getLogger(__name__) - class AgentStreamService: """Service for handling streaming agent queries.""" diff --git a/python/valuecell/server/services/strategy_autoresume.py b/python/valuecell/server/services/strategy_autoresume.py new file mode 100644 index 000000000..9228fdd70 --- /dev/null +++ b/python/valuecell/server/services/strategy_autoresume.py @@ -0,0 +1,141 @@ +"""Server-side strategy auto-resume logic. + +This module scans persisted strategies with status 'running' on process +startup and dispatches them through the existing AgentOrchestrator using +their stored configuration. The core orchestrator remains unaware of +auto-resume concerns per design (separation of coordination vs runtime ops). + +Resume Semantics: + - Strategies whose status == 'running' (previous session crashed) are resumed. + - Strategies whose status == 'stopped' with metadata.stop_reason == 'cancelled' + (gracefully cancelled but intended to auto-resume) are also resumed. + - Each strategy's original config dict is parsed into a UserRequest. + - The stored strategy_id is injected into TradingConfig.strategy_id so the + underlying runtime reuses portfolio state (idempotent initial snapshot). + - Streaming responses are consumed and discarded (fire-and-forget). External + observers can implement their own hooks if needed. + +Failures during individual strategy resume are logged and skipped without +impacting other candidates. +""" + +from __future__ import annotations + +import asyncio +from typing import Optional + +from loguru import logger + +from valuecell.agents.common.trading.models import ( + StopReason, + StrategyStatus, + UserRequest, +) +from valuecell.core.coordinate.orchestrator import AgentOrchestrator +from valuecell.core.types import UserInput, UserInputMetadata +from valuecell.server.db.models.strategy import Strategy +from valuecell.server.db.repositories.strategy_repository import get_strategy_repository +from valuecell.utils.uuid import generate_conversation_id + +_AUTORESUME_STARTED = False + + +async def auto_resume_strategies( + orchestrator: AgentOrchestrator, + max_strategies: Optional[int] = None, +) -> None: + """Dispatch background resume tasks for persisted running strategies. + + Args: + orchestrator: Existing AgentOrchestrator instance. + max_strategies: Optional limit to number of strategies resumed. + """ + global _AUTORESUME_STARTED + if _AUTORESUME_STARTED: + return + _AUTORESUME_STARTED = True + + try: + repo = get_strategy_repository() + rows = repo.list_strategies_by_status( + [StrategyStatus.RUNNING.value, StrategyStatus.STOPPED.value], + limit=max_strategies, + ) + candidates = [s for s in rows if _should_resume(s)] + if not candidates: + logger.info("Auto-resume: no eligible strategies found") + return + logger.info("Auto-resume: found {} eligible strategies", len(candidates)) + # Create tasks for each resume and keep them running. We await the + # gathered tasks so that when this coroutine is run with + # `asyncio.run(...)` (background thread) the loop stays alive until + # the resumed strategies finish. When scheduled on an already-running + # loop, this will run as background tasks concurrently as well. + tasks = [asyncio.create_task(_resume_one(orchestrator, s)) for s in candidates] + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Auto-resume scan failed") + + +async def _resume_one(orchestrator: AgentOrchestrator, strategy_row: Strategy) -> None: + strategy_id = strategy_row.strategy_id + try: + config_dict = strategy_row.config or {} + metadata = strategy_row.strategy_metadata or {} + agent_name = metadata.get("agent_name") + + # Parse request; tolerate partial configs + request = UserRequest.model_validate(config_dict) + if request.trading_config.strategy_id is None and strategy_id: + request.trading_config.strategy_id = strategy_id + + user_input = UserInput( + query=request.model_dump_json(), + target_agent_name=agent_name, + meta=UserInputMetadata( + user_id=strategy_row.user_id, + conversation_id=generate_conversation_id(), + ), + ) + + async for chunk in orchestrator.process_user_input(user_input): + logger.debug("Auto-resume chunk for strategy_id={}: {}", strategy_id, chunk) + if chunk.event == "component_generator": + logger.info( + "Auto-resume dispatched strategy_id={} agent={}", + strategy_id, + agent_name, + ) + return + + except asyncio.CancelledError: + raise + except Exception: + logger.exception( + "Auto-resume failed for strategy_id={}", strategy_id or "" + ) + + +def _should_resume(strategy_row: Strategy) -> bool: + """Return True if strategy should be auto-resumed based on status/metadata.""" + status_raw = strategy_row.status or "" + metadata = strategy_row.strategy_metadata or {} + try: + status_enum = StrategyStatus(status_raw) + except Exception: + # Unknown/invalid status - skip + return False + + if status_enum == StrategyStatus.RUNNING: + return True + + if ( + status_enum == StrategyStatus.STOPPED + and metadata.get("stop_reason") == StopReason.CANCELLED.value + ): + return True + + return False diff --git a/python/valuecell/server/services/strategy_persistence.py b/python/valuecell/server/services/strategy_persistence.py index 2a05ad19c..70c6b9b0e 100644 --- a/python/valuecell/server/services/strategy_persistence.py +++ b/python/valuecell/server/services/strategy_persistence.py @@ -243,7 +243,13 @@ def persist_strategy_summary(summary: agent_models.StrategySummary) -> bool: return False existing_meta = strategy.strategy_metadata or {} - meta = {**dict(existing_meta), **summary.model_dump(exclude_none=True)} + meta = { + **dict(existing_meta), + **summary.model_dump( + exclude_none=True, + exclude={"strategy_id", "status"}, + ), + } updated = repo.upsert_strategy(strategy_id, metadata=meta) return updated is not None except Exception: From 68a697577eee400dc16d197610f8493fa579f179 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Mon, 24 Nov 2025 23:43:08 +0800 Subject: [PATCH 2/6] feat: refactor strategy agent to run decision loop in background task for proper lifecycle management --- .../agents/common/trading/base_agent.py | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/python/valuecell/agents/common/trading/base_agent.py b/python/valuecell/agents/common/trading/base_agent.py index 1308bc591..4f5af58f6 100644 --- a/python/valuecell/agents/common/trading/base_agent.py +++ b/python/valuecell/agents/common/trading/base_agent.py @@ -171,8 +171,46 @@ async def stream( component_type=ComponentType.STATUS.value, ) + # Run the remainder of the stream (decision loop and finalization) in + # a background task so the HTTP/streaming response can return immediately + # after sending the initial status. The background runner will wait for + # the persistence layer to mark the strategy as running before proceeding. + # Start background task and don't await it so HTTP responder can finish + bg_task = asyncio.create_task( + self._run_background_decision(controller, runtime) + ) + + # Add a done callback to surface exceptions to logs + def _bg_done_callback(t: asyncio.Task): + try: + t.result() + except asyncio.CancelledError: + logger.info("Background task for strategy {} cancelled", strategy_id) + except Exception as exc: + logger.exception( + "Background task for strategy {} failed: {}", strategy_id, exc + ) + + bg_task.add_done_callback(_bg_done_callback) + + # Return the initial payload and immediately close the stream + yield streaming.done() + + async def _run_background_decision( + self, + controller: StreamController, + runtime: StrategyRuntime, + ) -> None: + """Background runner for the decision loop and finalization. + + This method was extracted from the `stream()` function so it can be + referenced and tested independently, and so supervisors can cancel it + if needed. + """ # Wait until strategy is marked as running in persistence layer await controller.wait_running() + strategy_id = runtime.strategy_id + request = runtime.request # Call user hook for custom initialization try: @@ -231,8 +269,7 @@ async def stream( raise except Exception as err: # noqa: BLE001 stop_reason = StopReason.ERROR - logger.exception("StrategyAgent stream failed: {}", err) - yield streaming.message_chunk(f"StrategyAgent error: {err}") + logger.exception("StrategyAgent background run failed: {}", err) finally: # Enforce position closure on normal stop (e.g., user clicked stop) if stop_reason == StopReason.NORMAL_EXIT: @@ -268,7 +305,6 @@ async def stream( # Finalize: close resources and mark stopped/paused/error await controller.finalize(runtime, reason=stop_reason) - yield streaming.done() async def _create_runtime( self, request: UserRequest, strategy_id_override: str | None = None From 771385f9aebabacf8fb9082333e5bef9087f65ee Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 09:56:15 +0800 Subject: [PATCH 3/6] feat: refactor strategy agent methods to be asynchronous for improved performance --- .../valuecell/agents/common/trading/README.md | 22 ++++++++--------- .../agents/common/trading/base_agent.py | 24 ++++++++++--------- .../valuecell/agents/grid_agent/grid_agent.py | 6 +++-- .../agents/prompt_strategy_agent/core.py | 6 +++-- 4 files changed, 32 insertions(+), 26 deletions(-) diff --git a/python/valuecell/agents/common/trading/README.md b/python/valuecell/agents/common/trading/README.md index 2fa610683..111de7d76 100644 --- a/python/valuecell/agents/common/trading/README.md +++ b/python/valuecell/agents/common/trading/README.md @@ -297,12 +297,12 @@ class MyFeaturesPipeline(BaseFeaturesPipeline): class MyCustomAgent(BaseStrategyAgent): """Agent with custom feature computation.""" - def _build_features_pipeline( + async def _build_features_pipeline( self, request: UserRequest ) -> BaseFeaturesPipeline | None: return MyFeaturesPipeline(request) - def _create_decision_composer(self, request: UserRequest): + async def _create_decision_composer(self, request: UserRequest): # Use default LLM composer return None ``` @@ -345,11 +345,11 @@ class RuleBasedComposer(BaseComposer): class RuleBasedAgent(BaseStrategyAgent): """Agent using rule-based decisions.""" - def _build_features_pipeline(self, request: UserRequest): + async def _build_features_pipeline(self, request: UserRequest): # Use default pipeline return None - def _create_decision_composer(self, request: UserRequest): + async def _create_decision_composer(self, request: UserRequest): return RuleBasedComposer(request) ``` @@ -359,12 +359,12 @@ class RuleBasedAgent(BaseStrategyAgent): class MonitoredAgent(BaseStrategyAgent): """Agent with custom monitoring and logging.""" - def _on_start(self, runtime, request): + async def _on_start(self, runtime, request): """Called once after runtime creation.""" self.cycle_count = 0 print(f"Strategy {runtime.strategy_id} starting...") - def _on_cycle_result(self, result, runtime, request): + async def _on_cycle_result(self, result, runtime, request): """Called after each cycle completes.""" self.cycle_count += 1 print(f"Cycle {self.cycle_count}: " @@ -374,15 +374,15 @@ class MonitoredAgent(BaseStrategyAgent): # Send metrics to external monitoring # ... custom logic ... - def _on_stop(self, runtime, request, reason): + async def _on_stop(self, runtime, request, reason): """Called before finalization.""" print(f"Strategy stopping: {reason}") print(f"Total cycles: {self.cycle_count}") - def _build_features_pipeline(self, request): + async def _build_features_pipeline(self, request): return None # Use defaults - def _create_decision_composer(self, request): + async def _create_decision_composer(self, request): return None # Use defaults ``` @@ -422,11 +422,11 @@ from .features import MyFeaturesPipeline # if custom from .composer import MyComposer # if custom class MyAgent(BaseStrategyAgent): - def _build_features_pipeline(self, request: UserRequest): + async def _build_features_pipeline(self, request: UserRequest): # Return custom pipeline or None for default return MyFeaturesPipeline(request) - def _create_decision_composer(self, request: UserRequest): + async def _create_decision_composer(self, request: UserRequest): # Return custom composer or None for default return MyComposer(request) ``` diff --git a/python/valuecell/agents/common/trading/base_agent.py b/python/valuecell/agents/common/trading/base_agent.py index 4f5af58f6..c6335389b 100644 --- a/python/valuecell/agents/common/trading/base_agent.py +++ b/python/valuecell/agents/common/trading/base_agent.py @@ -23,7 +23,7 @@ DecisionCycleResult, StrategyRuntime, ) - from valuecell.agents.common.trading.decision import Composer + from valuecell.agents.common.trading.decision import BaseComposer from valuecell.agents.common.trading.features.interfaces import BaseFeaturesPipeline @@ -44,7 +44,7 @@ class BaseStrategyAgent(BaseAgent, ABC): """ @abstractmethod - def _build_features_pipeline( + async def _build_features_pipeline( self, request: UserRequest ) -> BaseFeaturesPipeline | None: """Build the features pipeline for the strategy. @@ -61,7 +61,9 @@ def _build_features_pipeline( """ raise NotImplementedError - def _create_decision_composer(self, request: UserRequest) -> Composer | None: + async def _create_decision_composer( + self, request: UserRequest + ) -> BaseComposer | None: """Build the decision composer for the strategy. Override to provide a custom composer. Return None to use default LLM composer. @@ -74,7 +76,7 @@ def _create_decision_composer(self, request: UserRequest) -> Composer | None: """ return None - def _on_start(self, runtime: StrategyRuntime, request: UserRequest) -> None: + async def _on_start(self, runtime: StrategyRuntime, request: UserRequest) -> None: """Hook called after runtime creation, before first cycle. Use for custom initialization, caching, or metric registration. @@ -86,7 +88,7 @@ def _on_start(self, runtime: StrategyRuntime, request: UserRequest) -> None: """ pass - def _on_cycle_result( + async def _on_cycle_result( self, result: DecisionCycleResult, runtime: StrategyRuntime, @@ -104,7 +106,7 @@ def _on_cycle_result( """ pass - def _on_stop( + async def _on_stop( self, runtime: StrategyRuntime, request: UserRequest, reason: StopReason | str ) -> None: """Hook called before finalization when strategy stops. @@ -214,7 +216,7 @@ async def _run_background_decision( # Call user hook for custom initialization try: - self._on_start(runtime, request) + await self._on_start(runtime, request) except Exception: logger.exception("Error in _on_start hook for strategy {}", strategy_id) @@ -244,7 +246,7 @@ async def _run_background_decision( # Call user hook for post-cycle logic try: - self._on_cycle_result(result, runtime, request) + await self._on_cycle_result(result, runtime, request) except Exception: logger.exception( "Error in _on_cycle_result hook for strategy {}", strategy_id @@ -290,7 +292,7 @@ async def _run_background_decision( # Call user hook before finalization try: - self._on_stop(runtime, request, stop_reason) + await self._on_stop(runtime, request, stop_reason) except Exception: logger.exception("Error in _on_stop hook for strategy {}", strategy_id) @@ -321,12 +323,12 @@ async def _create_runtime( StrategyRuntime instance """ # Let user build custom composer (or None for default) - composer = self._create_decision_composer(request) + composer = await self._create_decision_composer(request) # Let user build custom features pipeline (or None for default) # The coordinator invokes this pipeline each cycle to fetch data # and compute the feature vectors consumed by the decision step. - features_pipeline = self._build_features_pipeline(request) + features_pipeline = await self._build_features_pipeline(request) # Create runtime with custom components # The runtime factory will use defaults if composer/features are None diff --git a/python/valuecell/agents/grid_agent/grid_agent.py b/python/valuecell/agents/grid_agent/grid_agent.py index 929fda8d8..d8d0ad665 100644 --- a/python/valuecell/agents/grid_agent/grid_agent.py +++ b/python/valuecell/agents/grid_agent/grid_agent.py @@ -30,12 +30,14 @@ class GridStrategyAgent(BaseStrategyAgent): add long on down moves; reduce on reversals. """ - def _build_features_pipeline( + async def _build_features_pipeline( self, request: UserRequest ) -> BaseFeaturesPipeline | None: return DefaultFeaturesPipeline.from_request(request) - def _create_decision_composer(self, request: UserRequest) -> BaseComposer | None: + async def _create_decision_composer( + self, request: UserRequest + ) -> BaseComposer | None: # Adjust step_pct / max_steps / base_fraction as needed return GridComposer( request=request, diff --git a/python/valuecell/agents/prompt_strategy_agent/core.py b/python/valuecell/agents/prompt_strategy_agent/core.py index e25c0fcfd..8f844c2e3 100644 --- a/python/valuecell/agents/prompt_strategy_agent/core.py +++ b/python/valuecell/agents/prompt_strategy_agent/core.py @@ -38,14 +38,16 @@ def _build_features_pipeline(self, request): return MyCustomPipeline(request) """ - def _build_features_pipeline( + async def _build_features_pipeline( self, request: UserRequest ) -> BaseFeaturesPipeline | None: """Use the default features pipeline built from the user request.""" return DefaultFeaturesPipeline.from_request(request) - def _create_decision_composer(self, request: UserRequest) -> BaseComposer | None: + async def _create_decision_composer( + self, request: UserRequest + ) -> BaseComposer | None: """Use default LLM-based composer.""" return LlmComposer(request=request) From 79582a9af06d3985f045526f2219700ba01e7ccd Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 10:07:50 +0800 Subject: [PATCH 4/6] feat: enhance auto-resume logic by integrating strategy status updates --- .../valuecell/server/services/strategy_autoresume.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/valuecell/server/services/strategy_autoresume.py b/python/valuecell/server/services/strategy_autoresume.py index 9228fdd70..0636095b4 100644 --- a/python/valuecell/server/services/strategy_autoresume.py +++ b/python/valuecell/server/services/strategy_autoresume.py @@ -29,12 +29,14 @@ from valuecell.agents.common.trading.models import ( StopReason, StrategyStatus, + StrategyStatusContent, UserRequest, ) from valuecell.core.coordinate.orchestrator import AgentOrchestrator -from valuecell.core.types import UserInput, UserInputMetadata +from valuecell.core.types import CommonResponseEvent, UserInput, UserInputMetadata from valuecell.server.db.models.strategy import Strategy from valuecell.server.db.repositories.strategy_repository import get_strategy_repository +from valuecell.server.services import strategy_persistence from valuecell.utils.uuid import generate_conversation_id _AUTORESUME_STARTED = False @@ -103,12 +105,18 @@ async def _resume_one(orchestrator: AgentOrchestrator, strategy_row: Strategy) - async for chunk in orchestrator.process_user_input(user_input): logger.debug("Auto-resume chunk for strategy_id={}: {}", strategy_id, chunk) - if chunk.event == "component_generator": + if chunk.event == CommonResponseEvent.COMPONENT_GENERATOR: logger.info( "Auto-resume dispatched strategy_id={} agent={}", strategy_id, agent_name, ) + status_content = StrategyStatusContent.model_validate_json( + chunk.data.payload.content + ) + strategy_persistence.set_strategy_status( + strategy_id, status_content.status.value + ) return except asyncio.CancelledError: From 58648223786c429060f8f2a2c0996d87a567b26b Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 10:40:14 +0800 Subject: [PATCH 5/6] feat: add initial capital override for strategy runtime and implement portfolio snapshot retrieval --- .../common/trading/_internal/runtime.py | 5 ++- .../trading/_internal/stream_controller.py | 17 +++++++++ .../agents/common/trading/base_agent.py | 35 ++++++++++++++----- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/python/valuecell/agents/common/trading/_internal/runtime.py b/python/valuecell/agents/common/trading/_internal/runtime.py index d556d16ba..655cb0fe8 100644 --- a/python/valuecell/agents/common/trading/_internal/runtime.py +++ b/python/valuecell/agents/common/trading/_internal/runtime.py @@ -65,6 +65,7 @@ async def create_strategy_runtime( composer: Optional[BaseComposer] = None, features_pipeline: Optional[BaseFeaturesPipeline] = None, strategy_id_override: Optional[str] = None, + initial_capital_override: Optional[float] = None, ) -> StrategyRuntime: """Create a strategy runtime with async initialization (supports both paper and live trading). @@ -110,7 +111,9 @@ async def create_strategy_runtime( # Create strategy runtime components strategy_id = strategy_id_override or generate_uuid("strategy") - initial_capital = request.trading_config.initial_capital or 0.0 + initial_capital = ( + initial_capital_override or request.trading_config.initial_capital or 0.0 + ) constraints = Constraints( max_positions=request.trading_config.max_positions, max_leverage=request.trading_config.max_leverage, diff --git a/python/valuecell/agents/common/trading/_internal/stream_controller.py b/python/valuecell/agents/common/trading/_internal/stream_controller.py index 7c866f660..00c5f3dde 100644 --- a/python/valuecell/agents/common/trading/_internal/stream_controller.py +++ b/python/valuecell/agents/common/trading/_internal/stream_controller.py @@ -140,6 +140,23 @@ def has_initial_state(self) -> bool: ) return False + def get_latest_portfolio_snapshot(self): + """Return the latest stored portfolio snapshot or None. + + This is a convenience wrapper around the repository call so callers + can inspect persisted initial state (for resume semantics). + """ + try: + repo = get_strategy_repository() + snap = repo.get_latest_portfolio_snapshot(self.strategy_id) + return snap + except Exception: + logger.warning( + "Failed to fetch latest portfolio snapshot for strategy {}", + self.strategy_id, + ) + return None + def persist_cycle_results(self, result: DecisionCycleResult) -> None: """Persist trades, portfolio view, and strategy summary for a cycle. diff --git a/python/valuecell/agents/common/trading/base_agent.py b/python/valuecell/agents/common/trading/base_agent.py index c6335389b..159e9548b 100644 --- a/python/valuecell/agents/common/trading/base_agent.py +++ b/python/valuecell/agents/common/trading/base_agent.py @@ -8,6 +8,7 @@ from valuecell.agents.common.trading._internal.runtime import create_strategy_runtime from valuecell.agents.common.trading._internal.stream_controller import StreamController +from valuecell.server.db.repositories.strategy_repository import get_strategy_repository from valuecell.agents.common.trading.models import ( ComponentType, StopReason, @@ -223,14 +224,8 @@ async def _run_background_decision( stop_reason = StopReason.NORMAL_EXIT try: logger.info("Starting decision loop for strategy_id={}", strategy_id) - # Idempotent initial state persistence: skip if already exists (resume). - if not controller.has_initial_state(): - controller.persist_initial_state(runtime) - else: - logger.info( - "Detected existing initial state; skipping duplicate persist for strategy_id={}", - strategy_id, - ) + # Always attempt to persist an initial state (idempotent write). + controller.persist_initial_state(runtime) # Main decision loop while controller.is_running(): @@ -322,6 +317,29 @@ async def _create_runtime( Returns: StrategyRuntime instance """ + # If a strategy id override is provided (resume case), try to + # initialize the request's initial_capital from the persisted + # portfolio snapshot so the runtime's portfolio service will be + # constructed with the persisted equity. + initial_capital_override = None + if strategy_id_override: + try: + repo = get_strategy_repository() + snap = repo.get_latest_portfolio_snapshot(strategy_id_override) + if snap is not None: + initial_capital_override = float( + snap.total_value or snap.cash or 0.0 + ) + logger.info( + "Initialized request.trading_config.initial_capital from persisted snapshot for strategy_id={}", + strategy_id_override, + ) + except Exception: + logger.exception( + "Failed to initialize initial_capital from persisted snapshot for strategy_id={}", + strategy_id_override, + ) + # Let user build custom composer (or None for default) composer = await self._create_decision_composer(request) @@ -337,4 +355,5 @@ async def _create_runtime( composer=composer, features_pipeline=features_pipeline, strategy_id_override=strategy_id_override, + initial_capital_override=initial_capital_override, ) From f067e7620aea4522885b0cbfe5f27ecd89534fe9 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Tue, 25 Nov 2025 10:42:14 +0800 Subject: [PATCH 6/6] make format --- python/valuecell/agents/common/trading/base_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/valuecell/agents/common/trading/base_agent.py b/python/valuecell/agents/common/trading/base_agent.py index 159e9548b..a7aed98b8 100644 --- a/python/valuecell/agents/common/trading/base_agent.py +++ b/python/valuecell/agents/common/trading/base_agent.py @@ -8,7 +8,6 @@ from valuecell.agents.common.trading._internal.runtime import create_strategy_runtime from valuecell.agents.common.trading._internal.stream_controller import StreamController -from valuecell.server.db.repositories.strategy_repository import get_strategy_repository from valuecell.agents.common.trading.models import ( ComponentType, StopReason, @@ -18,6 +17,7 @@ ) from valuecell.core.agent.responses import streaming from valuecell.core.types import BaseAgent, StreamResponse +from valuecell.server.db.repositories.strategy_repository import get_strategy_repository if TYPE_CHECKING: from valuecell.agents.common.trading._internal.runtime import (