diff --git a/python/valuecell/agents/common/trading/_internal/coordinator.py b/python/valuecell/agents/common/trading/_internal/coordinator.py index 08d43f4ea..d3597d19d 100644 --- a/python/valuecell/agents/common/trading/_internal/coordinator.py +++ b/python/valuecell/agents/common/trading/_internal/coordinator.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import List +from typing import List, Optional from loguru import logger @@ -99,7 +99,7 @@ def __init__( self._symbols = list(dict.fromkeys(request.trading_config.symbols)) self._realized_pnl: float = 0.0 self._unrealized_pnl: float = 0.0 - self._cycle_index: int = 0 + self.cycle_index: int = 0 self._strategy_name = request.trading_config.strategy_name or strategy_id async def run_once(self) -> DecisionCycleResult: @@ -174,8 +174,8 @@ async def run_once(self) -> DecisionCycleResult: logger.info( f" ExecutionGateway type: {type(self._execution_gateway).__name__}" ) - tx_results = await self._execution_gateway.execute( - instructions, market_features + tx_results = await self.execute_instructions( + instructions, market_features=market_features ) logger.info(f"✅ ExecutionGateway returned {len(tx_results)} results") @@ -221,13 +221,13 @@ async def run_once(self) -> DecisionCycleResult: self._history_recorder.record(record) digest = self._digest_builder.build(self._history_recorder.get_records()) - self._cycle_index += 1 + self.cycle_index += 1 portfolio = self.portfolio_service.get_view() return DecisionCycleResult( compose_id=compose_id, timestamp_ms=timestamp_ms, - cycle_index=self._cycle_index, + cycle_index=self.cycle_index, rationale=rationale, strategy_summary=summary, instructions=instructions, @@ -539,12 +539,17 @@ def _create_history_records( ] async def execute_instructions( - self, instructions: List[TradeInstruction] + self, + instructions: List[TradeInstruction], + *, + market_features: Optional[List[FeatureVector]] = None, ) -> List[TxResult]: """Execute a list of instructions directly via the gateway.""" if not instructions: return [] - return await self._execution_gateway.execute(instructions) + return await self._execution_gateway.execute( + instructions, market_features=market_features + ) async def close_all_positions(self) -> List[TradeHistoryEntry]: """Close all open positions for the strategy. @@ -602,8 +607,23 @@ async def close_all_positions(self) -> List[TradeHistoryEntry]: logger.info("Executing {} close instructions", len(instructions)) + # Fetch market features for pricing if possible + market_features: List[FeatureVector] = [] + if self._request.exchange_config.trading_mode == TradingMode.VIRTUAL: + try: + pipeline_result = await self._features_pipeline.build() + market_features = extract_market_snapshot_features( + pipeline_result.features or [] + ) + except Exception: + logger.exception( + "Failed to build market features for closing positions" + ) + # Execute instructions - tx_results = await self.execute_instructions(instructions) + tx_results = await self.execute_instructions( + instructions, market_features=market_features + ) # Create trades and apply to portfolio trades = self._create_trades(tx_results, compose_id, timestamp_ms) diff --git a/python/valuecell/agents/common/trading/_internal/runtime.py b/python/valuecell/agents/common/trading/_internal/runtime.py index 655cb0fe8..573570ae4 100644 --- a/python/valuecell/agents/common/trading/_internal/runtime.py +++ b/python/valuecell/agents/common/trading/_internal/runtime.py @@ -3,6 +3,7 @@ from loguru import logger +from valuecell.server.db.repositories.strategy_repository import get_strategy_repository from valuecell.utils.uuid import generate_uuid from ..decision import BaseComposer, LlmComposer @@ -65,7 +66,6 @@ async def create_strategy_runtime( composer: Optional[BaseComposer] = None, features_pipeline: Optional[BaseFeaturesPipeline] = None, strategy_id_override: Optional[str] = None, - initial_capital_override: Optional[float] = None, ) -> StrategyRuntime: """Create a strategy runtime with async initialization (supports both paper and live trading). @@ -111,6 +111,28 @@ async def create_strategy_runtime( # Create strategy runtime components strategy_id = strategy_id_override or generate_uuid("strategy") + + # If an initial capital override wasn't provided, and this is a resume + # of an existing strategy, attempt to initialize from the persisted + # portfolio snapshot so the in-memory portfolio starts with the + # previously recorded equity. + initial_capital_override = None + if strategy_id_override: + try: + repo = get_strategy_repository() + snap = repo.get_latest_portfolio_snapshot(strategy_id_override) + if snap is not None: + initial_capital_override = float(snap.total_value or snap.cash or 0.0) + logger.info( + "Initialized runtime initial_capital from persisted snapshot for strategy_id=%s", + strategy_id_override, + ) + except Exception: + logger.exception( + "Failed to initialize initial_capital from persisted snapshot for strategy_id=%s", + strategy_id_override, + ) + initial_capital = ( initial_capital_override or request.trading_config.initial_capital or 0.0 ) @@ -147,6 +169,23 @@ async def create_strategy_runtime( digest_builder=digest_builder, ) + # If resuming an existing strategy, initialize coordinator cycle index + # from the latest persisted compose cycle so the in-memory coordinator + # continues numbering without overlap. + if strategy_id_override: + try: + repo = get_strategy_repository() + cycles = repo.get_cycles(strategy_id, limit=1) + if cycles: + latest = cycles[0] + if latest.cycle_index is not None: + coordinator.cycle_index = int(latest.cycle_index) + except Exception: + logger.exception( + "Failed to initialize coordinator cycle_index from DB for strategy_id=%s", + strategy_id, + ) + return StrategyRuntime( request=request, strategy_id=strategy_id, diff --git a/python/valuecell/agents/common/trading/base_agent.py b/python/valuecell/agents/common/trading/base_agent.py index 0ccb94162..c1776e3b8 100644 --- a/python/valuecell/agents/common/trading/base_agent.py +++ b/python/valuecell/agents/common/trading/base_agent.py @@ -17,7 +17,6 @@ ) from valuecell.core.agent.responses import streaming from valuecell.core.types import BaseAgent, StreamResponse -from valuecell.server.db.repositories.strategy_repository import get_strategy_repository from valuecell.utils import generate_uuid if TYPE_CHECKING: @@ -306,21 +305,20 @@ async def _run_background_decision( stop_reason_detail = str(err) finally: # Enforce position closure on normal stop (e.g., user clicked stop) - if stop_reason == StopReason.NORMAL_EXIT: - try: - trades = await runtime.coordinator.close_all_positions() - if trades: - controller.persist_trades(trades) - except Exception: - logger.exception( - "Error closing positions on stop for strategy {}", strategy_id - ) - # If closing positions fails, we should consider this an error state - # to prevent the strategy from being marked as cleanly stopped if it still has positions. - # However, the user intent was to stop. - # Let's log it and proceed, but maybe mark status as ERROR instead of STOPPED? - # For now, we stick to STOPPED but log the error clearly. - stop_reason = StopReason.ERROR_CLOSING_POSITIONS + try: + trades = await runtime.coordinator.close_all_positions() + if trades: + controller.persist_trades(trades) + except Exception: + logger.exception( + "Error closing positions on stop for strategy {}", strategy_id + ) + # If closing positions fails, we should consider this an error state + # to prevent the strategy from being marked as cleanly stopped if it still has positions. + # However, the user intent was to stop. + # Let's log it and proceed, but maybe mark status as ERROR instead of STOPPED? + # For now, we stick to STOPPED but log the error clearly. + stop_reason = StopReason.ERROR_CLOSING_POSITIONS # Call user hook before finalization try: @@ -356,29 +354,6 @@ async def _create_runtime( Returns: StrategyRuntime instance """ - # If a strategy id override is provided (resume case), try to - # initialize the request's initial_capital from the persisted - # portfolio snapshot so the runtime's portfolio service will be - # constructed with the persisted equity. - initial_capital_override = None - if strategy_id_override: - try: - repo = get_strategy_repository() - snap = repo.get_latest_portfolio_snapshot(strategy_id_override) - if snap is not None: - initial_capital_override = float( - snap.total_value or snap.cash or 0.0 - ) - logger.info( - "Initialized request.trading_config.initial_capital from persisted snapshot for strategy_id={}", - strategy_id_override, - ) - except Exception: - logger.exception( - "Failed to initialize initial_capital from persisted snapshot for strategy_id={}", - strategy_id_override, - ) - # Let user build custom composer (or None for default) composer = await self._create_decision_composer(request) @@ -394,5 +369,4 @@ async def _create_runtime( composer=composer, features_pipeline=features_pipeline, strategy_id_override=strategy_id_override, - initial_capital_override=initial_capital_override, )