Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions python/valuecell/agents/strategy_agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# Strategy Agent (Design Overview)

This document describes the design for the Strategy Agent: a lightweight, LLM-driven trading decision pipeline with a short, testable chain from market data to executable instructions, plus history and digest for feedback.

- Assumptions (current stage):
- Real-time data (no explicit handling of late/out-of-order data yet)
- No complex live-trading order/fill/cancel processing (kept out of scope for now)
- Decisions are generated by an LLM inside the composer; guardrails normalize the output into executable instructions.

## Goals

- Keep the dependency flow one-way and simple: data → features → composer(LLM + guardrails) → execution → history/digest
- Clearly defined DTOs and interfaces so each module can be developed and tested in isolation
- Minimal surface area for configuration: the strategy prompt is a plain string (prompt_text)
- Idempotent and auditable: each composition run has a compose_id; any optional
auditing metadata (prompt hash, model name, token usage, latency, filters)
is recorded as a HistoryRecord payload (no separate report object).

## Module Layout

- `data/`
- `market_data.py` — Market data source (candles) abstraction(s)
- `features/`
- `technical_indicators.py`, `multimodal_analysis.py`, etc. — Feature computation from raw data
- `decision/`
- `composer.py` — LLM decision + normalization + guardrails (core)
- `system_prompt.py` (optional) — prompt templates, or store in config/constants
- `execution/`
- `exchanges.py`, `paper_trading.py` — Gateways to real or paper execution (only instructions input for now)
- `trading_history/`
- `recorder.py` — Persist key checkpoints
- `digest.py` — Build `TradeDigest` for historical guidance
- Root files
- `models.py` — DTOs only (interfaces live in module-level files)
- `core.py` — DecisionCoordinator (wires the full decision cycle)
- `constants.py` — Basic configuration/limits; can hold prompt_text initially

## Data Flow (one decision cycle)

1. DecisionCoordinator pulls `PortfolioView` (positions, cash, optional constraints)
1. DecisionCoordinator gets recent `Candle` from `MarketDataSource`
1. `FeatureComputer` produces `FeatureVector[]`
1. DecisionCoordinator assembles `ComposeContext`: features, portfolio, digest, prompt_text (string), optional market_snapshot and extra constraints

1. `Composer.compose(context)`: calls LLM with `ComposeContext` → `LlmPlanProposal`; normalizes plan (target position logic, limits, step size, min notional, cool-down, etc.); returns `TradeInstruction[]`

1. `ExecutionGateway.execute(instructions)` (no detailed order/fill handling at this stage)
1. `HistoryRecorder.record(...)` checkpoints (including optional auditing metadata);

DigestBuilder updates `TradeDigest`

ASCII overview:

```text
Data → Features → Composer(LLM+Guardrails) → Execution → History → Digest
↑ ↓ ↑
PortfolioView ----------------------------- |
prompt_text ----------------------------------------→
```

## DTOs (Pydantic models)

Defined in `models.py`:

- Identification and raw data
- `InstrumentRef { symbol, exchange_id?, quote_ccy? }`
- `Candle { ts, instrument, open, high, low, close, volume, interval }`

- User request / configuration
- `UserRequest { model_config: ModelConfig, exchange_config: ExchangeConfig, trading_config: TradingConfig }`
- `ModelConfig { provider, model_id, api_key }`
- `ExchangeConfig { exchange_id?, trading_mode, api_key?, secret_key? }`
- `TradingConfig { strategy_name?, initial_capital?, max_leverage?, max_positions?, symbols, decide_interval?, template_id?, custom_prompt? }`

- Features and portfolio
- `FeatureVector { ts, instrument, values: Dict[str, float], meta? }`
- `PositionSnapshot { instrument, quantity, avg_price?, mark_price?, unrealized_pnl?, notional?, leverage?, entry_ts?, pnl_pct?, trade_type? }`
- `PortfolioView { strategy_id?, ts, cash, positions: Dict[symbol, PositionSnapshot], gross_exposure?, net_exposure?, constraints?, total_value?, total_unrealized_pnl?, available_cash? }`

