Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions python/valuecell/agents/strategy_agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,19 @@ async def run_once(self) -> DecisionCycleResult:
if self._request.exchange_config.trading_mode == TradingMode.VIRTUAL:
if self._request.exchange_config.market_type == MarketType.SPOT:
portfolio.buying_power = max(0.0, float(portfolio.cash))
# Use fixed 1-minute interval and lookback of 4 hours (60 * 4 minutes)

# Use fixed 1-second interval and lookback of 3 minutes (60 * 3 seconds)
candles = await self._market_data_source.get_recent_candles(
self._symbols, "1m", 60 * 4
self._symbols, "1s", 60 * 3
)
features = self._feature_computer.compute_features(candles=candles)
market_snapshot = _build_market_snapshot(features)
# Use fixed 1-minute interval and lookback of 4 hours (60 * 4 minutes)
candles = await self._market_data_source.get_recent_candles(
self._symbols, "1m", 60 * 4
)
features.extend(self._feature_computer.compute_features(candles=candles))

digest = self._digest_builder.build(list(self._history_records))

context = ComposeContext(
Expand Down
241 changes: 206 additions & 35 deletions python/valuecell/agents/strategy_agent/decision/composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import math
from typing import Dict, List, Optional

from agno.agent import Agent as AgnoAgent
from loguru import logger
from pydantic import ValidationError

from valuecell.utils import env as env_utils
from valuecell.utils import model as model_utils

from ..models import (
ComposeContext,
Constraints,
Expand All @@ -19,6 +23,7 @@
UserRequest,
)
from .interfaces import Composer
from .system_prompt import SYSTEM_PROMPT


class LlmComposer(Composer):
Expand Down Expand Up @@ -50,16 +55,13 @@ def __init__(

async def compose(self, context: ComposeContext) -> List[TradeInstruction]:
prompt = self._build_llm_prompt(context)
logger.debug(
"Built LLM prompt for compose_id={}: {}",
context.compose_id,
prompt,
)
try:
plan = await self._call_llm(prompt)
if not plan.items:
logger.error(
"LLM returned empty plan for compose_id={}", context.compose_id
logger.info(
"LLM returned empty plan for compose_id={} with rationale={}",
context.compose_id,
plan.rationale,
)
return []
except ValidationError as exc:
Expand All @@ -75,32 +77,198 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]:
# Prompt + LLM helpers

def _build_llm_prompt(self, context: ComposeContext) -> str:
    """Serialize the compose context into a compact, low-noise LLM prompt.

    The prompt is a short block of per-cycle guidance followed by a
    ``Context:`` section holding a single-line JSON document. The JSON keeps
    only actionable state (summary, risk flags, environment, market snapshot,
    features, positions, constraints and per-instrument digest); ``None``
    values and empty dicts/lists are pruned everywhere except inside the
    feature vectors, which are dumped as-is.

    Risk flags are computed independently of each other so that a failure in
    one check (for example the buying-power helper raising) does not hide
    the remaining flags.

    Args:
        context: The compose context for the current decision cycle.

    Returns:
        The guidance text and the JSON context joined into one prompt string.
    """

    def _prune_none(obj):
        """Recursively drop ``None`` values and empty dicts/lists."""
        if isinstance(obj, dict):
            cleaned = {k: _prune_none(v) for k, v in obj.items() if v is not None}
            return {k: v for k, v in cleaned.items() if v not in (None, {}, [])}
        if isinstance(obj, list):
            cleaned = [_prune_none(v) for v in obj]
            return [v for v in cleaned if v not in (None, {}, [])]
        return obj

    # Compact portfolio snapshot: one pruned record per held symbol.
    pv = context.portfolio
    positions = [
        _prune_none(
            {
                "symbol": sym,
                "qty": float(snap.quantity),
                "avg_px": snap.avg_price,
                "mark_px": snap.mark_price,
                "unrealized_pnl": snap.unrealized_pnl,
                "lev": snap.leverage,
                "entry_ts": snap.entry_ts,
                "type": getattr(snap, "trade_type", None),
            }
        )
        for sym, snap in pv.positions.items()
    ]

    # Constraints (only non-empty fields).
    constraints = (
        pv.constraints.model_dump(mode="json", exclude_none=True)
        if pv and pv.constraints
        else {}
    )

    # --- Summary ---
    # Aggregate win_rate across instruments, weighted by trade_count.
    total_trades = 0
    weighted_win = 0.0
    for entry in (context.digest.by_instrument or {}).values():
        tc = int(getattr(entry, "trade_count", 0) or 0)
        wr = getattr(entry, "win_rate", None)
        if tc and wr is not None:
            total_trades += tc
            weighted_win += float(wr) * tc
    agg_win_rate = (weighted_win / total_trades) if total_trades > 0 else None

    # A position counts as active when its quantity is non-zero.
    active_positions = sum(
        1
        for snap in pv.positions.values()
        if abs(float(getattr(snap, "quantity", 0.0) or 0.0)) > 0.0
    )

    # Unrealized PnL as a percentage of total_value (when both are available).
    unrealized = getattr(pv, "total_unrealized_pnl", None)
    total_value = getattr(pv, "total_value", None)
    unrealized_pct = (
        (float(unrealized) / float(total_value) * 100.0)
        if (unrealized is not None and total_value)
        else None
    )

    # --- Risk flags (each check is best-effort and independent) ---
    risk_flags: List[str] = []
    available_buying_power: Optional[float] = None

    # Flag when 80% or more of the allowed position slots are in use.
    max_positions_cfg = constraints.get("max_positions")
    if max_positions_cfg:
        try:
            if active_positions / float(max_positions_cfg) >= 0.8:
                risk_flags.append("approaching_max_positions")
        except (TypeError, ValueError, ZeroDivisionError) as exc:
            logger.debug(
                "Skipping max_positions risk flag for compose_id={}: {}",
                context.compose_id,
                exc,
            )

    # Flag when 10% or less of the leveraged capacity is still available.
    try:
        equity, allowed_lev, _, projected_gross, _ = (
            self._init_buying_power_context(context)
        )
        capacity = float(equity) * float(allowed_lev)
        available_buying_power = max(0.0, capacity - float(projected_gross))
        if capacity > 0 and available_buying_power / capacity <= 0.1:
            risk_flags.append("low_buying_power")
    except Exception as exc:
        # The helper is opaque from here; keep this best-effort but visible.
        logger.debug(
            "Skipping buying-power risk flag for compose_id={}: {}",
            context.compose_id,
            exc,
        )

    # Flag when any position uses 80% or more of the configured max leverage.
    max_lev_cfg = constraints.get("max_leverage")
    if max_lev_cfg:
        try:
            max_lev = float(max_lev_cfg)
            if max_lev > 0:
                peak_ratio = max(
                    (
                        float(snap.leverage) / max_lev
                        for snap in pv.positions.values()
                        if getattr(snap, "leverage", None) is not None
                    ),
                    default=0.0,
                )
                if peak_ratio >= 0.8:
                    risk_flags.append("high_leverage_usage")
        except (TypeError, ValueError) as exc:
            logger.debug(
                "Skipping leverage risk flag for compose_id={}: {}",
                context.compose_id,
                exc,
            )

    summary = _prune_none(
        {
            "active_positions": active_positions,
            "max_positions": constraints.get("max_positions"),
            "total_value": total_value,
            "cash": pv.cash,
            "unrealized_pnl": unrealized,
            "unrealized_pnl_pct": unrealized_pct,
            "win_rate": agg_win_rate,
            "trade_count": total_trades,
            # Available buying power (pruned when it could not be computed);
            # this helps the model adjust aggressiveness.
            "available_buying_power": available_buying_power,
        }
    )

    # Digest (minimal useful stats per instrument).
    digest_compact: Dict[str, dict] = {
        sym: _prune_none(
            {
                "trade_count": entry.trade_count,
                "realized_pnl": entry.realized_pnl,
                "win_rate": entry.win_rate,
                "avg_holding_ms": entry.avg_holding_ms,
                "last_trade_ts": entry.last_trade_ts,
            }
        )
        for sym, entry in (context.digest.by_instrument or {}).items()
    }

    # Environment summary.
    exchange_config = self._request.exchange_config
    env = _prune_none(
        {
            "exchange_id": exchange_config.exchange_id,
            "trading_mode": str(exchange_config.trading_mode),
            "max_leverage": constraints.get("max_leverage"),
            "max_positions": constraints.get("max_positions"),
        }
    )

    # Preserve original feature structure (do not prune fields inside FeatureVector).
    features_payload = [fv.model_dump(mode="json") for fv in context.features]

    payload = _prune_none(
        {
            "strategy_prompt": context.prompt_text,
            "summary": summary,
            "risk_flags": risk_flags or None,
            "env": env,
            "compose_id": context.compose_id,
            "ts": context.ts,
            "market": context.market_snapshot,
            "features": features_payload,
            "portfolio": _prune_none(
                {
                    "strategy_id": context.strategy_id,
                    "cash": pv.cash,
                    "total_value": total_value,
                    "total_unrealized_pnl": unrealized,
                    "positions": positions,
                }
            ),
            "constraints": constraints,
            "digest": digest_compact,
        }
    )

    instructions = (
        "Per-cycle guidance: Read the Context JSON and form a concise plan. "
        "If any arrays appear, they are ordered OLDEST → NEWEST (last = most recent). "
        "Respect constraints, buying power, and risk_flags; prefer NOOP when edge is unclear. "
        "Manage existing positions first; propose new exposure only with clear, trend-aligned confluence and within limits. Keep rationale brief."
    )

    return f"{instructions}\n\nContext:\n{json.dumps(payload, ensure_ascii=False)}"

async def _call_llm(self, prompt: str) -> LlmPlanProposal:
"""Invoke an LLM asynchronously and parse the response into LlmPlanProposal.
Expand All @@ -112,19 +280,22 @@ async def _call_llm(self, prompt: str) -> LlmPlanProposal:
`LlmPlanProposal`.
"""

from agno.agent import Agent as AgnoAgent

from valuecell.utils.model import create_model_with_provider

cfg = self._request.llm_model_config
model = create_model_with_provider(
model = model_utils.create_model_with_provider(
provider=cfg.provider,
model_id=cfg.model_id,
api_key=cfg.api_key,
)

# Wrap model in an Agent (consistent with parser_agent usage)
agent = AgnoAgent(model=model, output_schema=LlmPlanProposal, markdown=False)
agent = AgnoAgent(
model=model,
output_schema=LlmPlanProposal,
markdown=False,
instructions=[SYSTEM_PROMPT],
use_json_mode=model_utils.model_should_use_json_mode(model),
debug_mode=env_utils.agent_debug_mode_enabled(),
)
response = await agent.arun(prompt)
content = getattr(response, "content", None) or response
logger.debug("Received LLM response {}", content)
Expand Down
32 changes: 32 additions & 0 deletions python/valuecell/agents/strategy_agent/decision/system_prompt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""System prompt for the Strategy Agent LLM planner.

This prompt captures ONLY the agent's role, IO contract (schema), and
responsibilities around constraints and validation. Trading style and
heuristics live in strategy templates (e.g., templates/default.txt).

It is passed to the LLM wrapper as a system/instruction message, while the
per-cycle JSON Context is provided as the user message by the composer.
"""

# System-level instructions for the planner LLM: role, action semantics,
# constraint handling and decision framework. Trading style is intentionally
# left to the strategy templates.
SYSTEM_PROMPT: str = """
ROLE & IDENTITY
You are an autonomous trading planner that outputs a structured plan for a crypto strategy executor. Your objective is to maximize risk-adjusted returns while preserving capital. You are stateless across cycles.

ACTION SEMANTICS
- target_qty is the desired FINAL signed position quantity: >0 long, <0 short, 0 flat (close). The executor computes delta = target_qty − current_qty to create orders.
- To close, set target_qty to 0. Do not invent other action names.
- One item per symbol at most. No hedging (never propose both long and short exposure on the same symbol).

CONSTRAINTS & VALIDATION
- Respect max_positions, max_leverage, max_position_qty, quantity_step, min_trade_qty, max_order_qty, min_notional, and available buying power.
- Keep leverage positive if provided. Confidence must be in [0,1].
- If arrays appear in Context, they are ordered: OLDEST → NEWEST (last is the most recent).
- If risk_flags contain low_buying_power or high_leverage_usage, prefer reducing size or choosing noop. If approaching_max_positions is set, prioritize managing existing positions over opening new ones.
- When estimating quantity, account for estimated fees (e.g., 1%) and potential market movement; reserve a small buffer so executed size does not exceed intended risk after fees/slippage.

DECISION FRAMEWORK
1) Manage current positions first (reduce risk, close invalidated trades).
2) Only propose new exposure when constraints and buying power allow.
3) Prefer fewer, higher-quality actions when signals are mixed.
4) When in doubt or edge is weak, choose noop.
"""
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List, Optional
from typing import Any, Dict, List, Optional

from ..models import Candle, FeatureVector

Expand All @@ -20,11 +20,15 @@ class FeatureComputer(ABC):
def compute_features(
self,
candles: Optional[List[Candle]] = None,
meta: Optional[Dict[str, Any]] = None,
) -> List[FeatureVector]:
"""Build feature vectors from the given inputs.

Args:
candles: optional window of candles
meta: optional metadata about the input window (e.g., interval,
window_start_ts, window_end_ts, num_points). Implementations may
use this to populate FeatureVector.meta.
Returns:
A list of FeatureVector items, one or more per instrument.
"""
Expand Down
Loading