Skip to content
5 changes: 2 additions & 3 deletions python/valuecell/adapters/models/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
4. Supports fallback providers for reliability
"""

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

from valuecell.config.manager import ConfigManager, ProviderConfig, get_config_manager
from loguru import logger

logger = logging.getLogger(__name__)
from valuecell.config.manager import ConfigManager, ProviderConfig, get_config_manager


class ModelProvider(ABC):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ async def finalize(
self,
runtime: StrategyRuntime,
reason: agent_models.StopReason | str = agent_models.StopReason.NORMAL_EXIT,
reason_detail: str | None = None,
) -> None:
"""Finalize strategy: close resources and mark as stopped.

Expand Down Expand Up @@ -286,7 +287,7 @@ async def finalize(
self.strategy_id,
getattr(reason, "value", reason),
)
self._record_stop_reason(reason)
self._record_stop_reason(reason, reason_detail)

def is_running(self) -> bool:
"""Check if strategy is still running according to persistence layer."""
Expand Down Expand Up @@ -318,7 +319,9 @@ def persist_trades(self, trades: list) -> None:
"Error persisting ad-hoc trades for strategy {}", self.strategy_id
)

def _record_stop_reason(self, reason: agent_models.StopReason | str) -> None:
def _record_stop_reason(
self, reason: agent_models.StopReason | str, reason_detail: str | None = None
) -> None:
"""Persist last stop reason inside strategy metadata for resume decisions.

Accept either a StopReason enum or a raw string; store the normalized
Expand All @@ -330,10 +333,11 @@ def _record_stop_reason(self, reason: agent_models.StopReason | str) -> None:
if strategy is None:
return
metadata = dict(strategy.strategy_metadata or {})
normalized = getattr(reason, "value", reason)
if metadata.get("stop_reason") == normalized:
return
metadata["stop_reason"] = normalized
metadata["stop_reason"] = getattr(reason, "value", reason)
if reason_detail is not None:
metadata["stop_reason_detail"] = reason_detail
else:
metadata.pop("stop_reason_detail", None)
repo.upsert_strategy(strategy_id=self.strategy_id, metadata=metadata)
except Exception:
logger.warning(
Expand Down
53 changes: 46 additions & 7 deletions python/valuecell/agents/common/trading/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from valuecell.core.agent.responses import streaming
from valuecell.core.types import BaseAgent, StreamResponse
from valuecell.server.db.repositories.strategy_repository import get_strategy_repository
from valuecell.utils import generate_uuid

if TYPE_CHECKING:
from valuecell.agents.common.trading._internal.runtime import (
Expand Down Expand Up @@ -141,18 +142,45 @@ async def stream(
# Parse and validate request
try:
request = UserRequest.model_validate_json(query)
except ValueError as exc:
except Exception as exc:
logger.exception("StrategyAgent received invalid payload")
yield streaming.message_chunk(str(exc))
# Emit structured status with error reason then close the stream
status_payload = StrategyStatusContent(
strategy_id=generate_uuid("invalid-strategy"),
status=StrategyStatus.STOPPED,
stop_reason=StopReason.ERROR,
stop_reason_detail=str(exc),
)
yield streaming.component_generator(
content=status_payload.model_dump_json(),
component_type=ComponentType.STATUS.value,
)
yield streaming.done()
return

# Create runtime (calls _build_decision, _build_features_pipeline internally)
# Reuse externally supplied strategy_id if present for continuation semantics.
strategy_id_override = request.trading_config.strategy_id
runtime = await self._create_runtime(
request, strategy_id_override=strategy_id_override
)
try:
runtime = await self._create_runtime(
request, strategy_id_override=strategy_id_override
)
except Exception as exc:
# Runtime creation failed — surface structured status and close the stream
logger.exception("Failed to create strategy runtime: {}", exc)
status_payload = StrategyStatusContent(
strategy_id=strategy_id_override or generate_uuid("invalid-strategy"),
status=StrategyStatus.STOPPED,
stop_reason=StopReason.ERROR,
stop_reason_detail=str(exc),
)
yield streaming.component_generator(
content=status_payload.model_dump_json(),
component_type=ComponentType.STATUS.value,
)
yield streaming.done()
return

strategy_id = runtime.strategy_id
logger.info(
"Created runtime for strategy_id={} conversation={} task={}",
Expand Down Expand Up @@ -222,6 +250,7 @@ async def _run_background_decision(
logger.exception("Error in _on_start hook for strategy {}", strategy_id)

stop_reason = StopReason.NORMAL_EXIT
stop_reason_detail: Optional[str] = None
try:
logger.info("Starting decision loop for strategy_id={}", strategy_id)
# Always attempt to persist an initial state (idempotent write).
Expand Down Expand Up @@ -252,7 +281,14 @@ async def _run_background_decision(
strategy_id,
request.trading_config.decide_interval,
)
await asyncio.sleep(request.trading_config.decide_interval)

# Sleep in 1s increments so we can react to controller stop
# and to cancellation promptly instead of blocking for the
# whole interval at once.
for _ in range(request.trading_config.decide_interval):
if not controller.is_running():
break
await asyncio.sleep(1)

logger.info(
"Strategy_id={} is no longer running, exiting decision loop",
Expand All @@ -267,6 +303,7 @@ async def _run_background_decision(
except Exception as err: # noqa: BLE001
stop_reason = StopReason.ERROR
logger.exception("StrategyAgent background run failed: {}", err)
stop_reason_detail = str(err)
finally:
# Enforce position closure on normal stop (e.g., user clicked stop)
if stop_reason == StopReason.NORMAL_EXIT:
Expand Down Expand Up @@ -301,7 +338,9 @@ async def _run_background_decision(
)

# Finalize: close resources and mark stopped/paused/error
await controller.finalize(runtime, reason=stop_reason)
await controller.finalize(
runtime, reason=stop_reason, reason_detail=stop_reason_detail
)

async def _create_runtime(
self, request: UserRequest, strategy_id_override: str | None = None
Expand Down
8 changes: 8 additions & 0 deletions python/valuecell/agents/common/trading/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,14 @@ class StrategyStatusContent(BaseModel):

strategy_id: str
status: StrategyStatus
# Optional stop reason and human-readable detail for terminal states/errors
stop_reason: Optional[StopReason] = Field(
default=None, description="Canonical stop reason for the strategy"
)
stop_reason_detail: Optional[str] = Field(
default=None,
description="Optional human-readable detail about stop reason or error",
)


class ComposeResult(BaseModel):
Expand Down
4 changes: 1 addition & 3 deletions python/valuecell/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@
config = loader.load_agent_config("research_agent")
"""

