Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions python/valuecell/agents/common/trading/_internal/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List
from typing import List, Optional

from loguru import logger

Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(
self._symbols = list(dict.fromkeys(request.trading_config.symbols))
self._realized_pnl: float = 0.0
self._unrealized_pnl: float = 0.0
self._cycle_index: int = 0
self.cycle_index: int = 0
self._strategy_name = request.trading_config.strategy_name or strategy_id

async def run_once(self) -> DecisionCycleResult:
Expand Down Expand Up @@ -174,8 +174,8 @@ async def run_once(self) -> DecisionCycleResult:
logger.info(
f" ExecutionGateway type: {type(self._execution_gateway).__name__}"
)
tx_results = await self._execution_gateway.execute(
instructions, market_features
tx_results = await self.execute_instructions(
instructions, market_features=market_features
)
logger.info(f"✅ ExecutionGateway returned {len(tx_results)} results")

Expand Down Expand Up @@ -221,13 +221,13 @@ async def run_once(self) -> DecisionCycleResult:
self._history_recorder.record(record)

digest = self._digest_builder.build(self._history_recorder.get_records())
self._cycle_index += 1
self.cycle_index += 1

portfolio = self.portfolio_service.get_view()
return DecisionCycleResult(
compose_id=compose_id,
timestamp_ms=timestamp_ms,
cycle_index=self._cycle_index,
cycle_index=self.cycle_index,
rationale=rationale,
strategy_summary=summary,
instructions=instructions,
Expand Down Expand Up @@ -539,12 +539,17 @@ def _create_history_records(
]

async def execute_instructions(
self, instructions: List[TradeInstruction]
self,
instructions: List[TradeInstruction],
*,
market_features: Optional[List[FeatureVector]] = None,
) -> List[TxResult]:
"""Execute a list of instructions directly via the gateway."""
if not instructions:
return []
return await self._execution_gateway.execute(instructions)
return await self._execution_gateway.execute(
instructions, market_features=market_features
)

async def close_all_positions(self) -> List[TradeHistoryEntry]:
"""Close all open positions for the strategy.
Expand Down Expand Up @@ -602,8 +607,23 @@ async def close_all_positions(self) -> List[TradeHistoryEntry]:

logger.info("Executing {} close instructions", len(instructions))

# Fetch market features for pricing if possible
market_features: List[FeatureVector] = []
if self._request.exchange_config.trading_mode == TradingMode.VIRTUAL:
try:
pipeline_result = await self._features_pipeline.build()
market_features = extract_market_snapshot_features(
pipeline_result.features or []
)
except Exception:
logger.exception(
"Failed to build market features for closing positions"
)

# Execute instructions
tx_results = await self.execute_instructions(instructions)
tx_results = await self.execute_instructions(
instructions, market_features=market_features
)

# Create trades and apply to portfolio
trades = self._create_trades(tx_results, compose_id, timestamp_ms)
Expand Down
41 changes: 40 additions & 1 deletion python/valuecell/agents/common/trading/_internal/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from loguru import logger

from valuecell.server.db.repositories.strategy_repository import get_strategy_repository
from valuecell.utils.uuid import generate_uuid

from ..decision import BaseComposer, LlmComposer
Expand Down Expand Up @@ -65,7 +66,6 @@ async def create_strategy_runtime(
composer: Optional[BaseComposer] = None,
features_pipeline: Optional[BaseFeaturesPipeline] = None,
strategy_id_override: Optional[str] = None,
initial_capital_override: Optional[float] = None,
) -> StrategyRuntime:
"""Create a strategy runtime with async initialization (supports both paper and live trading).

Expand Down Expand Up @@ -111,6 +111,28 @@ async def create_strategy_runtime(

# Create strategy runtime components
strategy_id = strategy_id_override or generate_uuid("strategy")

# If an initial capital override wasn't provided, and this is a resume
# of an existing strategy, attempt to initialize from the persisted
# portfolio snapshot so the in-memory portfolio starts with the
# previously recorded equity.
initial_capital_override = None
if strategy_id_override:
try:
repo = get_strategy_repository()
snap = repo.get_latest_portfolio_snapshot(strategy_id_override)
if snap is not None:
initial_capital_override = float(snap.total_value or snap.cash or 0.0)
logger.info(
"Initialized runtime initial_capital from persisted snapshot for strategy_id=%s",
strategy_id_override,
)
except Exception:
logger.exception(
"Failed to initialize initial_capital from persisted snapshot for strategy_id=%s",
strategy_id_override,
)

initial_capital = (
initial_capital_override or request.trading_config.initial_capital or 0.0
)
Expand Down Expand Up @@ -147,6 +169,23 @@ async def create_strategy_runtime(
digest_builder=digest_builder,
)

# If resuming an existing strategy, initialize coordinator cycle index
# from the latest persisted compose cycle so the in-memory coordinator
# continues numbering without overlap.
if strategy_id_override:
try:
repo = get_strategy_repository()
cycles = repo.get_cycles(strategy_id, limit=1)
if cycles:
latest = cycles[0]
if latest.cycle_index is not None:
coordinator.cycle_index = int(latest.cycle_index)
except Exception:
logger.exception(
"Failed to initialize coordinator cycle_index from DB for strategy_id=%s",
strategy_id,
)

return StrategyRuntime(
request=request,
strategy_id=strategy_id,
Expand Down
54 changes: 14 additions & 40 deletions python/valuecell/agents/common/trading/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
)
from valuecell.core.agent.responses import streaming
from valuecell.core.types import BaseAgent, StreamResponse
from valuecell.server.db.repositories.strategy_repository import get_strategy_repository
from valuecell.utils import generate_uuid

if TYPE_CHECKING:
Expand Down Expand Up @@ -306,21 +305,20 @@ async def _run_background_decision(
stop_reason_detail = str(err)
finally:
# Enforce position closure on normal stop (e.g., user clicked stop)
if stop_reason == StopReason.NORMAL_EXIT:
try:
trades = await runtime.coordinator.close_all_positions()
if trades:
controller.persist_trades(trades)
except Exception:
logger.exception(
"Error closing positions on stop for strategy {}", strategy_id
)
# If closing positions fails, we should consider this an error state
# to prevent the strategy from being marked as cleanly stopped if it still has positions.
# However, the user intent was to stop.
# Let's log it and proceed, but maybe mark status as ERROR instead of STOPPED?
# For now, we stick to STOPPED but log the error clearly.
stop_reason = StopReason.ERROR_CLOSING_POSITIONS
try:
trades = await runtime.coordinator.close_all_positions()
if trades:
controller.persist_trades(trades)
except Exception:
logger.exception(
"Error closing positions on stop for strategy {}", strategy_id
)
# If closing positions fails, we should consider this an error state
# to prevent the strategy from being marked as cleanly stopped if it still has positions.
# However, the user intent was to stop.
# Let's log it and proceed, but maybe mark status as ERROR instead of STOPPED?
# For now, we stick to STOPPED but log the error clearly.
stop_reason = StopReason.ERROR_CLOSING_POSITIONS

# Call user hook before finalization
try:
Expand Down Expand Up @@ -356,29 +354,6 @@ async def _create_runtime(
Returns:
StrategyRuntime instance
"""
# If a strategy id override is provided (resume case), try to
# initialize the request's initial_capital from the persisted
# portfolio snapshot so the runtime's portfolio service will be
# constructed with the persisted equity.
initial_capital_override = None
if strategy_id_override:
try:
repo = get_strategy_repository()
snap = repo.get_latest_portfolio_snapshot(strategy_id_override)
if snap is not None:
initial_capital_override = float(
snap.total_value or snap.cash or 0.0
)
logger.info(
"Initialized request.trading_config.initial_capital from persisted snapshot for strategy_id={}",
strategy_id_override,
)
except Exception:
logger.exception(
"Failed to initialize initial_capital from persisted snapshot for strategy_id={}",
strategy_id_override,
)

# Let user build custom composer (or None for default)
composer = await self._create_decision_composer(request)

Expand All @@ -394,5 +369,4 @@ async def _create_runtime(
composer=composer,
features_pipeline=features_pipeline,
strategy_id_override=strategy_id_override,
initial_capital_override=initial_capital_override,
)