diff --git a/python/valuecell/adapters/models/factory.py b/python/valuecell/adapters/models/factory.py index d0a30ac68..11c88965a 100644 --- a/python/valuecell/adapters/models/factory.py +++ b/python/valuecell/adapters/models/factory.py @@ -8,13 +8,12 @@ 4. Supports fallback providers for reliability """ -import logging from abc import ABC, abstractmethod from typing import Any, Dict, Optional -from valuecell.config.manager import ConfigManager, ProviderConfig, get_config_manager +from loguru import logger -logger = logging.getLogger(__name__) +from valuecell.config.manager import ConfigManager, ProviderConfig, get_config_manager class ModelProvider(ABC): diff --git a/python/valuecell/agents/common/trading/_internal/stream_controller.py b/python/valuecell/agents/common/trading/_internal/stream_controller.py index 8a364b5c5..c623e7c97 100644 --- a/python/valuecell/agents/common/trading/_internal/stream_controller.py +++ b/python/valuecell/agents/common/trading/_internal/stream_controller.py @@ -244,6 +244,7 @@ async def finalize( self, runtime: StrategyRuntime, reason: agent_models.StopReason | str = agent_models.StopReason.NORMAL_EXIT, + reason_detail: str | None = None, ) -> None: """Finalize strategy: close resources and mark as stopped. @@ -286,7 +287,7 @@ async def finalize( self.strategy_id, getattr(reason, "value", reason), ) - self._record_stop_reason(reason) + self._record_stop_reason(reason, reason_detail) def is_running(self) -> bool: """Check if strategy is still running according to persistence layer.""" @@ -318,7 +319,9 @@ def persist_trades(self, trades: list) -> None: "Error persisting ad-hoc trades for strategy {}", self.strategy_id ) - def _record_stop_reason(self, reason: agent_models.StopReason | str) -> None: + def _record_stop_reason( + self, reason: agent_models.StopReason | str, reason_detail: str | None = None + ) -> None: """Persist last stop reason inside strategy metadata for resume decisions. Accept either a StopReason enum or a raw string; store the normalized @@ -330,10 +333,11 @@ def _record_stop_reason(self, reason: agent_models.StopReason | str) -> None: if strategy is None: return metadata = dict(strategy.strategy_metadata or {}) - normalized = getattr(reason, "value", reason) - if metadata.get("stop_reason") == normalized: - return - metadata["stop_reason"] = normalized + metadata["stop_reason"] = getattr(reason, "value", reason) + if reason_detail is not None: + metadata["stop_reason_detail"] = reason_detail + else: + metadata.pop("stop_reason_detail", None) repo.upsert_strategy(strategy_id=self.strategy_id, metadata=metadata) except Exception: logger.warning( diff --git a/python/valuecell/agents/common/trading/base_agent.py b/python/valuecell/agents/common/trading/base_agent.py index a7aed98b8..0ccb94162 100644 --- a/python/valuecell/agents/common/trading/base_agent.py +++ b/python/valuecell/agents/common/trading/base_agent.py @@ -18,6 +18,7 @@ from valuecell.core.agent.responses import streaming from valuecell.core.types import BaseAgent, StreamResponse from valuecell.server.db.repositories.strategy_repository import get_strategy_repository +from valuecell.utils import generate_uuid if TYPE_CHECKING: from valuecell.agents.common.trading._internal.runtime import ( @@ -141,18 +142,45 @@ async def stream( # Parse and validate request try: request = UserRequest.model_validate_json(query) - except ValueError as exc: + except Exception as exc: logger.exception("StrategyAgent received invalid payload") - yield streaming.message_chunk(str(exc)) + # Emit structured status with error reason then close the stream + status_payload = StrategyStatusContent( + strategy_id=generate_uuid("invalid-strategy"), + status=StrategyStatus.STOPPED, + stop_reason=StopReason.ERROR, + stop_reason_detail=str(exc), + ) + yield streaming.component_generator( + content=status_payload.model_dump_json(), + component_type=ComponentType.STATUS.value, + ) yield streaming.done() return # Create runtime (calls _build_decision, _build_features_pipeline internally) # Reuse externally supplied strategy_id if present for continuation semantics. strategy_id_override = request.trading_config.strategy_id - runtime = await self._create_runtime( - request, strategy_id_override=strategy_id_override - ) + try: + runtime = await self._create_runtime( + request, strategy_id_override=strategy_id_override + ) + except Exception as exc: + # Runtime creation failed — surface structured status and close the stream + logger.exception("Failed to create strategy runtime: {}", exc) + status_payload = StrategyStatusContent( + strategy_id=strategy_id_override or generate_uuid("invalid-strategy"), + status=StrategyStatus.STOPPED, + stop_reason=StopReason.ERROR, + stop_reason_detail=str(exc), + ) + yield streaming.component_generator( + content=status_payload.model_dump_json(), + component_type=ComponentType.STATUS.value, + ) + yield streaming.done() + return + strategy_id = runtime.strategy_id logger.info( "Created runtime for strategy_id={} conversation={} task={}", @@ -222,6 +250,7 @@ async def _run_background_decision( logger.exception("Error in _on_start hook for strategy {}", strategy_id) stop_reason = StopReason.NORMAL_EXIT + stop_reason_detail: Optional[str] = None try: logger.info("Starting decision loop for strategy_id={}", strategy_id) # Always attempt to persist an initial state (idempotent write). @@ -252,7 +281,14 @@ async def _run_background_decision( strategy_id, request.trading_config.decide_interval, ) - await asyncio.sleep(request.trading_config.decide_interval) + + # Sleep in 1s increments so we can react to controller stop + # and to cancellation promptly instead of blocking for the + # whole interval at once. + for _ in range(request.trading_config.decide_interval): + if not controller.is_running(): + break + await asyncio.sleep(1) logger.info( "Strategy_id={} is no longer running, exiting decision loop", @@ -267,6 +303,7 @@ async def _run_background_decision( except Exception as err: # noqa: BLE001 stop_reason = StopReason.ERROR logger.exception("StrategyAgent background run failed: {}", err) + stop_reason_detail = str(err) finally: # Enforce position closure on normal stop (e.g., user clicked stop) if stop_reason == StopReason.NORMAL_EXIT: @@ -301,7 +338,9 @@ async def _run_background_decision( ) # Finalize: close resources and mark stopped/paused/error - await controller.finalize(runtime, reason=stop_reason) + await controller.finalize( + runtime, reason=stop_reason, reason_detail=stop_reason_detail + ) async def _create_runtime( self, request: UserRequest, strategy_id_override: str | None = None diff --git a/python/valuecell/agents/common/trading/models.py b/python/valuecell/agents/common/trading/models.py index 1d378c0ee..057438d6e 100644 --- a/python/valuecell/agents/common/trading/models.py +++ b/python/valuecell/agents/common/trading/models.py @@ -876,6 +876,14 @@ class StrategyStatusContent(BaseModel): strategy_id: str status: StrategyStatus + # Optional stop reason and human-readable detail for terminal states/errors + stop_reason: Optional[StopReason] = Field( + default=None, description="Canonical stop reason for the strategy" + ) + stop_reason_detail: Optional[str] = Field( + default=None, + description="Optional human-readable detail about stop reason or error", + ) class ComposeResult(BaseModel): diff --git a/python/valuecell/config/loader.py b/python/valuecell/config/loader.py index d0dba2f5a..b8b146155 100644 --- a/python/valuecell/config/loader.py +++ b/python/valuecell/config/loader.py @@ -24,18 +24,16 @@ config = loader.load_agent_config("research_agent") """ -import logging import os import re from pathlib import Path from typing import Any, Dict, List, Optional import yaml +from loguru import logger from .constants import CONFIG_DIR -logger = logging.getLogger(__name__) - class ConfigLoader: """ diff --git a/python/valuecell/server/api/routers/strategy.py b/python/valuecell/server/api/routers/strategy.py index d296d0100..57cf4a4e9 100644 --- a/python/valuecell/server/api/routers/strategy.py +++ b/python/valuecell/server/api/routers/strategy.py @@ -149,11 +149,22 @@ def normalize_strategy_type( for s in strategies: meta = s.strategy_metadata or {} cfg = s.config or {} + status = map_status(s.status) + stop_reason_display = "" + if status == "stopped": + stop_reason = meta.get("stop_reason") + stop_reason_detail = meta.get("stop_reason_detail") + stop_reason_display = ( + f"{'(' + stop_reason + ')' if stop_reason else ''}" + f"{stop_reason_detail if stop_reason_detail else ''}".strip() + ) or "..." + item = StrategySummaryData( strategy_id=s.strategy_id, strategy_name=s.name, strategy_type=normalize_strategy_type(meta, cfg), - status=map_status(s.status), + status=status, + stop_reason=stop_reason_display, trading_mode=normalize_trading_mode(meta, cfg), unrealized_pnl=to_optional_float(meta.get("unrealized_pnl", 0.0)), unrealized_pnl_pct=to_optional_float( diff --git a/python/valuecell/server/api/routers/strategy_agent.py b/python/valuecell/server/api/routers/strategy_agent.py index e995b8d3a..48142b8c4 100644 --- a/python/valuecell/server/api/routers/strategy_agent.py +++ b/python/valuecell/server/api/routers/strategy_agent.py @@ -167,25 +167,22 @@ async def create_strategy_agent( "model_provider": request.llm_model_config.provider, "model_id": request.llm_model_config.model_id, "exchange_id": request.exchange_config.exchange_id, - "trading_mode": ( - request.exchange_config.trading_mode.value - if hasattr( - request.exchange_config.trading_mode, "value" - ) - else str(request.exchange_config.trading_mode) - ), + "trading_mode": request.exchange_config.trading_mode.value, } - status_value = ( - status_content.status.value - if hasattr(status_content.status, "value") - else str(status_content.status) - ) + status = status_content.status + if status == StrategyStatus.STOPPED: + metadata["stop_reason"] = ( + status_content.stop_reason.value + ) + metadata["stop_reason_detail"] = ( + status_content.stop_reason_detail + ) repo.upsert_strategy( strategy_id=status_content.strategy_id, name=name, description=None, user_id=user_input_meta.user_id, - status=status_value, + status=status.value, config=request.model_dump(), metadata=metadata, ) @@ -214,6 +211,8 @@ async def create_strategy_agent( else str(request.exchange_config.trading_mode) ), "fallback": True, + "stop_reason": "error", + "stop_reason_detail": "No status event from orchestrator", } repo.upsert_strategy( strategy_id=fallback_strategy_id, @@ -230,7 +229,7 @@ async def create_strategy_agent( return StrategyStatusContent( strategy_id=fallback_strategy_id, status="stopped" ) - except Exception: + except Exception as exc: # Orchestrator failed; fallback to direct DB creation fallback_strategy_id = generate_uuid("strategy") try: @@ -250,6 +249,8 @@ async def create_strategy_agent( else str(request.exchange_config.trading_mode) ), "fallback": True, + "stop_reason": "error", + "stop_reason_detail": str(exc), } repo.upsert_strategy( strategy_id=fallback_strategy_id, @@ -289,7 +290,8 @@ async def create_strategy_agent( else str(request.exchange_config.trading_mode) ), "fallback": True, - "error": str(e), + "stop_reason": "error", + "stop_reason_detail": str(e), } repo.upsert_strategy( strategy_id=fallback_strategy_id, diff --git a/python/valuecell/server/api/schemas/strategy.py b/python/valuecell/server/api/schemas/strategy.py index a8ac4f9d3..845939c62 100644 --- a/python/valuecell/server/api/schemas/strategy.py +++ b/python/valuecell/server/api/schemas/strategy.py @@ -28,6 +28,7 @@ class StrategySummaryData(BaseModel): description="Strategy type identifier: 'prompt based strategy' or 'grid strategy'", ) status: Literal["running", "stopped"] = Field(..., description="Strategy status") + stop_reason: Optional[str] = Field(None, description="Reason for strategy stop") trading_mode: Optional[Literal["live", "virtual"]] = Field( None, description="Trading mode: live or virtual" )