import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
from loguru import logger

from .constants import CONFIG_DIR

logger = logging.getLogger(__name__)


class ConfigLoader:
"""
Expand Down
13 changes: 12 additions & 1 deletion python/valuecell/server/api/routers/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,22 @@ def normalize_strategy_type(
for s in strategies:
meta = s.strategy_metadata or {}
cfg = s.config or {}
status = map_status(s.status)
stop_reason_display = ""
if status == "stopped":
stop_reason = meta.get("stop_reason")
stop_reason_detail = meta.get("stop_reason_detail")
stop_reason_display = (
f"{'(' + stop_reason + ')' if stop_reason else ''}"
f"{stop_reason_detail if stop_reason_detail else ''}".strip()
) or "..."

item = StrategySummaryData(
strategy_id=s.strategy_id,
strategy_name=s.name,
strategy_type=normalize_strategy_type(meta, cfg),
status=map_status(s.status),
status=status,
stop_reason=stop_reason_display,
trading_mode=normalize_trading_mode(meta, cfg),
unrealized_pnl=to_optional_float(meta.get("unrealized_pnl", 0.0)),
unrealized_pnl_pct=to_optional_float(
Expand Down
32 changes: 17 additions & 15 deletions python/valuecell/server/api/routers/strategy_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,22 @@ async def create_strategy_agent(
"model_provider": request.llm_model_config.provider,
"model_id": request.llm_model_config.model_id,
"exchange_id": request.exchange_config.exchange_id,
"trading_mode": (
request.exchange_config.trading_mode.value
if hasattr(
request.exchange_config.trading_mode, "value"
)
else str(request.exchange_config.trading_mode)
),
"trading_mode": request.exchange_config.trading_mode.value,
}
status_value = (
status_content.status.value
if hasattr(status_content.status, "value")
else str(status_content.status)
)
status = status_content.status
if status == StrategyStatus.STOPPED:
metadata["stop_reason"] = (
status_content.stop_reason.value
)
metadata["stop_reason_detail"] = (
status_content.stop_reason_detail
)
repo.upsert_strategy(
strategy_id=status_content.strategy_id,
name=name,
description=None,
user_id=user_input_meta.user_id,
status=status_value,
status=status.value,
config=request.model_dump(),
metadata=metadata,
)
Expand Down Expand Up @@ -214,6 +211,8 @@ async def create_strategy_agent(
else str(request.exchange_config.trading_mode)
),
"fallback": True,
"stop_reason": "error",
"stop_reason_detail": "No status event from orchestrator",
}
repo.upsert_strategy(
strategy_id=fallback_strategy_id,
Expand All @@ -230,7 +229,7 @@ async def create_strategy_agent(
return StrategyStatusContent(
strategy_id=fallback_strategy_id, status="stopped"
)
except Exception:
except Exception as exc:
# Orchestrator failed; fallback to direct DB creation
fallback_strategy_id = generate_uuid("strategy")
try:
Expand All @@ -250,6 +249,8 @@ async def create_strategy_agent(
else str(request.exchange_config.trading_mode)
),
"fallback": True,
"stop_reason": "error",
"stop_reason_detail": str(exc),
}
repo.upsert_strategy(
strategy_id=fallback_strategy_id,
Expand Down Expand Up @@ -289,7 +290,8 @@ async def create_strategy_agent(
else str(request.exchange_config.trading_mode)
),
"fallback": True,
"error": str(e),
"stop_reason": "error",
"stop_reason_detail": str(e),
}
repo.upsert_strategy(
strategy_id=fallback_strategy_id,
Expand Down
1 change: 1 addition & 0 deletions python/valuecell/server/api/schemas/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StrategySummaryData(BaseModel):
description="Strategy type identifier: 'prompt based strategy' or 'grid strategy'",
)
status: Literal["running", "stopped"] = Field(..., description="Strategy status")
stop_reason: Optional[str] = Field(None, description="Reason for strategy stop")
trading_mode: Optional[Literal["live", "virtual"]] = Field(
None, description="Trading mode: live or virtual"
)
Expand Down