Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions python/valuecell/agents/strategy_agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Dict, List
from typing import Callable, List

from loguru import logger

Expand Down Expand Up @@ -78,17 +78,6 @@ def _default_clock() -> datetime:
return datetime.now(timezone.utc)


def _build_market_snapshot(features: List[FeatureVector]) -> Dict[str, float]:
"""Derive latest market snapshot from feature vectors."""

snapshot: Dict[str, float] = {}
for vector in features:
price = vector.values.get("close")
if price is not None:
snapshot[vector.instrument.symbol] = float(price)
return snapshot


class DefaultDecisionCoordinator(DecisionCoordinator):
"""Default implementation that wires the full decision pipeline."""

Expand Down Expand Up @@ -186,17 +175,27 @@ async def run_once(self) -> DecisionCycleResult:
portfolio.buying_power = max(0.0, float(portfolio.cash))

# Use fixed 1-second interval and lookback of 3 minutes (60 * 3 seconds)
candles = await self._market_data_source.get_recent_candles(
candles_1s = await self._market_data_source.get_recent_candles(
self._symbols, "1s", 60 * 3
)
features = self._feature_computer.compute_features(candles=candles)
market_snapshot = _build_market_snapshot(features)
# Compute micro (1s) features with meta preserved
micro_features = self._feature_computer.compute_features(candles=candles_1s)

# Use fixed 1-minute interval and lookback of 4 hours (60 * 4 minutes)
candles = await self._market_data_source.get_recent_candles(
candles_1m = await self._market_data_source.get_recent_candles(
self._symbols, "1m", 60 * 4
)
features.extend(self._feature_computer.compute_features(candles=candles))
minute_features = self._feature_computer.compute_features(candles=candles_1m)

# Compose full features list: minute-level features (structural) then micro-level (freshness).
features = []
features.extend(minute_features)
features.extend(micro_features)

# Ask the data source for an authoritative market snapshot (exchange-ticker based)
market_snapshot = await self._market_data_source.get_market_snapshot(
self._symbols
)
digest = self._digest_builder.build(list(self._history_records))

context = ComposeContext(
Expand Down Expand Up @@ -514,6 +513,7 @@ def _build_summary(
unrealized_pnl=self._unrealized_pnl,
unrealized_pnl_pct=unrealized_pnl_pct,
pnl_pct=pnl_pct,
total_value=equity,
last_updated_ts=timestamp_ms,
)

Expand Down
16 changes: 14 additions & 2 deletions python/valuecell/agents/strategy_agent/data/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from typing import List

from ..models import Candle
from ..models import Candle, MarketSnapShotType

# Contracts for market data sources (module-local abstract interfaces).
# These are plain ABCs (not Pydantic models) so implementations can be
Expand All @@ -25,8 +25,20 @@ async def get_recent_candles(
"""Return recent candles (OHLCV) for the given symbols/interval.

Args:
symbols: list of symbols (e.g., ["BTCUSDT", "ETHUSDT"])
symbols: list of symbols (e.g., ["BTC/USDT", "ETH/USDT"])
interval: candle interval string (e.g., "1m", "5m")
lookback: number of bars to retrieve
"""
raise NotImplementedError

@abstractmethod
async def get_market_snapshot(self, symbols: List[str]) -> MarketSnapShotType:
"""Return a lightweight market snapshot mapping symbol -> price.

Implementations may call exchange endpoints (ticker, funding, open
interest) to build an authoritative latest-price mapping. The return
value should be a dict where keys are symbol strings and values are
latest price floats (or absent if not available).
"""

raise NotImplementedError
141 changes: 133 additions & 8 deletions python/valuecell/agents/strategy_agent/data/market.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from collections import defaultdict
from typing import Dict, List, Optional

import ccxt.pro as ccxtpro
from loguru import logger

from ..models import Candle, InstrumentRef
from ..models import Candle, InstrumentRef, MarketSnapShotType
from ..utils import get_exchange_cls, normalize_symbol
from .interfaces import MarketDataSource


Expand Down Expand Up @@ -34,11 +34,7 @@ async def get_recent_candles(
) -> List[Candle]:
async def _fetch(symbol: str) -> List[List]:
# instantiate exchange class by name (e.g., ccxtpro.kraken)
exchange_cls = getattr(ccxtpro, self._exchange_id, None)
if exchange_cls is None:
raise RuntimeError(
f"Exchange '{self._exchange_id}' not found in ccxt.pro"
)
exchange_cls = get_exchange_cls(self._exchange_id)
exchange = exchange_cls({"newUpdates": False, **self._ccxt_options})
try:
# ccxt.pro uses async fetch_ohlcv
Expand Down Expand Up @@ -66,7 +62,7 @@ async def _fetch(symbol: str) -> List[List]:
instrument=InstrumentRef(
symbol=symbol,
exchange_id=self._exchange_id,
quote_ccy="USD",
# quote_ccy="USD",
),
open=float(open_v),
high=float(high_v),
Expand All @@ -83,3 +79,132 @@ async def _fetch(symbol: str) -> List[List]:
self._exchange_id,
)
return candles

async def get_market_snapshot(self, symbols: List[str]) -> MarketSnapShotType:
"""Fetch latest prices for the given symbols using exchange endpoints.

The method tries to use the exchange's `fetch_ticker` (and optionally
`fetch_open_interest` / `fetch_funding_rate` when available) to build
a mapping symbol -> last price. On any failure for a symbol, it will
fall back to `base_prices` if provided or omit the symbol.
Example:
```
"BTC/USDT": {
"price": {
"symbol": "BTC/USDT:USDT",
"timestamp": 1762930517943,
"datetime": "2025-11-12T06:55:17.943Z",
"high": 105464.2,
"low": 102400.0,
"vwap": 103748.56,
"open": 105107.1,
"close": 103325.0,
"last": 103325.0,
"change": -1782.1,
"percentage": -1.696,
"average": 104216.0,
"baseVolume": 105445.427,
"quoteVolume": 10939811519.57,
"info": {
"symbol": "BTCUSDT",
"priceChange": "-1782.10",
"priceChangePercent": "-1.696",
"weightedAvgPrice": "103748.56",
"lastPrice": "103325.00",
"lastQty": "0.002",
"openPrice": "105107.10",
"highPrice": "105464.20",
"lowPrice": "102400.00",
"volume": "105445.427",
"quoteVolume": "10939811519.57",
"openTime": 1762844100000,
"closeTime": 1762930517943,
"firstId": 6852533393,
"lastId": 6856484055,
"count": 3942419
}
},
"open_interest": {
"symbol": "BTC/USDT:USDT",
"baseVolume": 85179.147,
"openInterestAmount": 85179.147,
"timestamp": 1762930517944,
"datetime": "2025-11-12T06:55:17.944Z",
"info": {
"symbol": "BTCUSDT",
"openInterest": "85179.147",
"time": 1762930517944
}
},
"funding_rate": {
"info": {
"symbol": "BTCUSDT",
"markPrice": "103325.10000000",
"indexPrice": "103382.54282609",
"estimatedSettlePrice": "103477.58650543",
"lastFundingRate": "0.00000967",
"interestRate": "0.00010000",
"nextFundingTime": 1762934400000,
"time": 1762930523000
},
"symbol": "BTC/USDT:USDT",
"markPrice": 103325.1,
"indexPrice": 103382.54282609,
"interestRate": 0.0001,
"estimatedSettlePrice": 103477.58650543,
"timestamp": 1762930523000,
"datetime": "2025-11-12T06:55:23.000Z",
"fundingRate": 9.67e-06,
"fundingTimestamp": 1762934400000,
"fundingDatetime": "2025-11-12T08:00:00.000Z"
}
}
```
"""
snapshot = defaultdict(dict)

exchange_cls = get_exchange_cls(self._exchange_id)
exchange = exchange_cls({"newUpdates": False, **self._ccxt_options})
try:
for symbol in symbols:
sym = normalize_symbol(symbol)
try:
ticker = await exchange.fetch_ticker(sym)
snapshot[symbol]["price"] = ticker

# best-effort: warm other endpoints (open interest / funding)
try:
oi = await exchange.fetch_open_interest(sym)
snapshot[symbol]["open_interest"] = oi
except Exception:
logger.exception(
"Failed to fetch open interest for {} at {}",
symbol,
self._exchange_id,
)

try:
fr = await exchange.fetch_funding_rate(sym)
snapshot[symbol]["funding_rate"] = fr
except Exception:
logger.exception(
"Failed to fetch funding rate for {} at {}",
symbol,
self._exchange_id,
)
except Exception:
logger.exception(
"Failed to fetch market snapshot for {} at {}",
symbol,
self._exchange_id,
)
finally:
try:
await exchange.close()
except Exception:
logger.exception(
"Failed to close exchange connection for {}",
self._exchange_id,
)

return dict(snapshot)
Loading