- LLM decision and normalization
- `LlmDecisionItem { instrument, action: (buy|sell|flat|noop), target_qty, confidence?, rationale? }`
- `LlmPlanProposal { ts, items: List[LlmDecisionItem], notes?, model_meta? }`
- `TradeInstruction { instruction_id, compose_id, instrument, side: (buy|sell), quantity, price_mode, limit_price?, max_slippage_bps?, meta? }`
- `ComposeContext { ts, compose_id, strategy_id?, features, portfolio, digest, prompt_text, market_snapshot?, constraints? }`

- History and digest
- `HistoryRecord { ts, kind, reference_id, payload }`
- `TradeDigestEntry { instrument, trade_count, realized_pnl, win_rate?, avg_holding_ms?, last_trade_ts?, avg_entry_price?, max_drawdown?, recent_performance_score? }`
- `TradeDigest { ts, by_instrument: Dict[symbol, TradeDigestEntry] }`

- UI/summary and series (optional; for leaderboard and charts)
- `TradingMode = (live|virtual)`
- `StrategyStatus = (running|paused|stopped|error)`
- `StrategySummary { strategy_id?, name?, model_provider?, model_id?, exchange_id?, mode?, status?, pnl_abs?, pnl_pct?, last_updated_ts? }`
- `StrategySummary { strategy_id?, name?, model_provider?, model_id?, exchange_id?, mode?, status?, realized_pnl?, unrealized_pnl?, pnl_pct?, last_updated_ts? }`
- `MetricPoint { ts, value }`
- `PortfolioValueSeries { strategy_id?, points: List[MetricPoint] }`

`TradeHistoryEntry { trade_id?, compose_id?, instruction_id?, strategy_id?, trade_ts?, entry_ts?, exit_ts?, instrument, side, type, quantity, entry_price?, exit_price?, realized_pnl?, realized_pnl_pct?, holding_ms?, leverage?, note? }`

Notes:

- Only `target_qty` is used (no `delta_qty`). Composer computes `order_qty = target_qty − current_qty` and turns it into a `TradeInstruction` (side + quantity).
- Initial versions can set `price_mode = "market"` for simplicity.
Action semantics:

- `flat`: target position is zero (may emit close-out instructions)
- `noop`: target equals current (delta == 0), emit no instruction

Additional notes:

- `mark_price` in `PositionSnapshot` allows consistent P&L visualization without coupling to feed-specific last trade logic.
- The UI-oriented DTOs (`StrategySummary`, `PortfolioValueSeries`, etc.) are additive and do not affect the core compose/execute pipeline.

## ID and correlation model

- `strategy_id`: identity of a running strategy; used by UI aggregation (`StrategySummary`, `PortfolioValueSeries`).
- `compose_id`: unique id generated per decision cycle by the coordinator. It is carried in `ComposeContext` and copied into each `TradeInstruction` for correlation. `HistoryRecord.reference_id` uses this id.
- `instruction_id`: deterministic id for idempotency, recommended format: `${compose_id}:${instrument.symbol}` (or include an ordinal if multiple instructions per instrument).
- `trade_id`: execution-layer id for a closed trade. `TradeHistoryEntry` can store `compose_id` and `instruction_id` optionally to link back to the decision that initiated it.

## Abstract Interfaces (contracts)

Interfaces live in their respective modules as ABCs (not Pydantic models):

- `data/interfaces.py`
- `MarketDataSource.get_recent_candles(symbols, interval, lookback) -> List[Candle]`
- `features/interfaces.py`
- `FeatureComputer.compute_features(candles?: List[Candle]) -> List[FeatureVector]`
- `core.py`
- `DecisionCoordinator.run_once() -> None`
- `portfolio/interfaces.py`
- `PortfolioService.get_view() -> PortfolioView`
- `PortfolioSnapshotStore.load_latest() -> Optional[PortfolioView]`
- `PortfolioSnapshotStore.save(view: PortfolioView) -> None`
- `decision/interfaces.py`
- `Composer.compose(context: ComposeContext) -> List[TradeInstruction]`
- `execution/interfaces.py`
- `ExecutionGateway.execute(instructions: List[TradeInstruction]) -> None`
- `trading_history/interfaces.py`
- `HistoryRecorder.record(record: HistoryRecord) -> None`
- `DigestBuilder.build(records: List[HistoryRecord]) -> TradeDigest`

## Guardrails (composer)

- Position targeting: compute `order_qty` from `target_qty` vs current position
- Rounding: step size, minimum order quantity/nominal
- Limits: per-instrument cap, net exposure cap, optional shorting allowance
- Cool-down/recent performance: use `TradeDigest` to suppress or downweight
- Confidence threshold and invalid field filtering
- Audit: record optional metadata (prompt hash, model name, token usage, latency, rejection reasons) as a `HistoryRecord` payload at the "compose" checkpoint
- Fallback: if LLM output is invalid/empty, optionally use a simple deterministic rule from features or return no-op

## History and Digest (clarified)

We record a few compact checkpoints using `HistoryRecord { ts, kind, reference_id, payload }`:

- kind = "features":
- reference_id: compose_id
- payload: a small summary (e.g., per-symbol feature keys and last values, or a hash)
- kind = "compose":
- reference_id: compose_id
- payload: optional auditing metadata (e.g., prompt_hash, model_name, token_usage, latency_ms, reasons filtered)
- kind = "instructions":
- reference_id: compose_id
- payload: the normalized `TradeInstruction[]` as a compact list or summary (symbol, side, qty)
- kind = "execution" (optional at this stage):
- reference_id: compose_id
- payload: ack/status if available from the gateway

DigestBuilder consumes these records (recent N bars or N decisions) to build `TradeDigest`:

- Per-instrument aggregates in `TradeDigestEntry`:
- trade_count, realized_pnl, win_rate, avg_holding_ms, last_trade_ts,
avg_entry_price, max_drawdown, recent_performance_score
- Update cadence: periodically (e.g., every M decisions or T minutes) or incrementally per instruction/execution
- Usage in composer: cool-down (skip recent losers), down-weight bad performers,
enforce simple risk heuristics (e.g., cap net additions if recent_performance_score < threshold)

This keeps recording simple and purpose-driven for composer feedback without inventing a separate report object.

## Runtime Modes

- Paper trading: default mode (via `execution/paper_trading.py`)
- Live and backtest: future extensions; the same interfaces remain stable

## Extensibility

- Add new features by extending `FeatureComputer`
- Plug different LLM providers/parsers within `Composer`
- Add more execution backends by implementing `ExecutionGateway`
- Evolve digests: additional stats inside `TradeDigestEntry` without breaking composer

## Out of Scope (current stage)

- Order lifecycle (partial fills, cancels, rejections)
- Late/out-of-order data handling
- Complex portfolio accounting beyond `PortfolioView`

## Minimal DecisionCoordinator Contract

A typical `run_once()` should:

1. `view = portfolio.get_view()`
2. Pull candles via `data` and compute `features = features.compute_features(candles=...)`
3. `context = ComposeContext(ts=..., features=features, portfolio=view, digest=..., prompt_text=..., market_snapshot=..., constraints=...)`
4. `instructions = composer.compose(context)`
5. `executor.execute(instructions)`
6. Record `HistoryRecord` for features, compose auditing metadata, and instructions
7. Update `TradeDigest` periodically or incrementally
Empty file.
9 changes: 9 additions & 0 deletions python/valuecell/agents/strategy_agent/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import asyncio

from valuecell.core.agent import create_wrapped_agent

from .agent import StrategyAgent

if __name__ == "__main__":
agent = create_wrapped_agent(StrategyAgent)
asyncio.run(agent.serve())
31 changes: 31 additions & 0 deletions python/valuecell/agents/strategy_agent/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import annotations

from typing import AsyncGenerator, Dict, Optional

from valuecell.core.agent.responses import streaming
from valuecell.core.types import BaseAgent, StreamResponse


