diff --git a/python/valuecell/agents/strategy_agent/core.py b/python/valuecell/agents/strategy_agent/core.py index 458e1f334..ad4253fe9 100644 --- a/python/valuecell/agents/strategy_agent/core.py +++ b/python/valuecell/agents/strategy_agent/core.py @@ -184,12 +184,19 @@ async def run_once(self) -> DecisionCycleResult: if self._request.exchange_config.trading_mode == TradingMode.VIRTUAL: if self._request.exchange_config.market_type == MarketType.SPOT: portfolio.buying_power = max(0.0, float(portfolio.cash)) - # Use fixed 1-minute interval and lookback of 4 hours (60 * 4 minutes) + + # Use fixed 1-second interval and lookback of 3 minutes (60 * 3 seconds) candles = await self._market_data_source.get_recent_candles( - self._symbols, "1m", 60 * 4 + self._symbols, "1s", 60 * 3 ) features = self._feature_computer.compute_features(candles=candles) market_snapshot = _build_market_snapshot(features) + # Use fixed 1-minute interval and lookback of 4 hours (60 * 4 minutes) + candles = await self._market_data_source.get_recent_candles( + self._symbols, "1m", 60 * 4 + ) + features.extend(self._feature_computer.compute_features(candles=candles)) + digest = self._digest_builder.build(list(self._history_records)) context = ComposeContext( diff --git a/python/valuecell/agents/strategy_agent/decision/composer.py b/python/valuecell/agents/strategy_agent/decision/composer.py index 774dbbb69..cd2c32014 100644 --- a/python/valuecell/agents/strategy_agent/decision/composer.py +++ b/python/valuecell/agents/strategy_agent/decision/composer.py @@ -4,9 +4,13 @@ import math from typing import Dict, List, Optional +from agno.agent import Agent as AgnoAgent from loguru import logger from pydantic import ValidationError +from valuecell.utils import env as env_utils +from valuecell.utils import model as model_utils + from ..models import ( ComposeContext, Constraints, @@ -19,6 +23,7 @@ UserRequest, ) from .interfaces import Composer +from .system_prompt import SYSTEM_PROMPT class LlmComposer(Composer): @@ -50,16 +55,13 @@ def __init__( async def compose(self, context: ComposeContext) -> List[TradeInstruction]: prompt = self._build_llm_prompt(context) - logger.debug( - "Built LLM prompt for compose_id={}: {}", - context.compose_id, - prompt, - ) try: plan = await self._call_llm(prompt) if not plan.items: - logger.error( - "LLM returned empty plan for compose_id={}", context.compose_id + logger.info( + "LLM returned empty plan for compose_id={} with rationale={}", + context.compose_id, + plan.rationale, ) return [] except ValidationError as exc: @@ -75,32 +77,198 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]: # Prompt + LLM helpers def _build_llm_prompt(self, context: ComposeContext) -> str: - """Serialize compose context into a textual prompt for the LLM.""" - - payload = { - "strategy_prompt": context.prompt_text, - "compose_id": context.compose_id, - "timestamp": context.ts, - "portfolio": context.portfolio.model_dump(mode="json"), - "market_snapshot": context.market_snapshot or {}, - "digest": context.digest.model_dump(mode="json"), - "features": [vector.model_dump(mode="json") for vector in context.features], - # Constraints live on the portfolio view; prefer typed model_dump when present - "constraints": ( - context.portfolio.constraints.model_dump(mode="json", exclude_none=True) - if context.portfolio and context.portfolio.constraints - else {} - ), - } + """Serialize a concise, structured prompt for the LLM (low-noise). + + Design goals (inspired by the prompt doc): + - Keep only the most actionable state: prices, compact tech signals, positions, constraints + - Avoid verbose/raw dumps; drop nulls and unused fields + - Encourage risk-aware decisions and allow NOOP when no edge + - Preserve our output contract (LlmPlanProposal) + """ + + # Helper: recursively drop keys with None values and empty dict/list + def _prune_none(obj): + if isinstance(obj, dict): + pruned = {k: _prune_none(v) for k, v in obj.items() if v is not None} + return {k: v for k, v in pruned.items() if v not in (None, {}, [])} + if isinstance(obj, list): + pruned = [_prune_none(v) for v in obj] + return [v for v in pruned if v not in (None, {}, [])] + return obj + + # Compact portfolio snapshot + pv = context.portfolio + positions = [] + for sym, snap in pv.positions.items(): + positions.append( + _prune_none( + { + "symbol": sym, + "qty": float(snap.quantity), + "avg_px": snap.avg_price, + "mark_px": snap.mark_price, + "unrealized_pnl": snap.unrealized_pnl, + "lev": snap.leverage, + "entry_ts": snap.entry_ts, + "type": getattr(snap, "trade_type", None), + } + ) + ) + + # Constraints (only non-empty) + constraints = ( + pv.constraints.model_dump(mode="json", exclude_none=True) + if pv and pv.constraints + else {} + ) + + # --- Summary & Risk Flags --- + # Aggregate win_rate across instruments (weighted by trade_count) + total_trades = 0 + weighted_win = 0.0 + for entry in (context.digest.by_instrument or {}).values(): + tc = int(getattr(entry, "trade_count", 0) or 0) + wr = getattr(entry, "win_rate", None) + if tc and wr is not None: + total_trades += tc + weighted_win += float(wr) * tc + agg_win_rate = (weighted_win / total_trades) if total_trades > 0 else None + + # Active positions + active_positions = sum( + 1 + for snap in pv.positions.values() + if abs(float(getattr(snap, "quantity", 0.0) or 0.0)) > 0.0 + ) + + # Unrealized pnl pct relative to total_value (if available) + unrealized = getattr(pv, "total_unrealized_pnl", None) + total_value = getattr(pv, "total_value", None) + unrealized_pct = ( + (float(unrealized) / float(total_value) * 100.0) + if (unrealized is not None and total_value) + else None + ) + + # Buying power and leverage risk assessment + risk_flags: List[str] = [] + try: + equity, allowed_lev, constraints_typed, projected_gross, price_map2 = ( + self._init_buying_power_context(context) + ) + max_positions_cfg = constraints.get("max_positions") + if max_positions_cfg: + try: + if active_positions / float(max_positions_cfg) >= 0.8: + risk_flags.append("approaching_max_positions") + except Exception: + pass + + avail_bp = max( + 0.0, float(equity) * float(allowed_lev) - float(projected_gross) + ) + denom = ( + float(equity) * float(allowed_lev) if equity and allowed_lev else None + ) + if denom and denom > 0: + bp_ratio = avail_bp / denom + if bp_ratio <= 0.1: + risk_flags.append("low_buying_power") + + # High leverage usage check per-position against max_leverage + max_lev_cfg = constraints.get("max_leverage") + if max_lev_cfg: + try: + max_used_ratio = 0.0 + for snap in pv.positions.values(): + lev = getattr(snap, "leverage", None) + if lev is not None and float(max_lev_cfg) > 0: + max_used_ratio = max( + max_used_ratio, float(lev) / float(max_lev_cfg) + ) + if max_used_ratio >= 0.8: + risk_flags.append("high_leverage_usage") + except Exception: + pass + except Exception: + # If any issue computing context, skip risk flags additions silently + pass + + summary = _prune_none( + { + "active_positions": active_positions, + "max_positions": constraints.get("max_positions"), + "total_value": total_value, + "cash": pv.cash, + "unrealized_pnl": unrealized, + "unrealized_pnl_pct": unrealized_pct, + "win_rate": agg_win_rate, + "trade_count": total_trades, + # Include available buying power if computed + # This helps the model adjust aggressiveness + } + ) + + # Digest (minimal useful stats) + digest_compact: Dict[str, dict] = {} + for sym, entry in (context.digest.by_instrument or {}).items(): + digest_compact[sym] = _prune_none( + { + "trade_count": entry.trade_count, + "realized_pnl": entry.realized_pnl, + "win_rate": entry.win_rate, + "avg_holding_ms": entry.avg_holding_ms, + "last_trade_ts": entry.last_trade_ts, + } + ) + + # Environment summary + env = _prune_none( + { + "exchange_id": self._request.exchange_config.exchange_id, + "trading_mode": str(self._request.exchange_config.trading_mode), + "max_leverage": constraints.get("max_leverage"), + "max_positions": constraints.get("max_positions"), + } + ) + + # Preserve original feature structure (do not prune fields inside FeatureVector) + features_payload = [fv.model_dump(mode="json") for fv in context.features] + + payload = _prune_none( + { + "strategy_prompt": context.prompt_text, + "summary": summary, + "risk_flags": risk_flags or None, + "env": env, + "compose_id": context.compose_id, + "ts": context.ts, + "market": context.market_snapshot, + "features": features_payload, + "portfolio": _prune_none( + { + "strategy_id": context.strategy_id, + "cash": pv.cash, + "total_value": getattr(pv, "total_value", None), + "total_unrealized_pnl": getattr( + pv, "total_unrealized_pnl", None + ), + "positions": positions, + } + ), + "constraints": constraints, + "digest": digest_compact, + } + ) instructions = ( - "You are a trading strategy planner. Analyze the JSON context and " - "produce a structured plan that aligns with the LlmPlanProposal " - "schema (items array with instrument, action, target_qty, rationale, " - "confidence). Focus on risk-aware, executable decisions." + "Per-cycle guidance: Read the Context JSON and form a concise plan. " + "If any arrays appear, they are ordered OLDEST → NEWEST (last = most recent). " + "Respect constraints, buying power, and risk_flags; prefer NOOP when edge is unclear. " + "Manage existing positions first; propose new exposure only with clear, trend-aligned confluence and within limits. Keep rationale brief." ) - return f"{instructions}\n\nContext:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" + return f"{instructions}\n\nContext:\n{json.dumps(payload, ensure_ascii=False)}" async def _call_llm(self, prompt: str) -> LlmPlanProposal: """Invoke an LLM asynchronously and parse the response into LlmPlanProposal. @@ -112,19 +280,22 @@ async def _call_llm(self, prompt: str) -> LlmPlanProposal: `LlmPlanProposal`. """ - from agno.agent import Agent as AgnoAgent - - from valuecell.utils.model import create_model_with_provider - cfg = self._request.llm_model_config - model = create_model_with_provider( + model = model_utils.create_model_with_provider( provider=cfg.provider, model_id=cfg.model_id, api_key=cfg.api_key, ) # Wrap model in an Agent (consistent with parser_agent usage) - agent = AgnoAgent(model=model, output_schema=LlmPlanProposal, markdown=False) + agent = AgnoAgent( + model=model, + output_schema=LlmPlanProposal, + markdown=False, + instructions=[SYSTEM_PROMPT], + use_json_mode=model_utils.model_should_use_json_mode(model), + debug_mode=env_utils.agent_debug_mode_enabled(), + ) response = await agent.arun(prompt) content = getattr(response, "content", None) or response logger.debug("Received LLM response {}", content) diff --git a/python/valuecell/agents/strategy_agent/decision/system_prompt.py b/python/valuecell/agents/strategy_agent/decision/system_prompt.py new file mode 100644 index 000000000..21e14b8b4 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/decision/system_prompt.py @@ -0,0 +1,32 @@ +"""System prompt for the Strategy Agent LLM planner. + +This prompt captures ONLY the agent's role, IO contract (schema), and +responsibilities around constraints and validation. Trading style and +heuristics live in strategy templates (e.g., templates/default.txt). + +It is passed to the LLM wrapper as a system/instruction message, while the +per-cycle JSON Context is provided as the user message by the composer. +""" + +SYSTEM_PROMPT: str = """ +ROLE & IDENTITY +You are an autonomous trading planner that outputs a structured plan for a crypto strategy executor. Your objective is to maximize risk-adjusted returns while preserving capital. You are stateless across cycles. + +ACTION SEMANTICS +- target_qty is the desired FINAL signed position quantity: >0 long, <0 short, 0 flat (close). The executor computes delta = target_qty − current_qty to create orders. +- To close, set target_qty to 0. Do not invent other action names. +- One item per symbol at most. No hedging (never propose both long and short exposure on the same symbol). + +CONSTRAINTS & VALIDATION +- Respect max_positions, max_leverage, max_position_qty, quantity_step, min_trade_qty, max_order_qty, min_notional, and available buying power. +- Keep leverage positive if provided. Confidence must be in [0,1]. +- If arrays appear in Context, they are ordered: OLDEST → NEWEST (last isthe most recent). +- If risk_flags contain low_buying_power or high_leverage_usage, prefer reducing size or choosing noop. If approaching_max_positions is set, prioritize managing existing positions over opening new ones. +- When estimating quantity, account for estimated fees (e.g., 1%) and potential market movement; reserve a small buffer so executed size does not exceed intended risk after fees/slippage. + +DECISION FRAMEWORK +1) Manage current positions first (reduce risk, close invalidated trades). +2) Only propose new exposure when constraints and buying power allow. +3) Prefer fewer, higher-quality actions when signals are mixed. +4) When in doubt or edge is weak, choose noop. +""" diff --git a/python/valuecell/agents/strategy_agent/features/interfaces.py b/python/valuecell/agents/strategy_agent/features/interfaces.py index 5ef4bc4bd..1702c3d4e 100644 --- a/python/valuecell/agents/strategy_agent/features/interfaces.py +++ b/python/valuecell/agents/strategy_agent/features/interfaces.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import List, Optional +from typing import Any, Dict, List, Optional from ..models import Candle, FeatureVector @@ -20,11 +20,15 @@ class FeatureComputer(ABC): def compute_features( self, candles: Optional[List[Candle]] = None, + meta: Optional[Dict[str, Any]] = None, ) -> List[FeatureVector]: """Build feature vectors from the given inputs. Args: candles: optional window of candles + meta: optional metadata about the input window (e.g., interval, + window_start_ts, window_end_ts, num_points). Implementations may + use this to populate FeatureVector.meta. Returns: A list of FeatureVector items, one or more per instrument. """ diff --git a/python/valuecell/agents/strategy_agent/features/simple.py b/python/valuecell/agents/strategy_agent/features/simple.py index d3b07a03f..ce2b90833 100644 --- a/python/valuecell/agents/strategy_agent/features/simple.py +++ b/python/valuecell/agents/strategy_agent/features/simple.py @@ -12,7 +12,9 @@ class SimpleFeatureComputer(FeatureComputer): """Computes basic momentum and volume features.""" def compute_features( - self, candles: Optional[List[Candle]] = None + self, + candles: Optional[List[Candle]] = None, + meta: Optional[Dict[str, object]] = None, ) -> List[FeatureVector]: if not candles: return [] @@ -127,13 +129,26 @@ def compute_features( ), } + # Build feature meta + window_start_ts = int(rows[0]["ts"]) if rows else int(last["ts"]) + window_end_ts = int(last["ts"]) + fv_meta = { + "interval": series[-1].interval, + "count": len(series), + "window_start_ts": window_start_ts, + "window_end_ts": window_end_ts, + } + if meta: + # Merge provided meta (doesn't overwrite core keys unless intended) + for k, v in meta.items(): + fv_meta.setdefault(k, v) + features.append( FeatureVector( ts=int(last["ts"]), instrument=series[-1].instrument, values=values, - meta={"interval": series[-1].interval, "count": len(series)}, + meta=fv_meta, ) ) - return features diff --git a/python/valuecell/agents/strategy_agent/models.py b/python/valuecell/agents/strategy_agent/models.py index 5c36bb7b3..0edc5a295 100644 --- a/python/valuecell/agents/strategy_agent/models.py +++ b/python/valuecell/agents/strategy_agent/models.py @@ -316,7 +316,13 @@ class FeatureVector(BaseModel): default_factory=dict, description="Feature name to numeric value" ) meta: Optional[Dict[str, float | int | str]] = Field( - default=None, description="Optional metadata (e.g., window lengths)" + default=None, + description=( + "Optional metadata about the source window: keys MAY include interval, " + "window_start_ts, window_end_ts (ms), count/num_points, and any feature " + "family identifiers. Feature computers SHOULD populate these so downstream " + "components can reason about freshness and coverage." + ), ) @@ -475,6 +481,9 @@ class LlmPlanProposal(BaseModel): ts: int items: List[LlmDecisionItem] = Field(default_factory=list) + rationale: Optional[str] = Field( + default=None, description="Optional natural language rationale" + ) class PriceMode(str, Enum): diff --git a/python/valuecell/agents/strategy_agent/templates/default.txt b/python/valuecell/agents/strategy_agent/templates/default.txt index 57b5fb4f8..26bac7307 100644 --- a/python/valuecell/agents/strategy_agent/templates/default.txt +++ b/python/valuecell/agents/strategy_agent/templates/default.txt @@ -1,63 +1,58 @@ +Goal +Produce steady, risk‑aware crypto trading decisions that aim for consistent small gains while protecting capital. -Goal: -Produce steady, risk-aware crypto trading decisions that aim for consistent small gains while protecting capital. - -Style & constraints: -- Focus on liquid major symbols (e.g., BTC-USD, ETH-USD). Avoid low-liquidity altcoins. -- Use conservative position sizing: target at most 1-2% of portfolio NAV per new trade (respecting `cap_factor`). -- Limit concurrent open positions to moderate number (use strategy config `max_positions`). +Style & constraints +- Focus on liquid majors (e.g., BTC-USD, ETH-USD). Avoid low-liquidity altcoins. +- Size conservatively: target at most 1–2% of NAV per new trade (respect cap_factor). +- Limit concurrent open positions (respect max_positions from constraints/config). - Prefer market or tight-limit entries on pullbacks; avoid chasing large, fast moves. -- Use clear stop-loss and profit-target logic (see Risk Management section below). -- Favor trend-aligned entries: if the short- to mid-term trend is bullish, prefer long entries; if bearish, prefer shorts or sit out. -- Avoid entering during major macro events, maintenance windows, or low-volume periods (e.g., holidays, weekends depending on instrument). - -Signals & decision heuristics: -- Trend detection: compute short EMA (e.g., 20) vs long EMA (e.g., 100). Require short EMA > long EMA for a bullish bias and vice versa for bearish bias. -- Momentum confirmation: require a momentum feature (e.g., RSI between 30-70 band moving toward oversold for entries) to avoid overbought entries. -- Volatility filter: if realized volatility in recent window is above a configurable threshold, reduce position size or skip signals. -- Pullback entries: prefer to enter on a pullback toward a moving average or a defined support zone rather than at local highs. -- Confluence: prefer signals with at least two confirming indicators (trend + momentum or trend + volume spike on breakout). - -Order sizing & execution: -- Determine notional for a trade = min( cap_factor * average_symbol_daily_volume_notional, requested_notional, available buying power ). -- Convert notional -> quantity using current mark price. -- Clamp size to `min_trade_qty` and `max_order_qty` from runtime constraints. -- Use market orders for small/frequent rebalances; use limit orders (near current spread) for larger entries to avoid slippage. -- If partial fills occur, allow reattempts up to a short retry limit, then treat as partial fill and update portfolio accordingly. - -Risk management (mandatory): -- Stop-loss: set a stop at a fixed percentage or ATR multiple (e.g., 1.5x ATR) below entry for longs (above for shorts). -- Take-profit: set a profit target at a risk:reward ratio of at least 1:1.5 (configurable). -- Trailing stop: optionally convert stop to trailing at meaningful profit thresholds (e.g., after 1x risk reached). -- Cap total portfolio risk: do not allow aggregated potential loss (sum of per-position risk) to exceed a configurable fraction of NAV. -- Fees: account for estimated fees when sizing orders and when evaluating profit/loss. - -Position management & lifecycle: -- If a position is opened, compute and record entry price, notional, leverage, and planned stop/take levels in the trade meta. -- Re-evaluate open positions each cycle: if stop or take conditions hit, close; if market regime flips (trend opposite), consider reducing size or closing. -- Avoid frequent flipping: prefer 'flip-by-flat' — close an opposite-direction position fully before opening a new one (do not net opposite directions in same symbol). - -Edge cases & guards: -- If the computed quantity is below `min_trade_qty`, skip the trade. -- If the current spread or slippage estimate is larger than an acceptable threshold, skip or reduce order size. -- If data for an instrument is stale (last candle older than 2x interval), skip trading that instrument this cycle. - -Rationale and explainability: -- For each suggested action, include a short rationale string: why the signal triggered, which indicators agreed, and the planned stop/take. -- For rejected/ignored signals, include a brief reason (e.g., "skipped: notional below min_trade_qty", "skipped: volatility too high"). - -Telemetry & meta: -- Attach these meta fields to each instruction: compose_id, strategy_id, timestamp, estimated_fee, estimated_notional, confidence_score. -- Confidence: normalize to [0,1]; reduce size proportionally to confidence if below a threshold (e.g., 0.5). - -Failure modes & safe-fallbacks: -- If execution gateway returns an error or rejects, do not keep trying indefinitely—mark instruction as ERROR and surface reason in logs and history. -- If critical internal errors occur, pause trading and emit a status update. - -Summary (one-sentence): -Be conservative and trend-aware: take small, well-sized positions on pullbacks or confirmed breakouts, protect capital with explicit stops, and prefer gradual, repeatable profits over large, risky bets. - -Examples (short): -- Bullish pullback: short EMA > long EMA, RSI dropped below 50 and turning up, enter long sized at 1% NAV, stop = 2% below entry, target = 3% above entry. -- Breakout: short EMA crosses above long EMA with volume spike, enter on a tight breakout candle close, stop under breakout low, R:R = 1:1.5. +- Favor trend‑aligned entries; if the mid‑term trend is unclear, sit out. +- Avoid entries during maintenance windows or very low volume periods. + +Signals & decision heuristics +- Trend detection: short EMA (e.g., 20) vs long EMA (e.g., 100); bias with the trend. +- Momentum confirmation: avoid overbought entries; prefer RSI turning up from balanced/oversold zones. +- Volatility filter: when realized volatility is high, reduce size or skip. +- Pullbacks: enter nearer to moving averages or support zones; reduce chasing breakouts. +- Confluence: require at least two agreeing signals (trend + momentum/volume/structure). + +Order sizing & execution +- Notional = min(cap_factor * equity, available buying power, venue caps). +- Convert notional → quantity using mark price; clamp to min_trade_qty, max_order_qty, and quantity_step. +- Use market orders for small rebalances; prefer tight-limit for larger orders to control slippage. +- If partial fills occur, retry briefly; then treat as partial and update portfolio. + +Risk management (mandatory) +- Stops: place below recent support/ATR multiples for longs; above resistance for shorts. +- Targets: risk:reward at least 1:1.5 by default (configurable). +- Trailing: consider converting to trailing after ~1x risk achieved. +- Portfolio risk cap: aggregated potential loss should not exceed a fraction of NAV. +- Fees: include estimated fees when sizing and evaluating P/L. + +Position management & lifecycle +- On open: record entry price, notional, leverage, and planned stop/target in trade meta. +- Re-evaluate each cycle: close on stop/target hit or regime flip; avoid frequent flip-flopping. +- Flip-by-flat: fully close before reversing direction on the same symbol. +- cautious full deployment + - Purpose: staged scale‑in for high‑conviction, well‑liquid opportunities. + - Entry criteria: high digest/win_rate + strong multi‑factor confluence; sufficient buying_power; no blocking risk_flags. + - Execution: scale in 2–3 tranches (e.g., 50/30/20); require confirmation between tranches; start with lower leverage. + - Safety: ensure liquidation distance > configured margin; abort remaining tranches on large slippage or failed fills. + - Post‑deploy: enable trailing stops and de‑risk if risk_flags flip. + +Edge cases & guards +- If computed quantity < min_trade_qty, skip. +- If spread/slippage estimate is large, skip or reduce size. +- If data is stale (last candle older than 2× interval), skip for that symbol. + +Rationale and explainability +- For each action, include a short rationale: which signals agreed and the stop/target idea. +- For skipped signals, include a brief reason (e.g., volatility too high, below min_notional). + +Telemetry & meta +- Attach: compose_id, strategy_id, timestamp, estimated_fee, estimated_notional, confidence_score. +- Confidence: normalize to [0,1]; proportionally reduce size when confidence is low (< 0.5). + +Summary (one sentence) +Be conservative and trend‑aware: take small, well‑sized positions on pullbacks or confirmed breakouts, protect capital with explicit stops, and favor repeatable gains over risky bets. diff --git a/python/valuecell/agents/strategy_agent/trading_history/digest.py b/python/valuecell/agents/strategy_agent/trading_history/digest.py index 832b31686..fec26f689 100644 --- a/python/valuecell/agents/strategy_agent/trading_history/digest.py +++ b/python/valuecell/agents/strategy_agent/trading_history/digest.py @@ -14,6 +14,7 @@ def __init__(self, window: int = 50) -> None: def build(self, records: List[HistoryRecord]) -> TradeDigest: recent = records[-self._window :] by_instrument: Dict[str, TradeDigestEntry] = {} + stats: Dict[str, Dict[str, float | int]] = {} for record in recent: if record.kind != "execution": @@ -32,14 +33,106 @@ def build(self, records: List[HistoryRecord]) -> TradeDigest: realized_pnl=0.0, ) by_instrument[symbol] = entry + stats[symbol] = { + "wins": 0, + "losses": 0, + "holding_ms_sum": 0, + "holding_ms_count": 0, + } entry.trade_count += 1 realized = float(trade_dict.get("realized_pnl") or 0.0) entry.realized_pnl += realized entry.last_trade_ts = trade_dict.get("trade_ts") or entry.last_trade_ts + # Win/loss counting: prefer closed trades (with exit fields). Fallback to realized only if it's a close. + try: + outcome_pnl = None + has_exit = ( + trade_dict.get("exit_ts") is not None + or trade_dict.get("exit_price") is not None + or trade_dict.get("notional_exit") is not None + ) + if has_exit: + # Try compute PnL sign from entry/exit where possible (more robust for partial closes) + etype = (trade_dict.get("type") or "").upper() + entry_px = trade_dict.get("entry_price") + exit_px = trade_dict.get("exit_price") + notional_exit = trade_dict.get("notional_exit") + close_qty = None + if exit_px and notional_exit: + try: + if float(exit_px) > 0: + close_qty = float(notional_exit) / float(exit_px) + except Exception: + close_qty = None + if close_qty is None: + # Fallback to recorded quantity + q = trade_dict.get("quantity") + close_qty = float(q) if q is not None else None + if entry_px and exit_px and close_qty and close_qty > 0: + if etype == "LONG": + outcome_pnl = ( + float(exit_px) - float(entry_px) + ) * float(close_qty) + elif etype == "SHORT": + outcome_pnl = ( + float(entry_px) - float(exit_px) + ) * float(close_qty) + if outcome_pnl is None: + # Fallback to realized if available + outcome_pnl = ( + realized + if trade_dict.get("realized_pnl") is not None + else None + ) + else: + # No exit fields: avoid counting pure opens (which may carry fee-only negative realized) + outcome_pnl = None + + if outcome_pnl is not None: + if outcome_pnl > 0: + stats[symbol]["wins"] = int(stats[symbol]["wins"]) + 1 + elif outcome_pnl < 0: + stats[symbol]["losses"] = int(stats[symbol]["losses"]) + 1 + except Exception: + pass + + # Holding time aggregation when present + try: + hms = trade_dict.get("holding_ms") + if hms is not None: + stats[symbol]["holding_ms_sum"] = int( + stats[symbol]["holding_ms_sum"] + ) + int(hms) + stats[symbol]["holding_ms_count"] = ( + int(stats[symbol]["holding_ms_count"]) + 1 + ) + except Exception: + pass + timestamp = ( recent[-1].ts if recent else int(datetime.now(timezone.utc).timestamp() * 1000) ) + + # Finalize derived stats (win_rate, avg_holding_ms) + for symbol, entry in by_instrument.items(): + st = stats.get(symbol) or {} + wins = int(st.get("wins", 0) or 0) + losses = int(st.get("losses", 0) or 0) + denom = wins + losses + if denom > 0: + try: + entry.win_rate = float(wins) / float(denom) + except Exception: + entry.win_rate = None + hsum = int(st.get("holding_ms_sum", 0) or 0) + hcnt = int(st.get("holding_ms_count", 0) or 0) + if hcnt > 0: + try: + entry.avg_holding_ms = int(hsum / hcnt) + except Exception: + entry.avg_holding_ms = None + return TradeDigest(ts=timestamp, by_instrument=by_instrument)