diff --git a/python/valuecell/agents/strategy_agent/core.py b/python/valuecell/agents/strategy_agent/core.py index ad4253fe9..e7e7d3ba8 100644 --- a/python/valuecell/agents/strategy_agent/core.py +++ b/python/valuecell/agents/strategy_agent/core.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timezone -from typing import Callable, Dict, List +from typing import Callable, List from loguru import logger @@ -78,17 +78,6 @@ def _default_clock() -> datetime: return datetime.now(timezone.utc) -def _build_market_snapshot(features: List[FeatureVector]) -> Dict[str, float]: - """Derive latest market snapshot from feature vectors.""" - - snapshot: Dict[str, float] = {} - for vector in features: - price = vector.values.get("close") - if price is not None: - snapshot[vector.instrument.symbol] = float(price) - return snapshot - - class DefaultDecisionCoordinator(DecisionCoordinator): """Default implementation that wires the full decision pipeline.""" @@ -186,17 +175,27 @@ async def run_once(self) -> DecisionCycleResult: portfolio.buying_power = max(0.0, float(portfolio.cash)) # Use fixed 1-second interval and lookback of 3 minutes (60 * 3 seconds) - candles = await self._market_data_source.get_recent_candles( + candles_1s = await self._market_data_source.get_recent_candles( self._symbols, "1s", 60 * 3 ) - features = self._feature_computer.compute_features(candles=candles) - market_snapshot = _build_market_snapshot(features) + # Compute micro (1s) features with meta preserved + micro_features = self._feature_computer.compute_features(candles=candles_1s) + # Use fixed 1-minute interval and lookback of 4 hours (60 * 4 minutes) - candles = await self._market_data_source.get_recent_candles( + candles_1m = await self._market_data_source.get_recent_candles( self._symbols, "1m", 60 * 4 ) - features.extend(self._feature_computer.compute_features(candles=candles)) + minute_features = self._feature_computer.compute_features(candles=candles_1m) + + # Compose full features list: minute-level features (structural) then micro-level (freshness). + features = [] + features.extend(minute_features) + features.extend(micro_features) + # Ask the data source for an authoritative market snapshot (exchange-ticker based) + market_snapshot = await self._market_data_source.get_market_snapshot( + self._symbols + ) digest = self._digest_builder.build(list(self._history_records)) context = ComposeContext( @@ -514,6 +513,7 @@ def _build_summary( unrealized_pnl=self._unrealized_pnl, unrealized_pnl_pct=unrealized_pnl_pct, pnl_pct=pnl_pct, + total_value=equity, last_updated_ts=timestamp_ms, ) diff --git a/python/valuecell/agents/strategy_agent/data/interfaces.py b/python/valuecell/agents/strategy_agent/data/interfaces.py index 31ac1d406..ee2702a7e 100644 --- a/python/valuecell/agents/strategy_agent/data/interfaces.py +++ b/python/valuecell/agents/strategy_agent/data/interfaces.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from typing import List -from ..models import Candle +from ..models import Candle, MarketSnapShotType # Contracts for market data sources (module-local abstract interfaces). # These are plain ABCs (not Pydantic models) so implementations can be @@ -25,8 +25,20 @@ async def get_recent_candles( """Return recent candles (OHLCV) for the given symbols/interval. Args: - symbols: list of symbols (e.g., ["BTCUSDT", "ETHUSDT"]) + symbols: list of symbols (e.g., ["BTC/USDT", "ETH/USDT"]) interval: candle interval string (e.g., "1m", "5m") lookback: number of bars to retrieve """ raise NotImplementedError + + @abstractmethod + async def get_market_snapshot(self, symbols: List[str]) -> MarketSnapShotType: + """Return a lightweight market snapshot mapping symbol -> price. + + Implementations may call exchange endpoints (ticker, funding, open + interest) to build an authoritative latest-price mapping. The return + value should be a dict where keys are symbol strings and values are + latest price floats (or absent if not available). + """ + + raise NotImplementedError diff --git a/python/valuecell/agents/strategy_agent/data/market.py b/python/valuecell/agents/strategy_agent/data/market.py index 6fd90b7ee..546b30ec8 100644 --- a/python/valuecell/agents/strategy_agent/data/market.py +++ b/python/valuecell/agents/strategy_agent/data/market.py @@ -1,10 +1,10 @@ from collections import defaultdict from typing import Dict, List, Optional -import ccxt.pro as ccxtpro from loguru import logger -from ..models import Candle, InstrumentRef +from ..models import Candle, InstrumentRef, MarketSnapShotType +from ..utils import get_exchange_cls, normalize_symbol from .interfaces import MarketDataSource @@ -34,11 +34,7 @@ async def get_recent_candles( ) -> List[Candle]: async def _fetch(symbol: str) -> List[List]: # instantiate exchange class by name (e.g., ccxtpro.kraken) - exchange_cls = getattr(ccxtpro, self._exchange_id, None) - if exchange_cls is None: - raise RuntimeError( - f"Exchange '{self._exchange_id}' not found in ccxt.pro" - ) + exchange_cls = get_exchange_cls(self._exchange_id) exchange = exchange_cls({"newUpdates": False, **self._ccxt_options}) try: # ccxt.pro uses async fetch_ohlcv @@ -66,7 +62,7 @@ async def _fetch(symbol: str) -> List[List]: instrument=InstrumentRef( symbol=symbol, exchange_id=self._exchange_id, - quote_ccy="USD", + # quote_ccy="USD", ), open=float(open_v), high=float(high_v), @@ -83,3 +79,132 @@ async def _fetch(symbol: str) -> List[List]: self._exchange_id, ) return candles + + async def get_market_snapshot(self, symbols: List[str]) -> MarketSnapShotType: + """Fetch latest prices for the given symbols using exchange endpoints. + + The method tries to use the exchange's `fetch_ticker` (and optionally + `fetch_open_interest` / `fetch_funding_rate` when available) to build + a mapping symbol -> last price. On any failure for a symbol, it will + fall back to `base_prices` if provided or omit the symbol. + Example: + ``` + "BTC/USDT": { + "price": { + "symbol": "BTC/USDT:USDT", + "timestamp": 1762930517943, + "datetime": "2025-11-12T06:55:17.943Z", + "high": 105464.2, + "low": 102400.0, + "vwap": 103748.56, + "open": 105107.1, + "close": 103325.0, + "last": 103325.0, + "change": -1782.1, + "percentage": -1.696, + "average": 104216.0, + "baseVolume": 105445.427, + "quoteVolume": 10939811519.57, + "info": { + "symbol": "BTCUSDT", + "priceChange": "-1782.10", + "priceChangePercent": "-1.696", + "weightedAvgPrice": "103748.56", + "lastPrice": "103325.00", + "lastQty": "0.002", + "openPrice": "105107.10", + "highPrice": "105464.20", + "lowPrice": "102400.00", + "volume": "105445.427", + "quoteVolume": "10939811519.57", + "openTime": 1762844100000, + "closeTime": 1762930517943, + "firstId": 6852533393, + "lastId": 6856484055, + "count": 3942419 + } + }, + "open_interest": { + "symbol": "BTC/USDT:USDT", + "baseVolume": 85179.147, + "openInterestAmount": 85179.147, + "timestamp": 1762930517944, + "datetime": "2025-11-12T06:55:17.944Z", + "info": { + "symbol": "BTCUSDT", + "openInterest": "85179.147", + "time": 1762930517944 + } + }, + "funding_rate": { + "info": { + "symbol": "BTCUSDT", + "markPrice": "103325.10000000", + "indexPrice": "103382.54282609", + "estimatedSettlePrice": "103477.58650543", + "lastFundingRate": "0.00000967", + "interestRate": "0.00010000", + "nextFundingTime": 1762934400000, + "time": 1762930523000 + }, + "symbol": "BTC/USDT:USDT", + "markPrice": 103325.1, + "indexPrice": 103382.54282609, + "interestRate": 0.0001, + "estimatedSettlePrice": 103477.58650543, + "timestamp": 1762930523000, + "datetime": "2025-11-12T06:55:23.000Z", + "fundingRate": 9.67e-06, + "fundingTimestamp": 1762934400000, + "fundingDatetime": "2025-11-12T08:00:00.000Z" + } + } + ``` + """ + snapshot = defaultdict(dict) + + exchange_cls = get_exchange_cls(self._exchange_id) + exchange = exchange_cls({"newUpdates": False, **self._ccxt_options}) + try: + for symbol in symbols: + sym = normalize_symbol(symbol) + try: + ticker = await exchange.fetch_ticker(sym) + snapshot[symbol]["price"] = ticker + + # best-effort: warm other endpoints (open interest / funding) + try: + oi = await exchange.fetch_open_interest(sym) + snapshot[symbol]["open_interest"] = oi + except Exception: + logger.exception( + "Failed to fetch open interest for {} at {}", + symbol, + self._exchange_id, + ) + + try: + fr = await exchange.fetch_funding_rate(sym) + snapshot[symbol]["funding_rate"] = fr + except Exception: + logger.exception( + "Failed to fetch funding rate for {} at {}", + symbol, + self._exchange_id, + ) + except Exception: + logger.exception( + "Failed to fetch market snapshot for {} at {}", + symbol, + self._exchange_id, + ) + finally: + try: + await exchange.close() + except Exception: + logger.exception( + "Failed to close exchange connection for {}", + self._exchange_id, + ) + + return dict(snapshot) diff --git a/python/valuecell/agents/strategy_agent/decision/composer.py b/python/valuecell/agents/strategy_agent/decision/composer.py index cd2c32014..f1a443526 100644 --- a/python/valuecell/agents/strategy_agent/decision/composer.py +++ b/python/valuecell/agents/strategy_agent/decision/composer.py @@ -22,6 +22,7 @@ TradeSide, UserRequest, ) +from ..utils import extract_price_map from .interfaces import Composer from .system_prompt import SYSTEM_PROMPT @@ -76,196 +77,152 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]: # ------------------------------------------------------------------ # Prompt + LLM helpers - def _build_llm_prompt(self, context: ComposeContext) -> str: - """Serialize a concise, structured prompt for the LLM (low-noise). + @staticmethod + def _prune_none(obj): + """Recursively remove None, empty dict, and empty list values.""" + if isinstance(obj, dict): + pruned = { + k: LlmComposer._prune_none(v) for k, v in obj.items() if v is not None + } + return {k: v for k, v in pruned.items() if v not in (None, {}, [])} + if isinstance(obj, list): + pruned = [LlmComposer._prune_none(v) for v in obj] + return [v for v in pruned if v not in (None, {}, [])] + return obj + + def _compact_market_snapshot(self, snapshot: Dict) -> Dict: + """Extract decision-critical fields from market snapshot. - Design goals (inspired by the prompt doc): - - Keep only the most actionable state: prices, compact tech signals, positions, constraints - - Avoid verbose/raw dumps; drop nulls and unused fields - - Encourage risk-aware decisions and allow NOOP when no edge - - Preserve our output contract (LlmPlanProposal) + Reduces ~70% token usage while preserving key signals. """ + compact = {} + for symbol, data in snapshot.items(): + if not isinstance(data, dict): + continue + + entry = {} + # Price action + if price := data.get("price"): + if isinstance(price, dict): + entry["last"] = price.get("last") or price.get("close") + entry["change_pct"] = price.get("percentage") + entry["volume_24h"] = price.get("baseVolume") + + # Open interest + if oi := data.get("open_interest"): + if isinstance(oi, dict): + entry["open_interest"] = oi.get("openInterestAmount") or oi.get( + "baseVolume" + ) - # Helper: recursively drop keys with None values and empty dict/list - def _prune_none(obj): - if isinstance(obj, dict): - pruned = {k: _prune_none(v) for k, v in obj.items() if v is not None} - return {k: v for k, v in pruned.items() if v not in (None, {}, [])} - if isinstance(obj, list): - pruned = [_prune_none(v) for v in obj] - return [v for v in pruned if v not in (None, {}, [])] - return obj - - # Compact portfolio snapshot - pv = context.portfolio - positions = [] - for sym, snap in pv.positions.items(): - positions.append( - _prune_none( - { - "symbol": sym, - "qty": float(snap.quantity), - "avg_px": snap.avg_price, - "mark_px": snap.mark_price, - "unrealized_pnl": snap.unrealized_pnl, - "lev": snap.leverage, - "entry_ts": snap.entry_ts, - "type": getattr(snap, "trade_type", None), - } - ) - ) + # Funding rate + if fr := data.get("funding_rate"): + if isinstance(fr, dict): + entry["funding_rate"] = fr.get("fundingRate") + entry["mark_price"] = fr.get("markPrice") - # Constraints (only non-empty) - constraints = ( - pv.constraints.model_dump(mode="json", exclude_none=True) - if pv and pv.constraints - else {} - ) + if entry: + compact[symbol] = {k: v for k, v in entry.items() if v is not None} - # --- Summary & Risk Flags --- - # Aggregate win_rate across instruments (weighted by trade_count) - total_trades = 0 - weighted_win = 0.0 - for entry in (context.digest.by_instrument or {}).values(): - tc = int(getattr(entry, "trade_count", 0) or 0) - wr = getattr(entry, "win_rate", None) - if tc and wr is not None: - total_trades += tc - weighted_win += float(wr) * tc - agg_win_rate = (weighted_win / total_trades) if total_trades > 0 else None - - # Active positions - active_positions = sum( - 1 - for snap in pv.positions.values() - if abs(float(getattr(snap, "quantity", 0.0) or 0.0)) > 0.0 - ) + return compact - # Unrealized pnl pct relative to total_value (if available) - unrealized = getattr(pv, "total_unrealized_pnl", None) - total_value = getattr(pv, "total_value", None) - unrealized_pct = ( - (float(unrealized) / float(total_value) * 100.0) - if (unrealized is not None and total_value) - else None - ) + def _organize_features(self, features: List) -> Dict: + """Organize features by interval and remove redundant metadata. - # Buying power and leverage risk assessment - risk_flags: List[str] = [] - try: - equity, allowed_lev, constraints_typed, projected_gross, price_map2 = ( - self._init_buying_power_context(context) - ) - max_positions_cfg = constraints.get("max_positions") - if max_positions_cfg: - try: - if active_positions / float(max_positions_cfg) >= 0.8: - risk_flags.append("approaching_max_positions") - except Exception: - pass - - avail_bp = max( - 0.0, float(equity) * float(allowed_lev) - float(projected_gross) - ) - denom = ( - float(equity) * float(allowed_lev) if equity and allowed_lev else None - ) - if denom and denom > 0: - bp_ratio = avail_bp / denom - if bp_ratio <= 0.1: - risk_flags.append("low_buying_power") - - # High leverage usage check per-position against max_leverage - max_lev_cfg = constraints.get("max_leverage") - if max_lev_cfg: - try: - max_used_ratio = 0.0 - for snap in pv.positions.values(): - lev = getattr(snap, "leverage", None) - if lev is not None and float(max_lev_cfg) > 0: - max_used_ratio = max( - max_used_ratio, float(lev) / float(max_lev_cfg) - ) - if max_used_ratio >= 0.8: - risk_flags.append("high_leverage_usage") - except Exception: - pass - except Exception: - # If any issue computing context, skip risk flags additions silently - pass - - summary = _prune_none( - { - "active_positions": active_positions, - "max_positions": constraints.get("max_positions"), - "total_value": total_value, - "cash": pv.cash, - "unrealized_pnl": unrealized, - "unrealized_pnl_pct": unrealized_pct, - "win_rate": agg_win_rate, - "trade_count": total_trades, - # Include available buying power if computed - # This helps the model adjust aggressiveness - } - ) + Dynamically groups features by their interval (e.g., 1s, 1m, 5m, 15m). + """ + by_interval = {} + + for fv in features: + data = fv.model_dump(mode="json") + interval = data.get("meta", {}).get("interval", "") + + if not interval: + continue - # Digest (minimal useful stats) - digest_compact: Dict[str, dict] = {} - for sym, entry in (context.digest.by_instrument or {}).items(): - digest_compact[sym] = _prune_none( - { - "trade_count": entry.trade_count, - "realized_pnl": entry.realized_pnl, - "win_rate": entry.win_rate, - "avg_holding_ms": entry.avg_holding_ms, - "last_trade_ts": entry.last_trade_ts, + # Remove window timestamps (not useful for LLM) + if "meta" in data: + data["meta"] = { + "interval": interval, + "count": data["meta"].get("count"), } - ) - # Environment summary - env = _prune_none( + # Group by interval + if interval not in by_interval: + by_interval[interval] = [] + by_interval[interval].append(data) + + return by_interval + + def _build_summary(self, context: ComposeContext) -> Dict: + """Build portfolio summary with risk metrics.""" + pv = context.portfolio + + return { + "active_positions": sum( + 1 + for snap in pv.positions.values() + if abs(float(getattr(snap, "quantity", 0.0) or 0.0)) > 0.0 + ), + "total_value": getattr(pv, "total_value", None), + "cash": pv.cash, + "unrealized_pnl": getattr(pv, "total_unrealized_pnl", None), + "sharpe_ratio": getattr(context.digest, "sharpe_ratio", None), + } + + def _build_llm_prompt(self, context: ComposeContext) -> str: + """Build structured prompt for LLM decision-making. + + Produces a compact JSON with: + - summary: portfolio metrics + risk signals + - market: compacted price/OI/funding data + - features: organized by interval (1m structural, 1s realtime) + - portfolio: current positions + - digest: per-symbol historical performance + """ + pv = context.portfolio + + # Build components + summary = self._build_summary(context) + market = self._compact_market_snapshot(context.market_snapshot or {}) + features = self._organize_features(context.features) + + # Portfolio positions + positions = [ { - "exchange_id": self._request.exchange_config.exchange_id, - "trading_mode": str(self._request.exchange_config.trading_mode), - "max_leverage": constraints.get("max_leverage"), - "max_positions": constraints.get("max_positions"), + "symbol": sym, + "qty": float(snap.quantity), + "avg_px": snap.avg_price, + "unrealized_pnl": snap.unrealized_pnl, } - ) + for sym, snap in pv.positions.items() + if abs(float(snap.quantity)) > 0 + ] - # Preserve original feature structure (do not prune fields inside FeatureVector) - features_payload = [fv.model_dump(mode="json") for fv in context.features] + # Constraints + constraints = ( + pv.constraints.model_dump(mode="json", exclude_none=True) + if pv.constraints + else {} + ) - payload = _prune_none( + payload = self._prune_none( { "strategy_prompt": context.prompt_text, "summary": summary, - "risk_flags": risk_flags or None, - "env": env, - "compose_id": context.compose_id, - "ts": context.ts, - "market": context.market_snapshot, - "features": features_payload, - "portfolio": _prune_none( - { - "strategy_id": context.strategy_id, - "cash": pv.cash, - "total_value": getattr(pv, "total_value", None), - "total_unrealized_pnl": getattr( - pv, "total_unrealized_pnl", None - ), - "positions": positions, - } - ), + "market": market, + "features": features, + "positions": positions, "constraints": constraints, - "digest": digest_compact, } ) instructions = ( - "Per-cycle guidance: Read the Context JSON and form a concise plan. " - "If any arrays appear, they are ordered OLDEST → NEWEST (last = most recent). " - "Respect constraints, buying power, and risk_flags; prefer NOOP when edge is unclear. " - "Manage existing positions first; propose new exposure only with clear, trend-aligned confluence and within limits. Keep rationale brief." + "Read Context and decide. " + "features.1m = structural trends (240 periods), features.1s = realtime signals (180 periods). " + "market.funding_rate: positive = longs pay shorts. " + "Respect constraints and risk_flags. Prefer NOOP when edge unclear. " + "Output JSON with items array." ) return f"{instructions}\n\nContext:\n{json.dumps(payload, ensure_ascii=False)}" @@ -342,7 +299,7 @@ def _init_buying_power_context( ) # Initialize projected gross exposure - price_map = context.market_snapshot or {} + price_map = extract_price_map(context.market_snapshot or {}) if getattr(context.portfolio, "gross_exposure", None) is not None: projected_gross = float(context.portfolio.gross_exposure or 0.0) else: diff --git a/python/valuecell/agents/strategy_agent/decision/system_prompt.py b/python/valuecell/agents/strategy_agent/decision/system_prompt.py index 21e14b8b4..b32dc694b 100644 --- a/python/valuecell/agents/strategy_agent/decision/system_prompt.py +++ b/python/valuecell/agents/strategy_agent/decision/system_prompt.py @@ -20,7 +20,7 @@ CONSTRAINTS & VALIDATION - Respect max_positions, max_leverage, max_position_qty, quantity_step, min_trade_qty, max_order_qty, min_notional, and available buying power. - Keep leverage positive if provided. Confidence must be in [0,1]. -- If arrays appear in Context, they are ordered: OLDEST → NEWEST (last isthe most recent). +- If arrays appear in Context, they are ordered: OLDEST - NEWEST (last isthe most recent). - If risk_flags contain low_buying_power or high_leverage_usage, prefer reducing size or choosing noop. If approaching_max_positions is set, prioritize managing existing positions over opening new ones. - When estimating quantity, account for estimated fees (e.g., 1%) and potential market movement; reserve a small buffer so executed size does not exceed intended risk after fees/slippage. @@ -29,4 +29,43 @@ 2) Only propose new exposure when constraints and buying power allow. 3) Prefer fewer, higher-quality actions when signals are mixed. 4) When in doubt or edge is weak, choose noop. + +MARKET SNAPSHOT +The `market_snapshot` provided in the Context is an authoritative, per-cycle reference issued by the data source. It is a mapping of symbol -> object with lightweight numeric fields (when available): + +- `price`: a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market +- `open_interest`: open interest value (float) when available from the exchange (contracts or quote-ccy depending on exchange). Use it as a signal for liquidity and positioning interest, but treat units as exchange-specific. +- `funding_rate`: latest funding rate (decimal, e.g., 0.0001) when available. Use it to reason about carry costs for leveraged positions. + +PERFORMANCE FEEDBACK & ADAPTIVE BEHAVIOR +You will receive a Sharpe Ratio at each invocation (in Context.summary.sharpe_ratio): + +Sharpe Ratio = (Average Return - Risk-Free Rate) / Standard Deviation of Returns + +Interpretation: +- < 0: Losing money on average (net negative after risk adjustment) +- 0 to 1: Positive returns but high volatility relative to gains +- 1 to 2: Good risk-adjusted performance +- > 2: Excellent risk-adjusted performance + +Behavioral Guidelines Based on Sharpe Ratio: +- Sharpe < -0.5: + - STOP trading immediately. Choose noop for at least 6 cycles (18+ minutes). + - Reflect deeply: Are you overtrading (>2 trades/hour)? Exiting too early (<30min hold)? Using weak signals (confidence <75)? + +- Sharpe -0.5 to 0: + - Tighten entry criteria: only trade when confidence >80. + - Reduce frequency: max 1 new position per hour. + - Hold positions longer: aim for 30+ minute hold times before considering exit. + +- Sharpe 0 to 0.7: + - Maintain current discipline. Do not overtrade. + +- Sharpe > 0.7: + - Current strategy is working well. Maintain discipline and consider modest size increases + within constraints. + +Key Insight: Sharpe Ratio naturally penalizes overtrading and premature exits. +High-frequency, small P&L trades increase volatility without proportional return gains, +directly harming your Sharpe. Patience and selectivity are rewarded. """ diff --git a/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py b/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py index aad64fdbb..7542cf598 100644 --- a/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py +++ b/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py @@ -16,6 +16,7 @@ from loguru import logger from ..models import ( + MarketSnapShotType, PriceMode, TradeInstruction, TradeSide, @@ -348,7 +349,7 @@ async def _check_minimums( async def execute( self, instructions: List[TradeInstruction], - market_snapshot: Optional[Dict[str, float]] = None, + market_snapshot: Optional[MarketSnapShotType] = None, ) -> List[TxResult]: """Execute trade instructions on the real exchange via CCXT. diff --git a/python/valuecell/agents/strategy_agent/execution/interfaces.py b/python/valuecell/agents/strategy_agent/execution/interfaces.py index ce665372b..706a51f57 100644 --- a/python/valuecell/agents/strategy_agent/execution/interfaces.py +++ b/python/valuecell/agents/strategy_agent/execution/interfaces.py @@ -1,9 +1,9 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Dict, List, Optional +from typing import List, Optional -from ..models import TradeInstruction, TxResult +from ..models import MarketSnapShotType, TradeInstruction, TxResult # Contracts for execution gateways (module-local abstract interfaces). # An implementation may route to a real exchange or a paper broker. @@ -16,7 +16,7 @@ class ExecutionGateway(ABC): async def execute( self, instructions: List[TradeInstruction], - market_snapshot: Optional[Dict[str, float]] = None, + market_snapshot: Optional[MarketSnapShotType] = None, ) -> List[TxResult]: """Execute the provided instructions and return TxResult items. diff --git a/python/valuecell/agents/strategy_agent/execution/paper_trading.py b/python/valuecell/agents/strategy_agent/execution/paper_trading.py index a6dfbcec9..f2a26d33b 100644 --- a/python/valuecell/agents/strategy_agent/execution/paper_trading.py +++ b/python/valuecell/agents/strategy_agent/execution/paper_trading.py @@ -1,6 +1,7 @@ -from typing import Dict, List, Optional +from typing import List, Optional -from ..models import TradeInstruction, TradeSide, TxResult +from ..models import MarketSnapShotType, TradeInstruction, TradeSide, TxResult +from ..utils import extract_price_map from .interfaces import ExecutionGateway @@ -19,10 +20,10 @@ def __init__(self, fee_bps: float = 10.0) -> None: async def execute( self, instructions: List[TradeInstruction], - market_snapshot: Optional[Dict[str, float]] = None, + market_snapshot: Optional[MarketSnapShotType] = None, ) -> List[TxResult]: results: List[TxResult] = [] - price_map = market_snapshot or {} + price_map = extract_price_map(market_snapshot or {}) for inst in instructions: self.executed.append(inst) ref_price = float(price_map.get(inst.instrument.symbol, 0.0) or 0.0) diff --git a/python/valuecell/agents/strategy_agent/models.py b/python/valuecell/agents/strategy_agent/models.py index 0edc5a295..b59d71205 100644 --- a/python/valuecell/agents/strategy_agent/models.py +++ b/python/valuecell/agents/strategy_agent/models.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field, field_validator, model_validator @@ -289,9 +289,9 @@ class InstrumentRef(BaseModel): exchange_id: Optional[str] = Field( default=None, description="exchange identifier (e.g., binance)" ) - quote_ccy: Optional[str] = Field( - default=None, description="Quote currency (e.g., USDT)" - ) + # quote_ccy: Optional[str] = Field( + # default=None, description="Quote currency (e.g., USDT)" + # ) class Candle(BaseModel): @@ -573,6 +573,9 @@ class PortfolioValueSeries(BaseModel): points: List[MetricPoint] = Field(default_factory=list) +MarketSnapShotType = Dict[str, Dict[str, Any]] + + class ComposeContext(BaseModel): """Context assembled for the LLM-driven composer.""" @@ -589,7 +592,7 @@ class ComposeContext(BaseModel): portfolio: PortfolioView digest: "TradeDigest" prompt_text: str = Field(..., description="Strategy/style prompt text") - market_snapshot: Optional[Dict[str, float]] = Field( + market_snapshot: MarketSnapShotType = Field( default=None, description="Optional map symbol -> current reference price" ) @@ -664,6 +667,15 @@ class TradeDigest(BaseModel): ts: int by_instrument: Dict[str, TradeDigestEntry] = Field(default_factory=dict) + sharpe_ratio: Optional[float] = Field( + default=None, + description=( + "Sharpe Ratio computed from recent equity curve. " + "Formula: (avg_return - risk_free_rate) / std_dev_returns. " + "Interpretation: <0 losing; 0-1 positive but volatile; " + "1-2 good; >2 excellent risk-adjusted performance." + ), + ) class StrategySummary(BaseModel): @@ -693,6 +705,10 @@ class StrategySummary(BaseModel): unrealized_pnl_pct: Optional[float] = Field( default=None, description="Unrealized P&L as a percent of position value" ) + total_value: Optional[float] = Field( + default=None, + description="Total portfolio value (equity) including cash and positions", + ) last_updated_ts: Optional[int] = Field(default=None) diff --git a/python/valuecell/agents/strategy_agent/portfolio/in_memory.py b/python/valuecell/agents/strategy_agent/portfolio/in_memory.py index cdc87ca39..d085cf06b 100644 --- a/python/valuecell/agents/strategy_agent/portfolio/in_memory.py +++ b/python/valuecell/agents/strategy_agent/portfolio/in_memory.py @@ -1,8 +1,9 @@ from datetime import datetime, timezone -from typing import Dict, List, Optional +from typing import List, Optional from ..models import ( Constraints, + MarketSnapShotType, MarketType, PortfolioView, PositionSnapshot, @@ -11,6 +12,7 @@ TradeType, TradingMode, ) +from ..utils import extract_price_map from .interfaces import PortfolioService @@ -64,7 +66,7 @@ def get_view(self) -> PortfolioView: return self._view def apply_trades( - self, trades: List[TradeHistoryEntry], market_snapshot: Dict[str, float] + self, trades: List[TradeHistoryEntry], market_snapshot: MarketSnapShotType ) -> None: """Apply trades and update portfolio positions and aggregates. @@ -75,9 +77,12 @@ def apply_trades( backward compatibility) - portfolio aggregates: gross_exposure, net_exposure, total_value (equity), total_unrealized_pnl, buying_power """ + # Extract price map from new market snapshot structure + price_map = extract_price_map(market_snapshot) + for trade in trades: symbol = trade.instrument.symbol - price = float(trade.entry_price or market_snapshot.get(symbol, 0.0) or 0.0) + price = float(trade.entry_price or price_map.get(symbol, 0.0) or 0.0) delta = float(trade.quantity or 0.0) quantity_delta = delta if trade.side == TradeSide.BUY else -delta @@ -199,8 +204,8 @@ def apply_trades( sym = pos.instrument.symbol except Exception: sym = None - if sym and sym in market_snapshot: - snap_px = float(market_snapshot.get(sym) or 0.0) + if sym and sym in price_map: + snap_px = float(price_map.get(sym) or 0.0) if snap_px > 0: pos.mark_price = snap_px diff --git a/python/valuecell/agents/strategy_agent/portfolio/interfaces.py b/python/valuecell/agents/strategy_agent/portfolio/interfaces.py index e08360a3b..fdb4dc1cf 100644 --- a/python/valuecell/agents/strategy_agent/portfolio/interfaces.py +++ b/python/valuecell/agents/strategy_agent/portfolio/interfaces.py @@ -1,9 +1,9 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Dict, List, Optional +from typing import List, Optional -from ..models import PortfolioView, TradeHistoryEntry +from ..models import MarketSnapShotType, PortfolioView, TradeHistoryEntry class PortfolioService(ABC): @@ -18,7 +18,7 @@ def get_view(self) -> PortfolioView: raise NotImplementedError def apply_trades( - self, trades: List[TradeHistoryEntry], market_snapshot: Dict[str, float] + self, trades: List[TradeHistoryEntry], market_snapshot: MarketSnapShotType ) -> None: """Apply executed trades to the portfolio view (optional). diff --git a/python/valuecell/agents/strategy_agent/trading_history/digest.py b/python/valuecell/agents/strategy_agent/trading_history/digest.py index fec26f689..525183953 100644 --- a/python/valuecell/agents/strategy_agent/trading_history/digest.py +++ b/python/valuecell/agents/strategy_agent/trading_history/digest.py @@ -1,9 +1,17 @@ from datetime import datetime, timezone from typing import Dict, List +import numpy as np + from ..models import HistoryRecord, InstrumentRef, TradeDigest, TradeDigestEntry from .interfaces import DigestBuilder +# Risk-free rate for Sharpe Ratio calculation (annualized, 3% for this example) +RISK_FREE_RATE = 0.03 + +# Number of seconds per year (365 days * 24 hours * 3600 seconds/hour) +SECONDS_PER_YEAR = 365 * 24 * 3600 + class RollingDigestBuilder(DigestBuilder): """Builds a lightweight digest from recent execution records.""" @@ -135,4 +143,94 @@ def build(self, records: List[HistoryRecord]) -> TradeDigest: except Exception: entry.avg_holding_ms = None - return TradeDigest(ts=timestamp, by_instrument=by_instrument) + # Calculate Sharpe Ratio from equity curve + sharpe_ratio = self._calculate_sharpe_ratio(recent) + + return TradeDigest( + ts=timestamp, by_instrument=by_instrument, sharpe_ratio=sharpe_ratio + ) + + def _calculate_sharpe_ratio(self, records: List[HistoryRecord]) -> float | None: + """Calculate Sharpe Ratio from equity curve in history records. + + Extracts portfolio equity from compose records and computes risk-adjusted + return as: (mean_return - risk_free_rate) / std_dev_returns. + + Args: + records: Recent history records (should include compose records) + + Returns: + Sharpe Ratio (float) or None if insufficient data + """ + if len(records) < 2: + return None + + # Extract equity values and timestamps from compose records + equities: List[float] = [] + timestamps: List[int] = [] + for record in records: + if record.kind == "compose": + payload = record.payload or {} + summary = payload.get("summary") or {} + # StrategySummary may have total_value in different representations + # Try to extract equity (total portfolio value) + equity = None + # Attempt 1: summary is already a dict with total_value + if isinstance(summary, dict): + equity = summary.get("total_value") + # Attempt 2: summary might be serialized; check for equity-like fields + if equity is None and isinstance(summary, dict): + # Fallback: try to compute from realized + unrealized + initial + # For now, we'll rely on total_value being present + pass + if equity is not None: + try: + eq_val = float(equity) + if eq_val > 0: + equities.append(eq_val) + timestamps.append(record.ts) + except (ValueError, TypeError): + pass + + if len(equities) < 2 or len(timestamps) < 2: + return None + + # Calculate average period in seconds + intervals = [] + for i in range(1, len(timestamps)): + interval = ( + timestamps[i] - timestamps[i - 1] + ) / 1000.0 # Convert ms to seconds + if interval > 0: + intervals.append(interval) + if not intervals: + return None + avg_period_seconds = sum(intervals) / len(intervals) + + # Calculate periods per year + periods_per_year = SECONDS_PER_YEAR / avg_period_seconds + + # Calculate period returns + returns: List[float] = [] + for i in range(1, len(equities)): + if equities[i - 1] > 0: + period_return = (equities[i] - equities[i - 1]) / equities[i - 1] + returns.append(period_return) + + if len(returns) < 2: + return None + + # Compute mean and standard deviation + returns_arr = np.array(returns) + mean_return = float(np.mean(returns_arr)) + std_return = float(np.std(returns_arr, ddof=1)) # Sample std deviation + + # Sharpe Ratio + if std_return > 0: + # Adjust risk-free rate per period based on actual check interval + period_rf = RISK_FREE_RATE / periods_per_year + sharpe_ratio = (mean_return - period_rf) / std_return + return float(sharpe_ratio) + + # If std is zero, no volatility -> undefined Sharpe + return None diff --git a/python/valuecell/agents/strategy_agent/utils.py b/python/valuecell/agents/strategy_agent/utils.py new file mode 100644 index 000000000..dee41a322 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/utils.py @@ -0,0 +1,71 @@ +from typing import Dict + +import ccxt.pro as ccxtpro +from loguru import logger + + +def extract_price_map(market_snapshot: Dict) -> Dict[str, float]: + """Extract a simple symbol -> price mapping from market snapshot structure. + + The market snapshot structure is: + { + "BTC/USDT:USDT": { + "price": {ticker dict with "last", "close", etc.}, + "open_interest": {...}, + "funding_rate": {...} + } + } + + Returns: + Dict[symbol, last_price] for internal use in quantity normalization. + """ + price_map: Dict[str, float] = {} + for symbol, data in market_snapshot.items(): + if not isinstance(data, dict): + continue + price_obj = data.get("price") + if isinstance(price_obj, dict): + # Prefer "last" over "close" for real-time pricing + last_price = price_obj.get("last") or price_obj.get("close") + if last_price is not None: + try: + price_map[symbol] = float(last_price) + except (ValueError, TypeError): + logger.warning( + "Failed to parse price for {}: {}", symbol, last_price + ) + return price_map + + +def normalize_symbol(symbol: str) -> str: + """Normalize symbol format for CCXT. + + Examples: + BTC-USD -> BTC/USD:USD (spot) + BTC-USDT -> BTC/USDT:USDT (USDT futures on colon exchanges) + ETH-USD -> ETH/USD:USD (USD futures on colon exchanges) + + Args: + symbol: Symbol in format 'BTC-USD', 'BTC-USDT', etc. + + Returns: + Normalized CCXT symbol + """ + # Replace dash with slash + base_symbol = symbol.replace("-", "/") + + if ":" not in base_symbol: + parts = base_symbol.split("/") + if len(parts) == 2: + base_symbol = f"{parts[0]}/{parts[1]}:{parts[1]}" + + return base_symbol + + +def get_exchange_cls(exchange_id: str): + """Get CCXT exchange class by exchange ID.""" + + exchange_cls = getattr(ccxtpro, exchange_id, None) + if exchange_cls is None: + raise RuntimeError(f"Exchange '{exchange_id}' not found in ccxt.pro") + return exchange_cls diff --git a/python/valuecell/server/api/routers/strategy_agent.py b/python/valuecell/server/api/routers/strategy_agent.py index a6411036e..480cf9ae8 100644 --- a/python/valuecell/server/api/routers/strategy_agent.py +++ b/python/valuecell/server/api/routers/strategy_agent.py @@ -80,12 +80,12 @@ async def create_strategy_agent( if content: user_request.trading_config.prompt_text = content logger.info( - "Resolved prompt_id=%s to prompt_text for strategy creation", + "Resolved prompt_id={} to prompt_text for strategy creation", prompt_id, ) except Exception: logger.exception( - "Failed to load prompt for prompt_id=%s; continuing without resolved prompt", + "Failed to load prompt for prompt_id={}; continuing without resolved prompt", prompt_id, ) except Exception: