diff --git a/python/configs/agent_cards/grid_agent.json b/python/configs/agent_cards/grid_agent.json new file mode 100644 index 000000000..df255ab93 --- /dev/null +++ b/python/configs/agent_cards/grid_agent.json @@ -0,0 +1,20 @@ +{ + "name": "GridStrategyAgent", + "display_name": "Grid Strategy Agent", + "url": "http://localhost:10007/", + "description": "LLM-driven strategy composer that turns market features into normalized trade instructions. Includes a simple runtime for demo and testing.", + "capabilities": { + "streaming": true, + "push_notifications": true + }, + "skills": [], + "enabled": true, + "metadata": { + "planner_passthrough": true, + "version": "0.1.0", + "author": "ValueCell Team", + "tags": ["strategy", "trading", "llm", "demo"], + "notes": "This card is a lightweight example; replace model api_key and tune parameters for production use.", + "local_agent_class": "valuecell.agents.grid_agent.grid_agent:GridStrategyAgent" + } +} diff --git a/python/valuecell/agents/common/trading/decision/grid_composer/__init__.py b/python/valuecell/agents/common/trading/decision/grid_composer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/valuecell/agents/common/trading/decision/grid_composer/grid_composer.py b/python/valuecell/agents/common/trading/decision/grid_composer/grid_composer.py new file mode 100644 index 000000000..eaab8e8d0 --- /dev/null +++ b/python/valuecell/agents/common/trading/decision/grid_composer/grid_composer.py @@ -0,0 +1,274 @@ +from __future__ import annotations + +import math +from typing import List, Optional + +from loguru import logger + +from ...models import ( + ComposeContext, + ComposeResult, + InstrumentRef, + MarketType, + TradeDecisionAction, + TradeDecisionItem, + TradePlanProposal, + UserRequest, +) +from ..interfaces import BaseComposer + + +class GridComposer(BaseComposer): + """Rule-based grid strategy composer. 
+ + Goal: avoid LLM usage by applying simple mean-reversion grid rules to + produce an `TradePlanProposal`, then reuse the parent normalization and + risk controls (`_normalize_plan`) to output executable `TradeInstruction`s. + + Key rules: + - Define grid step with `step_pct` (e.g., 0.5%). + - With positions: price falling ≥ 1 step vs average adds; rising ≥ 1 step + reduces (max `max_steps` per cycle). + - Without positions: use recent change percent (prefer 1s feature) to + trigger open; spot opens long only, perps can open both directions. + - Base size is `equity * base_fraction / price`; `_normalize_plan` later + clamps by filters and buying power. + """ + + def __init__( + self, + request: UserRequest, + *, + step_pct: float = 0.005, + max_steps: int = 3, + base_fraction: float = 0.08, + default_slippage_bps: int = 25, + quantity_precision: float = 1e-9, + ) -> None: + super().__init__( + request, + default_slippage_bps=default_slippage_bps, + quantity_precision=quantity_precision, + ) + self._step_pct = float(step_pct) + self._max_steps = int(max_steps) + self._base_fraction = float(base_fraction) + + async def compose(self, context: ComposeContext) -> ComposeResult: + # Prepare buying power/constraints/price map, then generate plan and reuse parent normalization + equity, allowed_lev, constraints, _projected_gross, price_map = ( + self._init_buying_power_context(context) + ) + + items: List[TradeDecisionItem] = [] + ts = int(context.ts) + + # Pre-fetch micro change percentage from features (prefer 1s, fallback 1m) + def latest_change_pct(symbol: str) -> Optional[float]: + best: Optional[float] = None + best_rank = 999 + for fv in context.features or []: + try: + if str(getattr(fv.instrument, "symbol", "")) != symbol: + continue + interval = (fv.meta or {}).get("interval") + change = fv.values.get("change_pct") + if change is None: + continue + rank = 0 if interval == "1s" else (1 if interval == "1m" else 2) + if rank < best_rank: + best = float(change) + 
best_rank = rank + except Exception: + continue + return best + + symbols = list(dict.fromkeys(self._request.trading_config.symbols)) + is_spot = self._request.exchange_config.market_type == MarketType.SPOT + + for symbol in symbols: + price = float(price_map.get(symbol) or 0.0) + if price <= 0: + logger.debug("Skip {} due to missing/invalid price", symbol) + continue + + pos = context.portfolio.positions.get(symbol) + qty = float(getattr(pos, "quantity", 0.0) or 0.0) + avg_px = float(getattr(pos, "avg_price", 0.0) or 0.0) + + # Base order size: equity fraction converted to quantity; parent applies risk controls + base_qty = max(0.0, (equity * self._base_fraction) / price) + if base_qty <= 0: + continue + + # Compute steps from average price when holding; without average, trigger one step + def steps_from_avg(px: float, avg: float) -> int: + if avg <= 0: + return 1 + move_pct = abs(px / avg - 1.0) + k = int(math.floor(move_pct / max(self._step_pct, 1e-9))) + return max(0, min(k, self._max_steps)) + + # No position: use latest change to trigger direction (spot long-only) + if abs(qty) <= self._quantity_precision: + chg = latest_change_pct(symbol) + if chg is None: + # If no change feature available, skip conservatively + continue + if chg <= -self._step_pct: + # Short-term drop → open long + items.append( + TradeDecisionItem( + instrument=InstrumentRef( + symbol=symbol, + exchange_id=self._request.exchange_config.exchange_id, + ), + action=TradeDecisionAction.OPEN_LONG, + target_qty=base_qty, + leverage=( + 1.0 + if is_spot + else min( + float( + self._request.trading_config.max_leverage or 1.0 + ), + float( + constraints.max_leverage + or self._request.trading_config.max_leverage + or 1.0 + ), + ) + ), + confidence=min(1.0, abs(chg) / (2 * self._step_pct)), + rationale=f"Grid open-long: change_pct={chg:.4f} ≤ -step={self._step_pct:.4f}", + ) + ) + elif (not is_spot) and chg >= self._step_pct: + # Short-term rise → open short (perpetual only) + items.append( + 
TradeDecisionItem( + instrument=InstrumentRef( + symbol=symbol, + exchange_id=self._request.exchange_config.exchange_id, + ), + action=TradeDecisionAction.OPEN_SHORT, + target_qty=base_qty, + leverage=min( + float(self._request.trading_config.max_leverage or 1.0), + float( + constraints.max_leverage + or self._request.trading_config.max_leverage + or 1.0 + ), + ), + confidence=min(1.0, abs(chg) / (2 * self._step_pct)), + rationale=f"Grid open-short: change_pct={chg:.4f} ≥ step={self._step_pct:.4f}", + ) + ) + # Otherwise NOOP + continue + + # With position: adjust around average using grid + k = steps_from_avg(price, avg_px) + if k <= 0: + # No grid step triggered → NOOP + continue + + # Long: add on down, reduce on up + if qty > 0: + down = (avg_px > 0) and (price <= avg_px * (1.0 - self._step_pct)) + up = (avg_px > 0) and (price >= avg_px * (1.0 + self._step_pct)) + if down: + items.append( + TradeDecisionItem( + instrument=InstrumentRef( + symbol=symbol, + exchange_id=self._request.exchange_config.exchange_id, + ), + action=TradeDecisionAction.OPEN_LONG, + target_qty=base_qty * k, + leverage=1.0 + if is_spot + else min( + float(self._request.trading_config.max_leverage or 1.0), + float( + constraints.max_leverage + or self._request.trading_config.max_leverage + or 1.0 + ), + ), + confidence=min(1.0, k / float(self._max_steps)), + rationale=f"Grid long add: price {price:.4f} ≤ avg {avg_px:.4f} by {k} steps", + ) + ) + elif up: + items.append( + TradeDecisionItem( + instrument=InstrumentRef( + symbol=symbol, + exchange_id=self._request.exchange_config.exchange_id, + ), + action=TradeDecisionAction.CLOSE_LONG, + target_qty=min(abs(qty), base_qty * k), + leverage=1.0, + confidence=min(1.0, k / float(self._max_steps)), + rationale=f"Grid long reduce: price {price:.4f} ≥ avg {avg_px:.4f} by {k} steps", + ) + ) + continue + + # Short: add on up, cover on down + if qty < 0: + up = (avg_px > 0) and (price >= avg_px * (1.0 + self._step_pct)) + down = (avg_px > 0) and 
(price <= avg_px * (1.0 - self._step_pct)) + if up and (not is_spot): + items.append( + TradeDecisionItem( + instrument=InstrumentRef( + symbol=symbol, + exchange_id=self._request.exchange_config.exchange_id, + ), + action=TradeDecisionAction.OPEN_SHORT, + target_qty=base_qty * k, + leverage=min( + float(self._request.trading_config.max_leverage or 1.0), + float( + constraints.max_leverage + or self._request.trading_config.max_leverage + or 1.0 + ), + ), + confidence=min(1.0, k / float(self._max_steps)), + rationale=f"Grid short add: price {price:.4f} ≥ avg {avg_px:.4f} by {k} steps", + ) + ) + elif down: + items.append( + TradeDecisionItem( + instrument=InstrumentRef( + symbol=symbol, + exchange_id=self._request.exchange_config.exchange_id, + ), + action=TradeDecisionAction.CLOSE_SHORT, + target_qty=min(abs(qty), base_qty * k), + leverage=1.0, + confidence=min(1.0, k / float(self._max_steps)), + rationale=f"Grid short cover: price {price:.4f} ≤ avg {avg_px:.4f} by {k} steps", + ) + ) + continue + + if not items: + logger.debug( + "GridComposer produced NOOP plan for compose_id={}", context.compose_id + ) + return ComposeResult(instructions=[], rationale="Grid NOOP") + + plan = TradePlanProposal( + ts=ts, + items=items, + rationale=f"Grid step={self._step_pct:.4f}, base_fraction={self._base_fraction:.3f}", + ) + # Reuse parent normalization: quantity filters, buying power, cap_factor, reduceOnly, etc. 
+ normalized = self._normalize_plan(context, plan) + return ComposeResult(instructions=normalized, rationale=plan.rationale) diff --git a/python/valuecell/agents/common/trading/decision/interfaces.py b/python/valuecell/agents/common/trading/decision/interfaces.py index bb1752094..024e427d2 100644 --- a/python/valuecell/agents/common/trading/decision/interfaces.py +++ b/python/valuecell/agents/common/trading/decision/interfaces.py @@ -1,11 +1,23 @@ from __future__ import annotations +import math from abc import ABC, abstractmethod +from typing import Dict, List, Optional -from valuecell.agents.common.trading.models import ( +from loguru import logger + +from ..models import ( ComposeContext, ComposeResult, + Constraints, + MarketType, + TradeDecisionAction, + TradeInstruction, + TradePlanProposal, + TradeSide, + UserRequest, ) +from ..utils import extract_price_map # Contracts for decision making (module-local abstract interfaces). # Composer hosts the LLM call and guardrails, producing executable instructions. @@ -18,6 +30,17 @@ class BaseComposer(ABC): Output: TradeInstruction list """ + def __init__( + self, + request: UserRequest, + *, + default_slippage_bps: int = 25, + quantity_precision: float = 1e-9, + ) -> None: + self._request = request + self._default_slippage_bps = default_slippage_bps + self._quantity_precision = quantity_precision + @abstractmethod async def compose(self, context: ComposeContext) -> ComposeResult: """Produce normalized trade instructions given the current context. @@ -27,3 +50,524 @@ async def compose(self, context: ComposeContext) -> ComposeResult: a validated ComposeResult containing instructions and optional rationale. """ raise NotImplementedError + + def _init_buying_power_context( + self, + context: ComposeContext, + ) -> tuple: + """Initialize buying power tracking context. 
+ + Returns: + (equity, allowed_lev, constraints, projected_gross, price_map) + """ + constraints = context.portfolio.constraints or Constraints( + max_positions=self._request.trading_config.max_positions, + max_leverage=self._request.trading_config.max_leverage, + ) + + # Compute equity based on market type: + if self._request.exchange_config.market_type == MarketType.SPOT: + # Spot: use available account_balance as equity + equity = float(context.portfolio.account_balance or 0.0) + else: + # Derivatives: use portfolio equity (account_balance + net exposure), or total_value if provided + if getattr(context.portfolio, "total_value", None) is not None: + equity = float(context.portfolio.total_value or 0.0) + else: + account_balance = float(context.portfolio.account_balance or 0.0) + net = float(context.portfolio.net_exposure or 0.0) + equity = account_balance + net + + # Market-type leverage policy: SPOT -> 1.0; Derivatives -> constraints + if self._request.exchange_config.market_type == MarketType.SPOT: + allowed_lev = 1.0 + else: + allowed_lev = ( + float(constraints.max_leverage) + if constraints.max_leverage is not None + else 1.0 + ) + + # Initialize projected gross exposure + price_map = extract_price_map(context.features) + if getattr(context.portfolio, "gross_exposure", None) is not None: + projected_gross = float(context.portfolio.gross_exposure or 0.0) + else: + projected_gross = 0.0 + for sym, snap in context.portfolio.positions.items(): + px = float( + price_map.get(sym) or getattr(snap, "mark_price", 0.0) or 0.0 + ) + projected_gross += abs(float(snap.quantity)) * px + + return equity, allowed_lev, constraints, projected_gross, price_map + + def _normalize_quantity( + self, + symbol: str, + quantity: float, + side: TradeSide, + current_qty: float, + constraints: Constraints, + equity: float, + allowed_lev: float, + projected_gross: float, + price_map: Dict[str, float], + ) -> tuple: + """Normalize quantity through all guardrails: filters, caps, and 
buying power. + + Returns: + (final_qty, consumed_buying_power_delta) + """ + qty = quantity + + # Step 1: per-order filters (step size, min notional, max order qty) + logger.debug(f"_normalize_quantity Step 1: {symbol} qty={qty} before filters") + qty = self._apply_quantity_filters( + symbol, + qty, + float(constraints.quantity_step or 0.0), + float(constraints.min_trade_qty or 0.0), + constraints.max_order_qty, + constraints.min_notional, + price_map, + ) + logger.debug(f"_normalize_quantity Step 1: {symbol} qty={qty} after filters") + + if qty <= self._quantity_precision: + logger.warning( + f"Post-filter quantity for {symbol} is {qty} <= precision {self._quantity_precision} -> returning 0" + ) + return 0.0, 0.0 + + # Step 2: notional/leverage cap (Phase 1 rules) + price = price_map.get(symbol) + if price is not None and price > 0: + # cap_factor controls how aggressively we allow position sizing by notional. + # Make it configurable via trading_config.cap_factor (strategy parameter). 
+ cap_factor = float(self._request.trading_config.cap_factor or 1.5) + if constraints.quantity_step and constraints.quantity_step > 0: + cap_factor = max(cap_factor, 1.5) + + allowed_lev_cap = ( + allowed_lev if math.isfinite(allowed_lev) else float("inf") + ) + max_abs_by_factor = (cap_factor * equity) / float(price) + max_abs_by_lev = (allowed_lev_cap * equity) / float(price) + max_abs_final = min(max_abs_by_factor, max_abs_by_lev) + + desired_final = current_qty + (qty if side is TradeSide.BUY else -qty) + if math.isfinite(max_abs_final) and abs(desired_final) > max_abs_final: + target_abs = max_abs_final + new_qty = max(0.0, target_abs - abs(current_qty)) + if new_qty < qty: + logger.debug( + "Capping {} qty due to notional/leverage (price={}, cap_factor={}, old_qty={}, new_qty={})", + symbol, + price, + cap_factor, + qty, + new_qty, + ) + qty = new_qty + + if qty <= self._quantity_precision: + logger.debug( + "Post-cap quantity for {} is {} <= precision {} -> skipping", + symbol, + qty, + self._quantity_precision, + ) + return 0.0, 0.0 + + # Step 3: buying power clamp + px = price_map.get(symbol) + if px is None or px <= 0: + # Without a valid price, we cannot safely assess notional or buying power. + # Allow only de-risking (reductions/closures); block new/exposure-increasing trades. 
+ is_reduction = (side is TradeSide.BUY and current_qty < 0) or ( + side is TradeSide.SELL and current_qty > 0 + ) + if is_reduction: + # Clamp to the current absolute position to avoid overshooting zero + final_qty = min(qty, abs(current_qty)) + logger.warning( + "Missing price for {} — allowing reduce-only trade: final_qty={} (current_qty={})", + symbol, + final_qty, + current_qty, + ) + else: + logger.warning( + "Missing price for {} — blocking exposure-increasing trade (side={}, qty={})", + symbol, + side, + qty, + ) + return 0.0, 0.0 + else: + if self._request.exchange_config.market_type == MarketType.SPOT: + # Spot: cash-only buying power + avail_bp = max(0.0, equity) + else: + # Derivatives: margin-based buying power + avail_bp = max(0.0, equity * allowed_lev - projected_gross) + # When buying power is exhausted, we should still allow reductions/closures. + # Set additional purchasable units to 0 but proceed with piecewise logic + # so that de-risking trades are not blocked. + a = abs(current_qty) + # Conservative buffer for expected slippage: assume execution price may move + # against us by `self._default_slippage_bps`. Use a higher effective price + # when computing how many units fit into available buying power so that + # planned increases don't accidentally exceed real-world costs. 
+ slip_bps = float(self._default_slippage_bps or 0.0) + slip = slip_bps / 10000.0 + effective_px = float(px) * (1.0 + slip) + ap_units = (avail_bp / effective_px) if avail_bp > 0 else 0.0 + + # Piecewise: additional gross consumption must fit into available BP + if side is TradeSide.BUY: + if current_qty >= 0: + q_allowed = ap_units + else: + if qty <= 2 * a: + q_allowed = qty + else: + q_allowed = 2 * a + ap_units + else: # SELL + if current_qty <= 0: + q_allowed = ap_units + else: + if qty <= 2 * a: + q_allowed = qty + else: + q_allowed = 2 * a + ap_units + + final_qty = max(0.0, min(qty, q_allowed)) + + if final_qty <= self._quantity_precision: + logger.debug( + "Post-buying-power quantity for {} is {} <= precision {} -> skipping", + symbol, + final_qty, + self._quantity_precision, + ) + return 0.0, 0.0 + + # Compute consumed buying power delta + abs_before = abs(current_qty) + abs_after = abs( + current_qty + (final_qty if side is TradeSide.BUY else -final_qty) + ) + delta_abs = abs_after - abs_before + # Use effective price (with slippage) for consumed buying power to stay conservative + # If px was missing, we would have returned earlier for exposure-increasing trades; + # for reduction-only trades, treat consumed buying power as 0. 
+ if px is None or px <= 0: + consumed_bp_delta = 0.0 + else: + # Recompute effective price consistently with the clamp + slip_bps = float(self._default_slippage_bps or 0.0) + slip = slip_bps / 10000.0 + effective_px = float(px) * (1.0 + slip) + consumed_bp_delta = (delta_abs * effective_px) if delta_abs > 0 else 0.0 + + return final_qty, consumed_bp_delta + + def _normalize_plan( + self, + context: ComposeContext, + plan: TradePlanProposal, + ) -> List[TradeInstruction]: + instructions: List[TradeInstruction] = [] + + # --- prepare state --- + projected_positions: Dict[str, float] = { + symbol: snapshot.quantity + for symbol, snapshot in context.portfolio.positions.items() + } + + def _count_active(pos_map: Dict[str, float]) -> int: + return sum(1 for q in pos_map.values() if abs(q) > self._quantity_precision) + + active_positions = _count_active(projected_positions) + + # Initialize buying power context + equity, allowed_lev, constraints, projected_gross, price_map = ( + self._init_buying_power_context(context) + ) + + max_positions = constraints.max_positions + max_position_qty = constraints.max_position_qty + + # --- process each planned item --- + for idx, item in enumerate(plan.items): + symbol = item.instrument.symbol + current_qty = projected_positions.get(symbol, 0.0) + + # determine the intended target quantity (clamped by max_position_qty) + target_qty = self._resolve_target_quantity( + item, current_qty, max_position_qty + ) + # SPOT long-only: do not allow negative target quantities + if ( + self._request.exchange_config.market_type == MarketType.SPOT + and target_qty < 0 + ): + target_qty = 0.0 + # Enforce: single-lot per symbol and no direct flip. If target flips side, + # split into two sub-steps: first flat to 0, then open to target side. 
+ sub_targets: List[float] = [] + if current_qty * target_qty < 0: + sub_targets = [0.0, float(target_qty)] + else: + sub_targets = [float(target_qty)] + + local_current = float(current_qty) + for sub_i, sub_target in enumerate(sub_targets): + delta = sub_target - local_current + + if abs(delta) <= self._quantity_precision: + continue + + is_new_position = ( + abs(local_current) <= self._quantity_precision + and abs(sub_target) > self._quantity_precision + ) + if ( + is_new_position + and max_positions is not None + and active_positions >= int(max_positions) + ): + logger.warning( + "Skipping symbol {} due to max_positions constraint (active={} max={})", + symbol, + active_positions, + max_positions, + ) + continue + + side = TradeSide.BUY if delta > 0 else TradeSide.SELL + # requested leverage (default 1.0), clamped to constraints + requested_lev = ( + float(item.leverage) + if getattr(item, "leverage", None) is not None + else 1.0 + ) + allowed_lev_item = ( + float(constraints.max_leverage) + if constraints.max_leverage is not None + else requested_lev + ) + if self._request.exchange_config.market_type == MarketType.SPOT: + # Spot: long-only, no leverage + final_leverage = 1.0 + else: + final_leverage = max(1.0, min(requested_lev, allowed_lev_item)) + quantity = abs(delta) + + # Normalize quantity through all guardrails + logger.debug(f"Before normalize: {symbol} quantity={quantity}") + quantity, consumed_bp = self._normalize_quantity( + symbol, + quantity, + side, + local_current, + constraints, + equity, + allowed_lev, + projected_gross, + price_map, + ) + logger.debug( + f"After normalize: {symbol} quantity={quantity}, consumed_bp={consumed_bp}" + ) + + if quantity <= self._quantity_precision: + logger.warning( + f"SKIPPED: {symbol} quantity={quantity} <= precision={self._quantity_precision} after normalization" + ) + continue + + # Update projected positions for subsequent guardrails + signed_delta = quantity if side is TradeSide.BUY else -quantity + 
projected_positions[symbol] = local_current + signed_delta + projected_gross += consumed_bp + + # active positions accounting + if is_new_position: + active_positions += 1 + if abs(projected_positions[symbol]) <= self._quantity_precision: + active_positions = max(active_positions - 1, 0) + + # Use a stable per-item sub-index to keep instruction ids unique + instr = self._create_instruction( + context, + idx * 10 + sub_i, + item, + symbol, + side, + quantity, + final_leverage, + local_current, + sub_target, + ) + instructions.append(instr) + + # advance local_current for the next sub-step + local_current = projected_positions[symbol] + + return instructions + + def _create_instruction( + self, + context: ComposeContext, + idx: int, + item, + symbol: str, + side: TradeSide, + quantity: float, + final_leverage: float, + current_qty: float, + target_qty: float, + ) -> TradeInstruction: + """Create a normalized TradeInstruction with metadata.""" + final_target = current_qty + (quantity if side is TradeSide.BUY else -quantity) + meta = { + "requested_target_qty": target_qty, + "current_qty": current_qty, + "final_target_qty": final_target, + "action": item.action.value, + } + if item.confidence is not None: + meta["confidence"] = item.confidence + if item.rationale: + meta["rationale"] = item.rationale + + # For derivatives/perpetual markets, mark reduceOnly when instruction reduces absolute exposure to avoid accidental reverse opens + try: + if self._request.exchange_config.market_type != MarketType.SPOT: + if abs(final_target) < abs(current_qty): + meta["reduceOnly"] = True + # Bybit uses a different param key + if ( + self._request.exchange_config.exchange_id or "" + ).lower() == "bybit": + meta["reduce_only"] = True + except Exception: + # Ignore any exception; do not block instruction creation + pass + + instruction = TradeInstruction( + instruction_id=f"{context.compose_id}:{symbol}:{idx}", + compose_id=context.compose_id, + instrument=item.instrument, + 
action=item.action, + side=side, + quantity=quantity, + leverage=final_leverage, + max_slippage_bps=self._default_slippage_bps, + meta=meta, + ) + logger.debug( + "Created TradeInstruction {} for {} side={} qty={} lev={}", + instruction.instruction_id, + symbol, + instruction.side, + instruction.quantity, + final_leverage, + ) + return instruction + + def _resolve_target_quantity( + self, + item, + current_qty: float, + max_position_qty: Optional[float], + ) -> float: + # NOOP: keep current position + if item.action == TradeDecisionAction.NOOP: + return current_qty + + # Interpret target_qty as operation magnitude (not final position), normalized to positive + mag = abs(float(item.target_qty)) + target = current_qty + + # Compute target position per open/close long/short action + if item.action == TradeDecisionAction.OPEN_LONG: + base = current_qty if current_qty > 0 else 0.0 + target = base + mag + elif item.action == TradeDecisionAction.OPEN_SHORT: + base = current_qty if current_qty < 0 else 0.0 + target = base - mag + elif item.action == TradeDecisionAction.CLOSE_LONG: + if current_qty > 0: + target = max(current_qty - mag, 0.0) + else: + # No long position, keep unchanged + target = current_qty + elif item.action == TradeDecisionAction.CLOSE_SHORT: + if current_qty < 0: + target = min(current_qty + mag, 0.0) + else: + # No short position, keep unchanged + target = current_qty + else: + # Fallback: treat unknown action as NOOP + target = current_qty + + # Clamp by max_position_qty (symmetric) + if max_position_qty is not None: + max_abs = abs(float(max_position_qty)) + target = max(-max_abs, min(max_abs, target)) + + return target + + def _apply_quantity_filters( + self, + symbol: str, + quantity: float, + quantity_step: float, + min_trade_qty: float, + max_order_qty: Optional[float], + min_notional: Optional[float], + price_map: Dict[str, float], + ) -> float: + qty = quantity + logger.debug(f"Filtering {symbol}: initial qty={qty}") + + if max_order_qty is not 
None: + qty = min(qty, float(max_order_qty)) + logger.debug(f"After max_order_qty filter: qty={qty}") + + if quantity_step > 0: + qty = math.floor(qty / quantity_step) * quantity_step + logger.debug(f"After quantity_step filter: qty={qty}") + + if qty <= 0: + logger.warning(f"FILTERED: {symbol} qty={qty} <= 0") + return 0.0 + + if qty < min_trade_qty: + logger.warning( + f"FILTERED: {symbol} qty={qty} < min_trade_qty={min_trade_qty}" + ) + return 0.0 + + if min_notional is not None: + price = price_map.get(symbol) + if price is None: + logger.warning(f"FILTERED: {symbol} no price reference available") + return 0.0 + notional = qty * price + if notional < float(min_notional): + logger.warning( + f"FILTERED: {symbol} notional={notional:.4f} < min_notional={min_notional}" + ) + return 0.0 + logger.debug( + f"Passed min_notional check: notional={notional:.4f} >= {min_notional}" + ) + + logger.debug(f"Final qty for {symbol}: {qty}") + return qty diff --git a/python/valuecell/agents/common/trading/decision/prompt_based/composer.py b/python/valuecell/agents/common/trading/decision/prompt_based/composer.py index 3cf0ea76d..a7698c6a5 100644 --- a/python/valuecell/agents/common/trading/decision/prompt_based/composer.py +++ b/python/valuecell/agents/common/trading/decision/prompt_based/composer.py @@ -1,9 +1,8 @@ from __future__ import annotations import json -import math from datetime import datetime, timezone -from typing import Dict, List, Optional +from typing import Dict from agno.agent import Agent as AgnoAgent from loguru import logger @@ -11,21 +10,17 @@ from valuecell.utils import env as env_utils from valuecell.utils import model as model_utils -from ...constants import FEATURE_GROUP_BY_KEY from ...models import ( ComposeContext, ComposeResult, - Constraints, - FeatureVector, - LlmDecisionAction, - LlmPlanProposal, - MarketType, - TradeInstruction, - TradeSide, + TradeDecisionAction, + TradePlanProposal, UserRequest, ) from ...utils import ( - extract_price_map, + 
extract_market_section, + group_features, + prune_none, send_discord_message, ) from ..interfaces import BaseComposer @@ -104,93 +99,6 @@ async def compose(self, context: ComposeContext) -> ComposeResult: return ComposeResult(instructions=normalized, rationale=plan.rationale) # ------------------------------------------------------------------ - # Prompt + LLM helpers - - @staticmethod - def _prune_none(obj): - """Recursively remove None, empty dict, and empty list values.""" - if isinstance(obj, dict): - pruned = { - k: LlmComposer._prune_none(v) for k, v in obj.items() if v is not None - } - return {k: v for k, v in pruned.items() if v not in (None, {}, [])} - if isinstance(obj, list): - pruned = [LlmComposer._prune_none(v) for v in obj] - return [v for v in pruned if v not in (None, {}, [])] - return obj - - def _extract_market_section(self, market_data: List[Dict]) -> Dict: - """Extract decision-critical metrics from market feature entries.""" - - compact: Dict[str, Dict] = {} - for item in market_data: - symbol = (item.get("instrument") or {}).get("symbol") - if not symbol: - continue - - values = item.get("values") or {} - entry: Dict[str, float] = {} - - for feature_key, alias in ( - ("price.last", "last"), - ("price.close", "close"), - ("price.open", "open"), - ("price.high", "high"), - ("price.low", "low"), - ("price.bid", "bid"), - ("price.ask", "ask"), - ("price.change_pct", "change_pct"), - ("price.volume", "volume"), - ): - if feature_key in values and values[feature_key] is not None: - entry[alias] = values[feature_key] - - if values.get("open_interest") is not None: - entry["open_interest"] = values["open_interest"] - - if values.get("funding.rate") is not None: - entry["funding_rate"] = values["funding.rate"] - if values.get("funding.mark_price") is not None: - entry["mark_price"] = values["funding.mark_price"] - - normalized = {k: v for k, v in entry.items() if v is not None} - if normalized: - compact[symbol] = normalized - - return compact - - 
def _organize_features(self, features: List[FeatureVector]) -> Dict: - """Organize features by grouping metadata and trim payload noise. - - Prefers the FeatureVector.meta group_by_key when present, otherwise - falls back to the interval tag. This allows callers to introduce - ad-hoc groupings (e.g., market snapshots) without overloading the - interval field. - """ - grouped: Dict[str, List] = {} - - for fv in features: - data = fv.model_dump(mode="json") - meta = data.get("meta") or {} - group_key = meta.get(FEATURE_GROUP_BY_KEY) - - if not group_key: - continue - - # Keep only concise metadata helpful for the LLM prompt. - trimmed_meta = {} - if meta.get("interval"): - trimmed_meta["interval"] = meta["interval"] - if meta.get("count") is not None: - trimmed_meta["count"] = meta["count"] - if trimmed_meta: - data["meta"] = trimmed_meta - else: - data.pop("meta", None) - - grouped.setdefault(group_key, []).append(data) - - return grouped def _build_summary(self, context: ComposeContext) -> Dict: """Build portfolio summary with risk metrics.""" @@ -223,8 +131,8 @@ def _build_llm_prompt(self, context: ComposeContext) -> str: # Build components summary = self._build_summary(context) - features = self._organize_features(context.features) - market = self._extract_market_section(features.get("market_snapshot", [])) + features = group_features(context.features) + market = extract_market_section(features.get("market_snapshot", [])) # Portfolio positions positions = [ @@ -245,7 +153,7 @@ def _build_llm_prompt(self, context: ComposeContext) -> str: else {} ) - payload = self._prune_none( + payload = prune_none( { "strategy_prompt": self._build_prompt_text(), "summary": summary, @@ -266,7 +174,7 @@ def _build_llm_prompt(self, context: ComposeContext) -> str: return f"{instructions}\n\nContext:\n{json.dumps(payload, ensure_ascii=False)}" - async def _call_llm(self, prompt: str) -> LlmPlanProposal: + async def _call_llm(self, prompt: str) -> TradePlanProposal: """Invoke an LLM 
asynchronously and parse the response into LlmPlanProposal. This implementation follows the parser_agent pattern: it creates a model @@ -286,7 +194,7 @@ async def _call_llm(self, prompt: str) -> LlmPlanProposal: # Wrap model in an Agent (consistent with parser_agent usage) agent = AgnoAgent( model=model, - output_schema=LlmPlanProposal, + output_schema=TradePlanProposal, markdown=False, instructions=[SYSTEM_PROMPT], use_json_mode=model_utils.model_should_use_json_mode(model), @@ -297,17 +205,17 @@ async def _call_llm(self, prompt: str) -> LlmPlanProposal: content = getattr(response, "content", None) or response logger.debug("Received LLM response {}", content) # If the agent already returned a validated model, return it directly - if isinstance(content, LlmPlanProposal): + if isinstance(content, TradePlanProposal): return content logger.error("LLM output failed validation: {}", content) - return LlmPlanProposal( + return TradePlanProposal( ts=int(datetime.now(timezone.utc).timestamp() * 1000), items=[], rationale="LLM output failed validation", ) - async def _send_plan_to_discord(self, plan: LlmPlanProposal) -> None: + async def _send_plan_to_discord(self, plan: TradePlanProposal) -> None: """Send plan rationale to Discord when there are actionable items. Behavior: @@ -317,7 +225,7 @@ async def _send_plan_to_discord(self, plan: LlmPlanProposal) -> None: - Reads webhook from `STRATEGY_AGENT_DISCORD_WEBHOOK_URL` (handled by `send_discord_message`). Does nothing if no actionable items exist. 
""" - actionable = [it for it in plan.items if it.action != LlmDecisionAction.NOOP] + actionable = [it for it in plan.items if it.action != TradeDecisionAction.NOOP] if not actionable: return @@ -354,527 +262,3 @@ async def _send_plan_to_discord(self, plan: LlmPlanProposal) -> None: ) except Exception as exc: logger.warning("Error sending plan to Discord, err={}", exc) - - # ------------------------------------------------------------------ - # Normalization / guardrails helpers - - def _init_buying_power_context( - self, - context: ComposeContext, - ) -> tuple: - """Initialize buying power tracking context. - - Returns: - (equity, allowed_lev, constraints, projected_gross, price_map) - """ - constraints = context.portfolio.constraints or Constraints( - max_positions=self._request.trading_config.max_positions, - max_leverage=self._request.trading_config.max_leverage, - ) - - # Compute equity based on market type: - if self._request.exchange_config.market_type == MarketType.SPOT: - # Spot: use available account_balance as equity - equity = float(context.portfolio.account_balance or 0.0) - else: - # Derivatives: use portfolio equity (account_balance + net exposure), or total_value if provided - if getattr(context.portfolio, "total_value", None) is not None: - equity = float(context.portfolio.total_value or 0.0) - else: - account_balance = float(context.portfolio.account_balance or 0.0) - net = float(context.portfolio.net_exposure or 0.0) - equity = account_balance + net - - # Market-type leverage policy: SPOT -> 1.0; Derivatives -> constraints - if self._request.exchange_config.market_type == MarketType.SPOT: - allowed_lev = 1.0 - else: - allowed_lev = ( - float(constraints.max_leverage) - if constraints.max_leverage is not None - else 1.0 - ) - - # Initialize projected gross exposure - price_map = extract_price_map(context.features) - if getattr(context.portfolio, "gross_exposure", None) is not None: - projected_gross = float(context.portfolio.gross_exposure or 0.0) 
- else: - projected_gross = 0.0 - for sym, snap in context.portfolio.positions.items(): - px = float( - price_map.get(sym) or getattr(snap, "mark_price", 0.0) or 0.0 - ) - projected_gross += abs(float(snap.quantity)) * px - - return equity, allowed_lev, constraints, projected_gross, price_map - - def _normalize_quantity( - self, - symbol: str, - quantity: float, - side: TradeSide, - current_qty: float, - constraints: Constraints, - equity: float, - allowed_lev: float, - projected_gross: float, - price_map: Dict[str, float], - ) -> tuple: - """Normalize quantity through all guardrails: filters, caps, and buying power. - - Returns: - (final_qty, consumed_buying_power_delta) - """ - qty = quantity - - # Step 1: per-order filters (step size, min notional, max order qty) - logger.debug(f"_normalize_quantity Step 1: {symbol} qty={qty} before filters") - qty = self._apply_quantity_filters( - symbol, - qty, - float(constraints.quantity_step or 0.0), - float(constraints.min_trade_qty or 0.0), - constraints.max_order_qty, - constraints.min_notional, - price_map, - ) - logger.debug(f"_normalize_quantity Step 1: {symbol} qty={qty} after filters") - - if qty <= self._quantity_precision: - logger.warning( - f"Post-filter quantity for {symbol} is {qty} <= precision {self._quantity_precision} -> returning 0" - ) - return 0.0, 0.0 - - # Step 2: notional/leverage cap (Phase 1 rules) - price = price_map.get(symbol) - if price is not None and price > 0: - # cap_factor controls how aggressively we allow position sizing by notional. - # Make it configurable via trading_config.cap_factor (strategy parameter). 
- cap_factor = float(self._request.trading_config.cap_factor or 1.5) - if constraints.quantity_step and constraints.quantity_step > 0: - cap_factor = max(cap_factor, 1.5) - - allowed_lev_cap = ( - allowed_lev if math.isfinite(allowed_lev) else float("inf") - ) - max_abs_by_factor = (cap_factor * equity) / float(price) - max_abs_by_lev = (allowed_lev_cap * equity) / float(price) - max_abs_final = min(max_abs_by_factor, max_abs_by_lev) - - desired_final = current_qty + (qty if side is TradeSide.BUY else -qty) - if math.isfinite(max_abs_final) and abs(desired_final) > max_abs_final: - target_abs = max_abs_final - new_qty = max(0.0, target_abs - abs(current_qty)) - if new_qty < qty: - logger.debug( - "Capping {} qty due to notional/leverage (price={}, cap_factor={}, old_qty={}, new_qty={})", - symbol, - price, - cap_factor, - qty, - new_qty, - ) - qty = new_qty - - if qty <= self._quantity_precision: - logger.debug( - "Post-cap quantity for {} is {} <= precision {} -> skipping", - symbol, - qty, - self._quantity_precision, - ) - return 0.0, 0.0 - - # Step 3: buying power clamp - px = price_map.get(symbol) - if px is None or px <= 0: - # Without a valid price, we cannot safely assess notional or buying power. - # Allow only de-risking (reductions/closures); block new/exposure-increasing trades. 
- is_reduction = (side is TradeSide.BUY and current_qty < 0) or ( - side is TradeSide.SELL and current_qty > 0 - ) - if is_reduction: - # Clamp to the current absolute position to avoid overshooting zero - final_qty = min(qty, abs(current_qty)) - logger.warning( - "Missing price for {} — allowing reduce-only trade: final_qty={} (current_qty={})", - symbol, - final_qty, - current_qty, - ) - else: - logger.warning( - "Missing price for {} — blocking exposure-increasing trade (side={}, qty={})", - symbol, - side, - qty, - ) - return 0.0, 0.0 - else: - if self._request.exchange_config.market_type == MarketType.SPOT: - # Spot: cash-only buying power - avail_bp = max(0.0, equity) - else: - # Derivatives: margin-based buying power - avail_bp = max(0.0, equity * allowed_lev - projected_gross) - # When buying power is exhausted, we should still allow reductions/closures. - # Set additional purchasable units to 0 but proceed with piecewise logic - # so that de-risking trades are not blocked. - a = abs(current_qty) - # Conservative buffer for expected slippage: assume execution price may move - # against us by `self._default_slippage_bps`. Use a higher effective price - # when computing how many units fit into available buying power so that - # planned increases don't accidentally exceed real-world costs. 
- slip_bps = float(self._default_slippage_bps or 0.0) - slip = slip_bps / 10000.0 - effective_px = float(px) * (1.0 + slip) - ap_units = (avail_bp / effective_px) if avail_bp > 0 else 0.0 - - # Piecewise: additional gross consumption must fit into available BP - if side is TradeSide.BUY: - if current_qty >= 0: - q_allowed = ap_units - else: - if qty <= 2 * a: - q_allowed = qty - else: - q_allowed = 2 * a + ap_units - else: # SELL - if current_qty <= 0: - q_allowed = ap_units - else: - if qty <= 2 * a: - q_allowed = qty - else: - q_allowed = 2 * a + ap_units - - final_qty = max(0.0, min(qty, q_allowed)) - - if final_qty <= self._quantity_precision: - logger.debug( - "Post-buying-power quantity for {} is {} <= precision {} -> skipping", - symbol, - final_qty, - self._quantity_precision, - ) - return 0.0, 0.0 - - # Compute consumed buying power delta - abs_before = abs(current_qty) - abs_after = abs( - current_qty + (final_qty if side is TradeSide.BUY else -final_qty) - ) - delta_abs = abs_after - abs_before - # Use effective price (with slippage) for consumed buying power to stay conservative - # If px was missing, we would have returned earlier for exposure-increasing trades; - # for reduction-only trades, treat consumed buying power as 0. 
- if px is None or px <= 0: - consumed_bp_delta = 0.0 - else: - # Recompute effective price consistently with the clamp - slip_bps = float(self._default_slippage_bps or 0.0) - slip = slip_bps / 10000.0 - effective_px = float(px) * (1.0 + slip) - consumed_bp_delta = (delta_abs * effective_px) if delta_abs > 0 else 0.0 - - return final_qty, consumed_bp_delta - - def _normalize_plan( - self, - context: ComposeContext, - plan: LlmPlanProposal, - ) -> List[TradeInstruction]: - instructions: List[TradeInstruction] = [] - - # --- prepare state --- - projected_positions: Dict[str, float] = { - symbol: snapshot.quantity - for symbol, snapshot in context.portfolio.positions.items() - } - - def _count_active(pos_map: Dict[str, float]) -> int: - return sum(1 for q in pos_map.values() if abs(q) > self._quantity_precision) - - active_positions = _count_active(projected_positions) - - # Initialize buying power context - equity, allowed_lev, constraints, projected_gross, price_map = ( - self._init_buying_power_context(context) - ) - - max_positions = constraints.max_positions - max_position_qty = constraints.max_position_qty - - # --- process each planned item --- - for idx, item in enumerate(plan.items): - symbol = item.instrument.symbol - current_qty = projected_positions.get(symbol, 0.0) - - # determine the intended target quantity (clamped by max_position_qty) - target_qty = self._resolve_target_quantity( - item, current_qty, max_position_qty - ) - # SPOT long-only: do not allow negative target quantities - if ( - self._request.exchange_config.market_type == MarketType.SPOT - and target_qty < 0 - ): - target_qty = 0.0 - # Enforce: single-lot per symbol and no direct flip. If target flips side, - # split into two sub-steps: first flat to 0, then open to target side. 
- sub_targets: List[float] = [] - if current_qty * target_qty < 0: - sub_targets = [0.0, float(target_qty)] - else: - sub_targets = [float(target_qty)] - - local_current = float(current_qty) - for sub_i, sub_target in enumerate(sub_targets): - delta = sub_target - local_current - - if abs(delta) <= self._quantity_precision: - continue - - is_new_position = ( - abs(local_current) <= self._quantity_precision - and abs(sub_target) > self._quantity_precision - ) - if ( - is_new_position - and max_positions is not None - and active_positions >= int(max_positions) - ): - logger.warning( - "Skipping symbol {} due to max_positions constraint (active={} max={})", - symbol, - active_positions, - max_positions, - ) - continue - - side = TradeSide.BUY if delta > 0 else TradeSide.SELL - # requested leverage (default 1.0), clamped to constraints - requested_lev = ( - float(item.leverage) - if getattr(item, "leverage", None) is not None - else 1.0 - ) - allowed_lev_item = ( - float(constraints.max_leverage) - if constraints.max_leverage is not None - else requested_lev - ) - if self._request.exchange_config.market_type == MarketType.SPOT: - # Spot: long-only, no leverage - final_leverage = 1.0 - else: - final_leverage = max(1.0, min(requested_lev, allowed_lev_item)) - quantity = abs(delta) - - # Normalize quantity through all guardrails - logger.debug(f"Before normalize: {symbol} quantity={quantity}") - quantity, consumed_bp = self._normalize_quantity( - symbol, - quantity, - side, - local_current, - constraints, - equity, - allowed_lev, - projected_gross, - price_map, - ) - logger.debug( - f"After normalize: {symbol} quantity={quantity}, consumed_bp={consumed_bp}" - ) - - if quantity <= self._quantity_precision: - logger.warning( - f"SKIPPED: {symbol} quantity={quantity} <= precision={self._quantity_precision} after normalization" - ) - continue - - # Update projected positions for subsequent guardrails - signed_delta = quantity if side is TradeSide.BUY else -quantity - 
projected_positions[symbol] = local_current + signed_delta - projected_gross += consumed_bp - - # active positions accounting - if is_new_position: - active_positions += 1 - if abs(projected_positions[symbol]) <= self._quantity_precision: - active_positions = max(active_positions - 1, 0) - - # Use a stable per-item sub-index to keep instruction ids unique - instr = self._create_instruction( - context, - idx * 10 + sub_i, - item, - symbol, - side, - quantity, - final_leverage, - local_current, - sub_target, - ) - instructions.append(instr) - - # advance local_current for the next sub-step - local_current = projected_positions[symbol] - - return instructions - - def _create_instruction( - self, - context: ComposeContext, - idx: int, - item, - symbol: str, - side: TradeSide, - quantity: float, - final_leverage: float, - current_qty: float, - target_qty: float, - ) -> TradeInstruction: - """Create a normalized TradeInstruction with metadata.""" - final_target = current_qty + (quantity if side is TradeSide.BUY else -quantity) - meta = { - "requested_target_qty": target_qty, - "current_qty": current_qty, - "final_target_qty": final_target, - "action": item.action.value, - } - if item.confidence is not None: - meta["confidence"] = item.confidence - if item.rationale: - meta["rationale"] = item.rationale - - # For derivatives/perpetual markets, mark reduceOnly when instruction reduces absolute exposure to avoid accidental reverse opens - try: - if self._request.exchange_config.market_type != MarketType.SPOT: - if abs(final_target) < abs(current_qty): - meta["reduceOnly"] = True - # Bybit uses a different param key - if ( - self._request.exchange_config.exchange_id or "" - ).lower() == "bybit": - meta["reduce_only"] = True - except Exception: - # Ignore any exception; do not block instruction creation - pass - - instruction = TradeInstruction( - instruction_id=f"{context.compose_id}:{symbol}:{idx}", - compose_id=context.compose_id, - instrument=item.instrument, - 
action=item.action, - side=side, - quantity=quantity, - leverage=final_leverage, - max_slippage_bps=self._default_slippage_bps, - meta=meta, - ) - logger.debug( - "Created TradeInstruction {} for {} side={} qty={} lev={}", - instruction.instruction_id, - symbol, - instruction.side, - instruction.quantity, - final_leverage, - ) - return instruction - - def _resolve_target_quantity( - self, - item, - current_qty: float, - max_position_qty: Optional[float], - ) -> float: - # NOOP: keep current position - if item.action == LlmDecisionAction.NOOP: - return current_qty - - # Interpret target_qty as operation magnitude (not final position), normalized to positive - mag = abs(float(item.target_qty)) - target = current_qty - - # Compute target position per open/close long/short action - if item.action == LlmDecisionAction.OPEN_LONG: - base = current_qty if current_qty > 0 else 0.0 - target = base + mag - elif item.action == LlmDecisionAction.OPEN_SHORT: - base = current_qty if current_qty < 0 else 0.0 - target = base - mag - elif item.action == LlmDecisionAction.CLOSE_LONG: - if current_qty > 0: - target = max(current_qty - mag, 0.0) - else: - # No long position, keep unchanged - target = current_qty - elif item.action == LlmDecisionAction.CLOSE_SHORT: - if current_qty < 0: - target = min(current_qty + mag, 0.0) - else: - # No short position, keep unchanged - target = current_qty - else: - # Fallback: treat unknown action as NOOP - target = current_qty - - # Clamp by max_position_qty (symmetric) - if max_position_qty is not None: - max_abs = abs(float(max_position_qty)) - target = max(-max_abs, min(max_abs, target)) - - return target - - def _apply_quantity_filters( - self, - symbol: str, - quantity: float, - quantity_step: float, - min_trade_qty: float, - max_order_qty: Optional[float], - min_notional: Optional[float], - price_map: Dict[str, float], - ) -> float: - qty = quantity - logger.debug(f"Filtering {symbol}: initial qty={qty}") - - if max_order_qty is not None: - 
qty = min(qty, float(max_order_qty)) - logger.debug(f"After max_order_qty filter: qty={qty}") - - if quantity_step > 0: - qty = math.floor(qty / quantity_step) * quantity_step - logger.debug(f"After quantity_step filter: qty={qty}") - - if qty <= 0: - logger.warning(f"FILTERED: {symbol} qty={qty} <= 0") - return 0.0 - - if qty < min_trade_qty: - logger.warning( - f"FILTERED: {symbol} qty={qty} < min_trade_qty={min_trade_qty}" - ) - return 0.0 - - if min_notional is not None: - price = price_map.get(symbol) - if price is None: - logger.warning(f"FILTERED: {symbol} no price reference available") - return 0.0 - notional = qty * price - if notional < float(min_notional): - logger.warning( - f"FILTERED: {symbol} notional={notional:.4f} < min_notional={min_notional}" - ) - return 0.0 - logger.debug( - f"Passed min_notional check: notional={notional:.4f} >= {min_notional}" - ) - - logger.debug(f"Final qty for {symbol}: {qty}") - return qty diff --git a/python/valuecell/agents/common/trading/models.py b/python/valuecell/agents/common/trading/models.py index 2a58de02d..b0e66c3c3 100644 --- a/python/valuecell/agents/common/trading/models.py +++ b/python/valuecell/agents/common/trading/models.py @@ -32,13 +32,13 @@ class TradeType(str, Enum): class TradeSide(str, Enum): """Low-level execution side (exchange primitive). - This remains distinct from `LlmDecisionAction` which encodes *intent* at a + This remains distinct from `TradeDecisionAction` which encodes *intent* at a position semantic level (open_long/close_short/etc). 
TradeSide is kept for: - direct mapping to exchange APIs that require BUY/SELL - conveying slippage/fee direction in execution records Removal consideration: if the pipeline fully normalizes around - LlmDecisionAction -> (final target delta), we can derive side on the fly: + TradeDecisionAction -> (final target delta), we can derive side on the fly: OPEN_LONG, CLOSE_SHORT -> BUY OPEN_SHORT, CLOSE_LONG -> SELL For now we keep it explicit to avoid recomputation and ease auditing. @@ -291,13 +291,6 @@ def _infer_market_type(cls, data): return values -# ========================= -# Minimal DTOs for Strategy Agent (LLM-driven composer, no StrategyHint) -# These DTOs define the data contract across modules following the -# simplified pipeline: data -> features -> composer(LLM+rules) -> execution -> history/digest. -# ========================= - - class InstrumentRef(BaseModel): """Identifies a tradable instrument. @@ -482,8 +475,8 @@ class PortfolioView(BaseModel): ) -class LlmDecisionAction(str, Enum): - """Position-oriented high-level actions produced by the LLM plan. +class TradeDecisionAction(str, Enum): + """Position-oriented high-level actions produced by the plan. Semantics: - OPEN_LONG: open/increase long; if currently short, flatten then open long @@ -501,7 +494,7 @@ class LlmDecisionAction(str, Enum): def derive_side_from_action( - action: Optional[LlmDecisionAction], + action: Optional[TradeDecisionAction], ) -> Optional["TradeSide"]: """Derive execution side (BUY/SELL) from a high-level action. 
@@ -509,16 +502,16 @@ def derive_side_from_action( """ if action is None: return None - if action in (LlmDecisionAction.OPEN_LONG, LlmDecisionAction.CLOSE_SHORT): + if action in (TradeDecisionAction.OPEN_LONG, TradeDecisionAction.CLOSE_SHORT): return TradeSide.BUY - if action in (LlmDecisionAction.OPEN_SHORT, LlmDecisionAction.CLOSE_LONG): + if action in (TradeDecisionAction.OPEN_SHORT, TradeDecisionAction.CLOSE_LONG): return TradeSide.SELL # NOOP or future adjust/cancel actions: no executable side return None -class LlmDecisionItem(BaseModel): - """LLM plan item. Interprets target_qty as operation size (magnitude). +class TradeDecisionItem(BaseModel): + """Trade plan item. Interprets target_qty as operation size (magnitude). Unlike the previous "final target position" semantics, target_qty here is the size to operate (same unit as position quantity). The composer @@ -526,7 +519,7 @@ class LlmDecisionItem(BaseModel): """ instrument: InstrumentRef - action: LlmDecisionAction + action: TradeDecisionAction target_qty: float = Field( ..., description="Operation size for this action (units), e.g., open/close long/short", @@ -543,11 +536,11 @@ class LlmDecisionItem(BaseModel): ) -class LlmPlanProposal(BaseModel): - """Structured LLM output before rule normalization.""" +class TradePlanProposal(BaseModel): + """Structured output before rule normalization.""" ts: int - items: List[LlmDecisionItem] = Field(default_factory=list) + items: List[TradeDecisionItem] = Field(default_factory=list) rationale: Optional[str] = Field( default=None, description="Optional natural language rationale" ) @@ -573,7 +566,7 @@ class TradeInstruction(BaseModel): ..., description="Decision cycle id to correlate instructions and history" ) instrument: InstrumentRef - action: Optional[LlmDecisionAction] = Field( + action: Optional[TradeDecisionAction] = Field( default=None, description="High-level intent action for dispatch ('open_long'|'open_short'|'close_long'|'close_short'|'noop')", ) @@ 
-607,12 +600,15 @@ def _validate_action_side_alignment(self): if act is None: return self try: - if act == LlmDecisionAction.NOOP: + if act == TradeDecisionAction.NOOP: # Composer should not emit NOOP instructions; tolerate in lenient mode return self - if act in (LlmDecisionAction.OPEN_LONG, LlmDecisionAction.CLOSE_SHORT): + if act in (TradeDecisionAction.OPEN_LONG, TradeDecisionAction.CLOSE_SHORT): expected = TradeSide.BUY - elif act in (LlmDecisionAction.OPEN_SHORT, LlmDecisionAction.CLOSE_LONG): + elif act in ( + TradeDecisionAction.OPEN_SHORT, + TradeDecisionAction.CLOSE_LONG, + ): expected = TradeSide.SELL else: return self @@ -684,7 +680,7 @@ class PortfolioValueSeries(BaseModel): class ComposeContext(BaseModel): - """Context assembled for the LLM-driven composer.""" + """Context assembled for the composer.""" ts: int compose_id: str = Field( diff --git a/python/valuecell/agents/common/trading/utils.py b/python/valuecell/agents/common/trading/utils.py index 07aa02293..82e56b4f2 100644 --- a/python/valuecell/agents/common/trading/utils.py +++ b/python/valuecell/agents/common/trading/utils.py @@ -242,3 +242,78 @@ async def send_discord_message( if raise_for_status: resp.raise_for_status() return resp.text + + +def prune_none(obj): + """Recursively remove None, empty dict, and empty list values.""" + if isinstance(obj, dict): + pruned = {k: prune_none(v) for k, v in obj.items() if v is not None} + return {k: v for k, v in pruned.items() if v not in (None, {}, [])} + if isinstance(obj, list): + pruned = [prune_none(v) for v in obj] + return [v for v in pruned if v not in (None, {}, [])] + return obj + + +def extract_market_section(market_data: List[Dict]) -> Dict: + """Extract decision-critical metrics from market feature entries.""" + + compact: Dict[str, Dict] = {} + for item in market_data: + symbol = (item.get("instrument") or {}).get("symbol") + if not symbol: + continue + + values = item.get("values") or {} + entry: Dict[str, float] = {} + + for 
feature_key, alias in ( + ("price.last", "last"), + ("price.close", "close"), + ("price.open", "open"), + ("price.high", "high"), + ("price.low", "low"), + ("price.bid", "bid"), + ("price.ask", "ask"), + ("price.change_pct", "change_pct"), + ("price.volume", "volume"), + ): + if feature_key in values and values[feature_key] is not None: + entry[alias] = values[feature_key] + + if values.get("open_interest") is not None: + entry["open_interest"] = values["open_interest"] + + if values.get("funding.rate") is not None: + entry["funding_rate"] = values["funding.rate"] + if values.get("funding.mark_price") is not None: + entry["mark_price"] = values["funding.mark_price"] + + normalized = {k: v for k, v in entry.items() if v is not None} + if normalized: + compact[symbol] = normalized + + return compact + + +def group_features(features: List[FeatureVector]) -> Dict: + """Group features by the group key stored in their metadata. + + Each FeatureVector is dumped to a JSON-compatible dict and bucketed by + its meta FEATURE_GROUP_BY_KEY value; features without a group key are + skipped. This allows callers to introduce ad-hoc groupings (e.g., market + snapshots) without overloading the interval field. + """ + grouped: Dict[str, List] = {} + + for fv in features: + data = fv.model_dump(mode="json") + meta = data.get("meta") or {} + group_key = meta.get(FEATURE_GROUP_BY_KEY) + + if not group_key: + continue + + grouped.setdefault(group_key, []).append(data) + + return grouped diff --git a/python/valuecell/agents/grid_agent/__init__.py b/python/valuecell/agents/grid_agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/valuecell/agents/grid_agent/grid_agent.py b/python/valuecell/agents/grid_agent/grid_agent.py new file mode 100644 index 000000000..929fda8d8 --- /dev/null +++ b/python/valuecell/agents/grid_agent/grid_agent.py @@ -0,0 +1,45 @@ +"""Grid strategy agent following the same abstraction as the prompt agent. 
+ +This agent reuses: +- Default features pipeline `DefaultFeaturesPipeline` +- Rule-based decision composer `GridComposer` + +Usage: + from valuecell.agents.grid_agent.grid_agent import GridStrategyAgent + agent = GridStrategyAgent() + await agent.stream(request) +""" + +from __future__ import annotations + +from valuecell.agents.common.trading.base_agent import BaseStrategyAgent +from valuecell.agents.common.trading.decision.grid_composer.grid_composer import ( + GridComposer, +) +from valuecell.agents.common.trading.decision.interfaces import BaseComposer +from valuecell.agents.common.trading.features.interfaces import BaseFeaturesPipeline +from valuecell.agents.common.trading.features.pipeline import DefaultFeaturesPipeline +from valuecell.agents.common.trading.models import UserRequest + + +class GridStrategyAgent(BaseStrategyAgent): + """Grid trading agent: default features + rule-based grid composer. + + - Spot: long-only grid add/reduce. + - Perpetual/derivatives: bi-directional grid; add short on up moves, + add long on down moves; reduce on reversals. + """ + + def _build_features_pipeline( + self, request: UserRequest + ) -> BaseFeaturesPipeline | None: + return DefaultFeaturesPipeline.from_request(request) + + def _create_decision_composer(self, request: UserRequest) -> BaseComposer | None: + # Adjust step_pct / max_steps / base_fraction as needed + return GridComposer( + request=request, + step_pct=0.005, # ~0.5% per step + max_steps=3, # up to 3 steps per cycle + base_fraction=0.08, # base order size = equity * 8% + )