From 8f30aa22a8f18aa583e064bfebd6435ef0084e4c Mon Sep 17 00:00:00 2001 From: Kelu Date: Sat, 29 Nov 2025 14:46:48 +0700 Subject: [PATCH 1/2] feat: Initialize Portfolio with Existing Positions and Enhance Data Fetching Reliability This pull request introduces two major enhancements to the portfolio initialization process: 1. Initial Position Loading: The system now correctly loads and integrates any existing open positions into the PortfolioView at startup. 2. Live Position Syncing: The system would sync the current position on each cycle for live tradings, to enable human interventions during the strategy execution. 3. Robust Data Fetching: A retry mechanism has been implemented for fetching portfolio data to improve reliability against transient network or API errors. --- .../valuecell/agents/common/trading/README.md | 6 +- .../common/trading/_internal/coordinator.py | 18 ++-- .../common/trading/_internal/runtime.py | 54 ++++++----- .../trading/_internal/stream_controller.py | 49 +++++----- .../valuecell/agents/common/trading/models.py | 11 ++- .../common/trading/portfolio/in_memory.py | 37 +++++--- .../valuecell/agents/common/trading/utils.py | 90 ++++++++++++++++++- .../valuecell/server/api/routers/strategy.py | 2 +- .../server/api/routers/strategy_agent.py | 5 ++ .../server/services/strategy_persistence.py | 40 +++++---- .../server/services/strategy_service.py | 25 ++++-- 11 files changed, 247 insertions(+), 90 deletions(-) diff --git a/python/valuecell/agents/common/trading/README.md b/python/valuecell/agents/common/trading/README.md index 111de7d76..c4df6e5c6 100644 --- a/python/valuecell/agents/common/trading/README.md +++ b/python/valuecell/agents/common/trading/README.md @@ -120,7 +120,8 @@ UserRequest TradingConfig - `strategy_name?: str` — Display name -- `initial_capital: float` — Starting capital (USD) +- `initial_capital: float` — Starting total cash in USD +- `initial_free_cash: float` — Starting free cash in USD (same as initial_capital if there's no initial positions) - `max_leverage: float` — Maximum leverage allowed - `max_positions: int` — Concurrent position limit - `symbols: List[str]` — Instruments to trade (e.g., `["BTC-USDT", "ETH-USDT"]`) @@ -459,7 +460,8 @@ For live trading with real exchanges: **The runtime automatically:** - Fetches real account balance -- Sets `initial_capital` to available cash +- Sets `initial_capital` to account total cash +- Sets `initial_free_cash` to available cash - Uses `CCXTExecutionGateway` for order submission **Always test on testnet first** before using real funds diff --git a/python/valuecell/agents/common/trading/_internal/coordinator.py b/python/valuecell/agents/common/trading/_internal/coordinator.py index 0932c6493..da362fb2f 100644 --- a/python/valuecell/agents/common/trading/_internal/coordinator.py +++ b/python/valuecell/agents/common/trading/_internal/coordinator.py @@ -38,6 +38,7 @@ from ..utils import ( extract_market_snapshot_features, fetch_free_cash_from_gateway, + fetch_positions_from_gateway, ) # Core interfaces for orchestration and portfolio service. @@ -132,7 +133,11 @@ async def run_once(self) -> DecisionCycleResult: portfolio.buying_power = float(free_cash) # Also update free_cash field in view if it exists portfolio.free_cash = float(free_cash) - + positions = await fetch_positions_from_gateway(self._execution_gateway) + if len(positions) == 0: + logger.warning("No position available, skipping sync") + else: + portfolio.positions = positions except Exception: # If syncing fails, continue with existing portfolio view logger.warning( @@ -466,16 +471,17 @@ def build_summary( # Fallback to internal tracking if portfolio service is unavailable unrealized = float(self._unrealized_pnl or 0.0) # Fallback equity uses initial capital when view is unavailable - equity = float(self._request.trading_config.initial_capital or 0.0) + equity = float( + (self._request.trading_config.initial_capital + unrealized) + if self._request.trading_config.initial_capital is not None + else 0.0 + ) # Keep internal state in sync (allow negative unrealized PnL) self._unrealized_pnl = float(unrealized) - initial_capital = self._request.trading_config.initial_capital or 0.0 pnl_pct = ( - (self._realized_pnl + self._unrealized_pnl) / initial_capital - if initial_capital - else None + (self._realized_pnl + self._unrealized_pnl) / equity if equity else None ) # Strategy-level unrealized percent: percent of equity (if equity is available) diff --git a/python/valuecell/agents/common/trading/_internal/runtime.py b/python/valuecell/agents/common/trading/_internal/runtime.py index 573570ae4..1ac2c0780 100644 --- a/python/valuecell/agents/common/trading/_internal/runtime.py +++ b/python/valuecell/agents/common/trading/_internal/runtime.py @@ -17,7 +17,7 @@ ) from ..models import Constraints, DecisionCycleResult, TradingMode, UserRequest from ..portfolio.in_memory import InMemoryPortfolioService -from ..utils import fetch_free_cash_from_gateway +from ..utils import fetch_free_cash_from_gateway, fetch_positions_from_gateway from .coordinator import DefaultDecisionCoordinator @@ -28,22 +28,26 @@ async def _create_execution_gateway(request: UserRequest) -> BaseExecutionGatewa # In LIVE mode, fetch exchange balance and set initial capital from free cash try: if request.exchange_config.trading_mode == TradingMode.LIVE: - free_cash, _ = await fetch_free_cash_from_gateway( + free_cash, total_cash = await fetch_free_cash_from_gateway( execution_gateway, request.trading_config.symbols ) - request.trading_config.initial_capital = float(free_cash) + request.trading_config.initial_free_cash = float(free_cash) + request.trading_config.initial_capital = float(total_cash) + request.trading_config.initial_positions = ( + await fetch_positions_from_gateway(execution_gateway) + ) except Exception: - # Log the error but continue - user might have set initial_capital manually + # Log the error but continue - user might have set initial portfolio manually logger.exception( - "Failed to fetch exchange balance for LIVE mode. Will use configured initial_capital instead." + "Failed to fetch exchange balance for LIVE mode. Will use configured initial portfolio instead." ) # Validate initial capital for LIVE mode if request.exchange_config.trading_mode == TradingMode.LIVE: - initial_cap = request.trading_config.initial_capital or 0.0 - if initial_cap <= 0: + initial_total_cash = request.trading_config.initial_capital or 0.0 + if initial_total_cash <= 0: logger.error( - f"LIVE trading mode has initial_capital={initial_cap}. " + f"LIVE trading mode has initial_total_cash={initial_total_cash}. " "This usually means balance fetch failed or account has no funds. " "Strategy will not be able to trade without capital." ) @@ -100,6 +104,7 @@ async def create_strategy_runtime( ... trading_config=TradingConfig( ... symbols=['BTC-USDT', 'ETH-USDT'], ... initial_capital=10000.0, + ... initial_free_cash=10000.0, ... max_leverage=10.0, ... max_positions=5, ... ) @@ -112,36 +117,45 @@ async def create_strategy_runtime( # Create strategy runtime components strategy_id = strategy_id_override or generate_uuid("strategy") - # If an initial capital override wasn't provided, and this is a resume - # of an existing strategy, attempt to initialize from the persisted - # portfolio snapshot so the in-memory portfolio starts with the - # previously recorded equity. - initial_capital_override = None + # If this is a resume of an existing strategy, + # attempt to initialize from the persisted portfolio snapshot + # so the in-memory portfolio starts with the previously recorded equity. + free_cash_override = None + total_cash_override = None if strategy_id_override: try: repo = get_strategy_repository() snap = repo.get_latest_portfolio_snapshot(strategy_id_override) if snap is not None: - initial_capital_override = float(snap.total_value or snap.cash or 0.0) + free_cash_override = float(snap.cash or 0.0) + total_cash_override = float( + snap.total_value - snap.total_unrealized_pnl + if ( + snap.total_value is not None + and snap.total_unrealized_pnl is not None + ) + else 0.0 + ) logger.info( - "Initialized runtime initial_capital from persisted snapshot for strategy_id=%s", + "Initialized runtime initial capital from persisted snapshot for strategy_id=%s", strategy_id_override, ) except Exception: logger.exception( - "Failed to initialize initial_capital from persisted snapshot for strategy_id=%s", + "Failed to initialize initial capital from persisted snapshot for strategy_id=%s", strategy_id_override, ) - initial_capital = ( - initial_capital_override or request.trading_config.initial_capital or 0.0 - ) + free_cash = free_cash_override or request.trading_config.initial_free_cash or 0.0 + total_cash = total_cash_override or request.trading_config.initial_capital or 0.0 constraints = Constraints( max_positions=request.trading_config.max_positions, max_leverage=request.trading_config.max_leverage, ) portfolio_service = InMemoryPortfolioService( - initial_capital=initial_capital, + free_cash=free_cash, + total_cash=total_cash, + initial_positions=request.trading_config.initial_positions, trading_mode=request.exchange_config.trading_mode, market_type=request.exchange_config.market_type, constraints=constraints, diff --git a/python/valuecell/agents/common/trading/_internal/stream_controller.py b/python/valuecell/agents/common/trading/_internal/stream_controller.py index ddbc35009..5f692fcdf 100644 --- a/python/valuecell/agents/common/trading/_internal/stream_controller.py +++ b/python/valuecell/agents/common/trading/_internal/stream_controller.py @@ -123,7 +123,7 @@ def persist_initial_state(self, runtime: StrategyRuntime) -> None: self.strategy_id, ) - # When running in LIVE mode, update DB config.initial_capital to exchange available balance + # When running in LIVE mode, update DB config.initial_free_cash to exchange available balance # and record initial capital into strategy metadata for fast access by APIs. # Only perform this on the first snapshot to avoid overwriting user edits or restarts. try: @@ -132,51 +132,54 @@ def persist_initial_state(self, runtime: StrategyRuntime) -> None: ) is_live = trading_mode == agent_models.TradingMode.LIVE if is_live and is_first_snapshot: - initial_cash = getattr(initial_portfolio, "free_cash", None) - if initial_cash is None: - initial_cash = getattr( - initial_portfolio, "account_balance", None - ) - if initial_cash is None: - initial_cash = getattr( - runtime.request.trading_config, "initial_capital", None + initial_free_cash = ( + initial_portfolio.free_cash + or initial_portfolio.account_balance + or runtime.request.trading_config.initial_free_cash + ) + initial_total_cash = ( + initial_portfolio.total_value + - initial_portfolio.total_unrealized_pnl + if ( + initial_portfolio.total_value is not None + and initial_portfolio.total_unrealized_pnl is not None ) - - if initial_cash is not None: + else runtime.request.trading_config.initial_free_cash + ) + if initial_free_cash is not None: if strategy_persistence.update_initial_capital( - self.strategy_id, float(initial_cash) + self.strategy_id, + float(initial_free_cash), + float(initial_total_cash), ): - logger.info( - "Updated DB initial_capital to {} for strategy={} (LIVE mode)", - initial_cash, - self.strategy_id, - ) try: # Also persist metadata for initial capital to avoid repeated first-snapshot queries strategy_persistence.set_initial_capital_metadata( strategy_id=self.strategy_id, - initial_capital=float(initial_cash), + initial_free_cash=float(initial_free_cash), + initial_total_cash=float(initial_total_cash), source="live_snapshot_cash", ts_ms=timestamp_ms, ) logger.info( - "Recorded initial_capital_live={} (source=live_snapshot_cash) in metadata for strategy={}", - initial_cash, + "Recorded initial_free_cash={} initial_total_cash={} (source=live_snapshot_cash) in metadata for strategy={}", + initial_free_cash, + initial_total_cash, self.strategy_id, ) except Exception: logger.exception( - "Failed to set initial_capital metadata for {}", + "Failed to set initial capital metadata for {}", self.strategy_id, ) else: logger.warning( - "Failed to update DB initial_capital for strategy={} (LIVE mode)", + "Failed to update DB initial capital for strategy={} (LIVE mode)", self.strategy_id, ) except Exception: logger.exception( - "Error while updating DB initial_capital from live balance for {}", + "Error while updating DB initial capital from live balance for {}", self.strategy_id, ) except Exception: diff --git a/python/valuecell/agents/common/trading/models.py b/python/valuecell/agents/common/trading/models.py index 983123674..e89fca469 100644 --- a/python/valuecell/agents/common/trading/models.py +++ b/python/valuecell/agents/common/trading/models.py @@ -210,9 +210,18 @@ class TradingConfig(BaseModel): ) initial_capital: Optional[float] = Field( default=DEFAULT_INITIAL_CAPITAL, - description="Initial capital for trading in USD", + description="Initial total cash for trading in USD", gt=0, ) + initial_free_cash: Optional[float] = Field( + default=DEFAULT_INITIAL_CAPITAL, + description="Initial free cash for trading in USD", + gt=0, + ) + initial_positions: Dict[str, "PositionSnapshot"] = Field( + default={}, + description="Initial positions in portfolio", + ) max_leverage: float = Field( default=DEFAULT_MAX_LEVERAGE, description="Maximum leverage", diff --git a/python/valuecell/agents/common/trading/portfolio/in_memory.py b/python/valuecell/agents/common/trading/portfolio/in_memory.py index 1d32627ea..857bb8d18 100644 --- a/python/valuecell/agents/common/trading/portfolio/in_memory.py +++ b/python/valuecell/agents/common/trading/portfolio/in_memory.py @@ -1,5 +1,5 @@ from datetime import datetime, timezone -from typing import List, Optional +from typing import Dict, List, Optional from valuecell.agents.common.trading.models import ( Constraints, @@ -36,7 +36,9 @@ class InMemoryPortfolioService(BasePortfolioService): def __init__( self, - initial_capital: float, + free_cash: float, + total_cash: float, + initial_positions: Dict[str, PositionSnapshot], trading_mode: TradingMode, market_type: MarketType, constraints: Optional[Constraints] = None, @@ -45,19 +47,34 @@ def __init__( # Store owning strategy id on the view so downstream components # always see which strategy this portfolio belongs to. self._strategy_id = strategy_id + net_exposure = sum( + value.notional + for value in initial_positions.values() + if value.notional is not None + ) + gross_exposure = sum( + abs(value.notional) + for value in initial_positions.values() + if value.notional is not None + ) + total_unrealized_pnl = sum( + value.unrealized_pnl + for value in initial_positions.values() + if value.unrealized_pnl is not None + ) self._view = PortfolioView( strategy_id=strategy_id, ts=int(datetime.now(timezone.utc).timestamp() * 1000), - account_balance=initial_capital, - positions={}, - gross_exposure=0.0, - net_exposure=0.0, + account_balance=free_cash, + positions=initial_positions, + gross_exposure=gross_exposure, + net_exposure=net_exposure, constraints=constraints or None, - total_value=initial_capital, - total_unrealized_pnl=0.0, + total_value=total_cash + total_unrealized_pnl, + total_unrealized_pnl=total_unrealized_pnl, total_realized_pnl=0.0, - buying_power=initial_capital, - free_cash=initial_capital, + buying_power=free_cash, + free_cash=free_cash, ) self._trading_mode = trading_mode self._market_type = market_type diff --git a/python/valuecell/agents/common/trading/utils.py b/python/valuecell/agents/common/trading/utils.py index 6742cead2..2b9fce167 100644 --- a/python/valuecell/agents/common/trading/utils.py +++ b/python/valuecell/agents/common/trading/utils.py @@ -1,4 +1,6 @@ +import asyncio import os +import random from typing import Dict, List, Optional, Tuple import ccxt.pro as ccxtpro @@ -9,11 +11,16 @@ FEATURE_GROUP_BY_KEY, FEATURE_GROUP_BY_MARKET_SNAPSHOT, ) -from valuecell.agents.common.trading.models import FeatureVector +from valuecell.agents.common.trading.models import ( + FeatureVector, + InstrumentRef, + PositionSnapshot, + TradeType, +) async def fetch_free_cash_from_gateway( - execution_gateway, symbols: list[str] + execution_gateway, symbols: list[str], retry_cnt: int = 0, max_retries: int = 3 ) -> Tuple[float, float]: """Fetch exchange balance via `execution_gateway.fetch_balance()` and aggregate free cash for the given `symbols` (quote currencies). @@ -26,7 +33,21 @@ async def fetch_free_cash_from_gateway( if not hasattr(execution_gateway, "fetch_balance"): return 0.0, 0.0 balance = await execution_gateway.fetch_balance() - except Exception: + except Exception as e: + if retry_cnt < max_retries: + logger.warning( + f"Failed to fetch free cash from exchange, retrying... ({retry_cnt + 1}/{max_retries})" + ) + await asyncio.sleep( + random.uniform(retry_cnt, retry_cnt + 1) + ) # jitter to prevent mass retry at the same time + return await fetch_free_cash_from_gateway( + execution_gateway, symbols, retry_cnt + 1, max_retries + ) + logger.error( + f"Failed to fetch free cash from exchange after {max_retries} retries, returning 0.0", + exception=e, + ) return 0.0, 0.0 logger.info(f"Raw balance response: {balance}") @@ -94,6 +115,69 @@ async def fetch_free_cash_from_gateway( return float(free_cash), float(total_cash) +async def fetch_positions_from_gateway( + execution_gateway, retry_cnt: int = 0, max_retries: int = 3 +) -> Dict[str, PositionSnapshot]: + """Fetch positions from exchange.""" + logger.info("Fetching positions for LIVE trading mode") + try: + if not hasattr(execution_gateway, "fetch_positions"): + return {} + raw_positions = await execution_gateway.fetch_positions() + except Exception as e: + if retry_cnt < max_retries: + logger.warning( + f"Failed to fetch positions from exchange, retrying... ({retry_cnt + 1}/{max_retries})" + ) + await asyncio.sleep( + random.uniform(retry_cnt, retry_cnt + 1) + ) # jitter to prevent mass retry at the same time + return await fetch_positions_from_gateway( + execution_gateway, retry_cnt + 1, max_retries + ) + logger.error( + f"Failed to fetch positions from exchange after {max_retries} retries, returning no positions", + exception=e, + ) + return {} + + logger.debug(f"Raw positions response: {raw_positions}") + positions = {} + for position in raw_positions: + if "symbol" in position: + symbol = position.get("symbol").split(":")[0] + position = PositionSnapshot( + instrument=InstrumentRef( + exchange_id=execution_gateway.exchange_id, + symbol=symbol, + ), + quantity=( + position.get("contracts") + if position.get("side") == "long" + else -position.get("contracts") + ), + avg_price=position.get("entryPrice"), + mark_price=position.get("markPrice"), + unrealized_pnl=position.get("unrealizedPnl"), + notional=position.get("notional"), + leverage=position.get("leverage") or 1, + entry_ts=position.get("timestamp"), + trade_type=( + TradeType.LONG + if position.get("side") == "long" + else TradeType.SHORT + ), + ) + if position.notional is not None and position.notional != 0: + position.unrealized_pnl_pct = ( + position.unrealized_pnl / position.notional + ) + positions[symbol] = position + logger.info(f"Fetched positions: {positions}") + + return positions + + def extract_market_snapshot_features( features: List[FeatureVector], ) -> List[FeatureVector]: diff --git a/python/valuecell/server/api/routers/strategy.py b/python/valuecell/server/api/routers/strategy.py index 001c63ea9..efcb0073b 100644 --- a/python/valuecell/server/api/routers/strategy.py +++ b/python/valuecell/server/api/routers/strategy.py @@ -214,7 +214,7 @@ def normalize_strategy_type( response_model=StrategyPerformanceResponse, summary="Get strategy performance and configuration overview", description=( - "Return ROI strictly from portfolio view equity (total_value) relative to initial_capital; model/provider; and final prompt strictly from templates (no fallback)." + "Return ROI strictly from portfolio view equity (total_value) relative to initial capital; model/provider; and final prompt strictly from templates (no fallback)." ), ) async def get_strategy_performance( diff --git a/python/valuecell/server/api/routers/strategy_agent.py b/python/valuecell/server/api/routers/strategy_agent.py index 1a122584d..fc3133590 100644 --- a/python/valuecell/server/api/routers/strategy_agent.py +++ b/python/valuecell/server/api/routers/strategy_agent.py @@ -68,6 +68,11 @@ def _safe_config_dump(req: UserRequest) -> dict: } ) + # Assign initial_capital value to initial_free_cash. + # Only used for paper tradings, the system would use account portfolio data for LIVE tradings. + request.trading_config.initial_free_cash = ( + request.trading_config.initial_capital + ) # Ensure we only serialize the core UserRequest fields, excluding conversation_id user_request = UserRequest( llm_model_config=request.llm_model_config, diff --git a/python/valuecell/server/services/strategy_persistence.py b/python/valuecell/server/services/strategy_persistence.py index 49b882b53..0526bca90 100644 --- a/python/valuecell/server/services/strategy_persistence.py +++ b/python/valuecell/server/services/strategy_persistence.py @@ -371,8 +371,11 @@ def persist_instructions( return inserted -def update_initial_capital(strategy_id: str, new_initial_capital: float) -> bool: - """Update `strategies.config.trading_config.initial_capital` for a strategy. +def update_initial_capital( + strategy_id: str, new_initial_free_cash: float, new_initial_total_cash: float +) -> bool: + """Update `strategies.config.trading_config.initial_capital` and + `strategies.config.trading_config.initial_free_cash` for a strategy. This writes the provided value into the persisted strategy config so that downstream performance APIs that read from `strategies.config` reflect the @@ -385,7 +388,7 @@ def update_initial_capital(strategy_id: str, new_initial_capital: float) -> bool strategy = repo.get_strategy_by_strategy_id(strategy_id) if strategy is None: logger.info( - "Skip updating initial_capital: strategy={} not found (possibly deleted)", + "Skip updating initial capital: strategy={} not found (possibly deleted)", strategy_id, ) return False @@ -395,24 +398,27 @@ def update_initial_capital(strategy_id: str, new_initial_capital: float) -> bool tr = dict((cfg.get("trading_config") or {}) or {}) try: - tr["initial_capital"] = float(new_initial_capital) + tr["initial_free_cash"] = float(new_initial_free_cash) + tr["initial_total_cash"] = float(new_initial_total_cash) except Exception: # If conversion fails, keep raw value - tr["initial_capital"] = new_initial_capital + tr["initial_free_cash"] = new_initial_free_cash + tr["initial_total_cash"] = new_initial_total_cash cfg["trading_config"] = tr updated = repo.upsert_strategy(strategy_id=strategy_id, config=cfg) if updated is None: logger.warning( - "Failed to update initial_capital in config for strategy={}", + "Failed to update initial capital in config for strategy={}", strategy_id, ) return False logger.info( - "Updated strategies.config.trading_config.initial_capital to {} for strategy={}", - tr.get("initial_capital"), + "Updated strategies.config.trading_config.initial_free_cash to {}, initial_total_cash to {} for strategy={}", + tr.get("initial_free_cash"), + tr.get("initial_total_cash"), strategy_id, ) return True @@ -423,7 +429,8 @@ def update_initial_capital(strategy_id: str, new_initial_capital: float) -> bool def set_initial_capital_metadata( strategy_id: str, - initial_capital: float, + initial_free_cash: float, + initial_total_cash: float, *, source: str | None = None, ts_ms: int | None = None, @@ -440,16 +447,18 @@ def set_initial_capital_metadata( strategy = repo.get_strategy_by_strategy_id(strategy_id) if strategy is None: logger.info( - "Skip setting initial_capital metadata: strategy={} not found", + "Skip setting initial capital metadata: strategy={} not found", strategy_id, ) return False meta = dict(strategy.strategy_metadata or {}) try: - meta["initial_capital_live"] = float(initial_capital) + meta["initial_free_cash_live"] = float(initial_free_cash) + meta["initial_total_cash_live"] = float(initial_total_cash) except Exception: - meta["initial_capital_live"] = initial_capital + meta["initial_free_cash_live"] = initial_free_cash + meta["initial_total_cash_live"] = initial_total_cash if source is not None: meta["initial_capital_source"] = source if ts_ms is not None: @@ -461,14 +470,15 @@ def set_initial_capital_metadata( updated = repo.upsert_strategy(strategy_id=strategy_id, metadata=meta) if updated is None: logger.warning( - "Failed to set initial_capital metadata for strategy={}", + "Failed to set initial capital metadata for strategy={}", strategy_id, ) return False logger.info( - "Set initial_capital_live={} (source={}) in metadata for strategy={}", - meta.get("initial_capital_live"), + "Set initial_free_cash_live={} initial_total_cash_live={} (source={}) in metadata for strategy={}", + meta.get("initial_free_cash_live"), + meta.get("initial_total_cash_live"), meta.get("initial_capital_source"), strategy_id, ) diff --git a/python/valuecell/server/services/strategy_service.py b/python/valuecell/server/services/strategy_service.py index 2d13940e7..416ed9ff6 100644 --- a/python/valuecell/server/services/strategy_service.py +++ b/python/valuecell/server/services/strategy_service.py @@ -222,20 +222,23 @@ async def get_strategy_performance( if is_live_mode: # Fast path: read from metadata set on first LIVE snapshot - initial_capital = _to_optional_float(meta.get("initial_capital_live")) - if initial_capital is None: + initial_total_cash = _to_optional_float(meta.get("initial_total_cash_live")) + if initial_total_cash is None: # Rare path: metadata missing (older strategies); query first snapshot once try: first_snapshot = repo.get_first_portfolio_snapshot(strategy_id) - initial_capital = ( - _to_optional_float(getattr(first_snapshot, "cash", None)) + initial_total_cash = ( + _to_optional_float( + first_snapshot.total_value + - first_snapshot.total_unrealized_pnl + ) if first_snapshot else None ) except Exception: - initial_capital = None + initial_total_cash = None else: - initial_capital = _to_optional_float(tr.get("initial_capital")) + initial_total_cash = _to_optional_float(tr.get("initial_capital")) max_leverage = _to_optional_float(tr.get("max_leverage")) symbols = tr.get("symbols") if tr.get("symbols") is not None else None # Resolve final prompt strictly via template_id from strategy_prompts (no fallback) @@ -263,16 +266,20 @@ async def get_strategy_performance( return_rate_pct: Optional[float] = None try: - if initial_capital and initial_capital > 0 and total_value is not None: + if ( + initial_total_cash + and initial_total_cash > 0 + and total_value is not None + ): return_rate_pct = ( - (total_value - initial_capital) / initial_capital + (total_value - initial_total_cash) / initial_total_cash ) * 100.0 except Exception: return_rate_pct = None return StrategyPerformanceData( strategy_id=strategy_id, - initial_capital=initial_capital, + initial_capital=initial_total_cash, return_rate_pct=return_rate_pct, llm_provider=llm_provider, llm_model_id=llm_model_id, From 1703ff8a42d59031ceda537d053ade8ef65840ef Mon Sep 17 00:00:00 2001 From: Kelu Date: Fri, 5 Dec 2025 13:59:38 +0700 Subject: [PATCH 2/2] fix: Handle exceptions explicitly --- .../common/trading/_internal/coordinator.py | 19 ++++++++++++++++--- .../common/trading/_internal/runtime.py | 2 +- .../valuecell/agents/common/trading/utils.py | 9 ++++++--- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/python/valuecell/agents/common/trading/_internal/coordinator.py b/python/valuecell/agents/common/trading/_internal/coordinator.py index da362fb2f..40f04ccec 100644 --- a/python/valuecell/agents/common/trading/_internal/coordinator.py +++ b/python/valuecell/agents/common/trading/_internal/coordinator.py @@ -133,11 +133,24 @@ async def run_once(self) -> DecisionCycleResult: portfolio.buying_power = float(free_cash) # Also update free_cash field in view if it exists portfolio.free_cash = float(free_cash) - positions = await fetch_positions_from_gateway(self._execution_gateway) - if len(positions) == 0: - logger.warning("No position available, skipping sync") + try: + positions = await fetch_positions_from_gateway( + self._execution_gateway + ) + except Exception as e: + logger.warning( + "Failed to sync live positions: %s - skipping sync", e + ) else: portfolio.positions = positions + total_unrealized_pnl = sum( + value.unrealized_pnl + for value in positions.values() + if value.unrealized_pnl is not None + ) + portfolio.total_unrealized_pnl = total_unrealized_pnl + if total_cash > 0: + portfolio.total_value = total_cash + total_unrealized_pnl except Exception: # If syncing fails, continue with existing portfolio view logger.warning( diff --git a/python/valuecell/agents/common/trading/_internal/runtime.py b/python/valuecell/agents/common/trading/_internal/runtime.py index 1ac2c0780..515fb1ab0 100644 --- a/python/valuecell/agents/common/trading/_internal/runtime.py +++ b/python/valuecell/agents/common/trading/_internal/runtime.py @@ -39,7 +39,7 @@ async def _create_execution_gateway(request: UserRequest) -> BaseExecutionGatewa except Exception: # Log the error but continue - user might have set initial portfolio manually logger.exception( - "Failed to fetch exchange balance for LIVE mode. Will use configured initial portfolio instead." + "Failed to fetch exchange portfolio for LIVE mode. Will use configured initial portfolio instead." ) # Validate initial capital for LIVE mode diff --git a/python/valuecell/agents/common/trading/utils.py b/python/valuecell/agents/common/trading/utils.py index 2b9fce167..ec05c83a0 100644 --- a/python/valuecell/agents/common/trading/utils.py +++ b/python/valuecell/agents/common/trading/utils.py @@ -122,7 +122,10 @@ async def fetch_positions_from_gateway( logger.info("Fetching positions for LIVE trading mode") try: if not hasattr(execution_gateway, "fetch_positions"): - return {} + raise AttributeError( + f"Execution gateway {execution_gateway.__class__.__name__} " + "does not implement the required 'fetch_positions' method." + ) raw_positions = await execution_gateway.fetch_positions() except Exception as e: if retry_cnt < max_retries: @@ -136,10 +139,10 @@ async def fetch_positions_from_gateway( execution_gateway, retry_cnt + 1, max_retries ) logger.error( - f"Failed to fetch positions from exchange after {max_retries} retries, returning no positions", + f"Failed to fetch positions from exchange after {max_retries} retries.", exception=e, ) - return {} + raise e logger.debug(f"Raw positions response: {raw_positions}") positions = {}