class StrategyAgent(BaseAgent):
"""Minimal StrategyAgent entry for system integration.

This is a placeholder agent that streams a short greeting and completes.
It can be extended to wire the Strategy Agent decision loop
(data -> features -> composer -> execution -> history/digest).
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)

async def stream(
self,
query: str,
conversation_id: str,
task_id: str,
dependencies: Optional[Dict] = None,
) -> AsyncGenerator[StreamResponse, None]:
# Minimal streaming lifecycle: one message and done
yield streaming.message_chunk(
"StrategyAgent is online. Decision pipeline will be wired here."
)
yield streaming.done()
11 changes: 11 additions & 0 deletions python/valuecell/agents/strategy_agent/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Default constants used across the strategy_agent package.

Centralizes defaults so they can be imported from one place.
"""

DEFAULT_INITIAL_CAPITAL = 100000.0
DEFAULT_AGENT_MODEL = "deepseek-ai/DeepSeek-V3.1-Terminus"
DEFAULT_MODEL_PROVIDER = "siliconflow"
DEFAULT_MAX_POSITIONS = 5
DEFAULT_MAX_SYMBOLS = 5
DEFAULT_MAX_LEVERAGE = 10.0
25 changes: 25 additions & 0 deletions python/valuecell/agents/strategy_agent/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

from abc import ABC, abstractmethod

# Core interfaces for orchestration and portfolio service.
# Plain ABCs to avoid runtime dependencies on pydantic. Concrete implementations
# wire the pipeline: data -> features -> composer -> execution -> history/digest.


class DecisionCoordinator(ABC):
"""Coordinates a single decision cycle end-to-end.

A typical run performs:
1) fetch portfolio view
2) pull data and compute features
3) build compose context (prompt_text, digest, constraints)
4) compose (LLM + guardrails) -> trade instructions
5) execute instructions
6) record checkpoints and update digest
"""

@abstractmethod
def run_once(self) -> None:
"""Execute one decision cycle."""
raise NotImplementedError
Empty file.
32 changes: 32 additions & 0 deletions python/valuecell/agents/strategy_agent/data/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List

from ..models import Candle

# Contracts for market data sources (module-local abstract interfaces).
# These are plain ABCs (not Pydantic models) so implementations can be
# synchronous or asynchronous without runtime overhead.


class MarketDataSource(ABC):
"""Abstract market data access used by feature computation.

Implementations should fetch recent ticks or candles for the requested
symbols and intervals. Caching and batching policies are left to the
concrete classes.
"""

@abstractmethod
def get_recent_candles(
self, symbols: List[str], interval: str, lookback: int
) -> List[Candle]:
"""Return recent candles (OHLCV) for the given symbols/interval.

Args:
symbols: list of symbols (e.g., ["BTCUSDT", "ETHUSDT"])
interval: candle interval string (e.g., "1m", "5m")
lookback: number of bars to retrieve
"""
raise NotImplementedError
Empty file.
Empty file.
Empty file.
Empty file.
26 changes: 26 additions & 0 deletions python/valuecell/agents/strategy_agent/decision/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List

from ..models import ComposeContext, TradeInstruction

# Contracts for decision making (module-local abstract interfaces).
# Composer hosts the LLM call and guardrails, producing executable instructions.


class Composer(ABC):
"""LLM-driven decision composer with guardrails.

Input: ComposeContext
Output: TradeInstruction list
"""

@abstractmethod
def compose(self, context: ComposeContext) -> List[TradeInstruction]:
"""Produce normalized trade instructions given the current context.
Call the LLM, parse/validate output, apply guardrails (limits, step size,
min notional, cool-down), and return executable instructions.
Any optional auditing metadata should be recorded via HistoryRecorder.
"""
raise NotImplementedError
Empty file.
Empty file.
Empty file.
Empty file.
22 changes: 22 additions & 0 deletions python/valuecell/agents/strategy_agent/execution/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List

from ..models import TradeInstruction

# Contracts for execution gateways (module-local abstract interfaces).
# An implementation may route to a real exchange or a paper broker.


class ExecutionGateway(ABC):
"""Executes normalized trade instructions against an exchange/broker."""

@abstractmethod
def execute(self, instructions: List[TradeInstruction]) -> None:
"""Submit the provided instructions for execution.
Implementors may be synchronous or asynchronous. At this stage we
do not model order/fill/cancel lifecycles.
"""

raise NotImplementedError
Empty file.
Empty file.
Loading