From 9b48f74d0daa3e7239202c96b4c4ae194929ed02 Mon Sep 17 00:00:00 2001 From: hazeone <709547807@qq.com> Date: Sat, 8 Nov 2025 16:22:26 +0800 Subject: [PATCH 1/4] add ccxt model --- .../strategy_agent/execution/__init__.py | 15 + .../strategy_agent/execution/ccxt_trading.py | 457 ++++++++++++++++++ .../strategy_agent/execution/factory.py | 89 ++++ .../valuecell/agents/strategy_agent/models.py | 38 +- .../agents/strategy_agent/runtime.py | 77 ++- 5 files changed, 671 insertions(+), 5 deletions(-) create mode 100644 python/valuecell/agents/strategy_agent/execution/ccxt_trading.py create mode 100644 python/valuecell/agents/strategy_agent/execution/factory.py diff --git a/python/valuecell/agents/strategy_agent/execution/__init__.py b/python/valuecell/agents/strategy_agent/execution/__init__.py index e69de29bb..70d26af97 100644 --- a/python/valuecell/agents/strategy_agent/execution/__init__.py +++ b/python/valuecell/agents/strategy_agent/execution/__init__.py @@ -0,0 +1,15 @@ +"""Execution adapters for trading instructions.""" + +from .ccxt_trading import CCXTExecutionGateway, create_ccxt_gateway +from .factory import create_execution_gateway, create_execution_gateway_sync +from .interfaces import ExecutionGateway +from .paper_trading import PaperExecutionGateway + +__all__ = [ + "ExecutionGateway", + "PaperExecutionGateway", + "CCXTExecutionGateway", + "create_ccxt_gateway", + "create_execution_gateway", + "create_execution_gateway_sync", +] diff --git a/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py b/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py new file mode 100644 index 000000000..3216b6673 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py @@ -0,0 +1,457 @@ +"""CCXT-based real exchange execution gateway. + +Supports: +- Spot trading +- Futures/Perpetual contracts (USDT-margined, coin-margined) +- Leverage trading (cross/isolated margin) +- Multiple exchanges via CCXT unified API +""" + +from __future__ import annotations + +from typing import Dict, List, Optional + +import ccxt.async_support as ccxt + +from ..models import ( + PriceMode, + TradeInstruction, + TradeSide, + TxResult, + TxStatus, +) +from .interfaces import ExecutionGateway + + +class CCXTExecutionGateway(ExecutionGateway): + """Async execution gateway using CCXT unified API for real exchanges. + + Features: + - Supports spot, futures, and perpetual contracts + - Automatic leverage and margin mode setup + - Symbol format normalization (BTC-USD -> BTC/USD:USD for futures) + - Proper error handling and partial fill support + - Fee tracking from exchange responses + """ + + def __init__( + self, + exchange_id: str, + api_key: str, + secret_key: str, + passphrase: Optional[str] = None, + testnet: bool = False, + default_type: str = "spot", + margin_mode: str = "isolated", + ccxt_options: Optional[Dict] = None, + ) -> None: + """Initialize CCXT exchange gateway. + + Args: + exchange_id: Exchange identifier (e.g., 'binance', 'okx', 'bybit') + api_key: API key for authentication + secret_key: Secret key for authentication + passphrase: Optional passphrase (required for OKX) + testnet: Whether to use testnet/sandbox mode + default_type: Default market type ('spot', 'future', 'swap') + margin_mode: Default margin mode ('isolated' or 'cross') + ccxt_options: Additional CCXT exchange options + """ + self.exchange_id = exchange_id.lower() + self.api_key = api_key + self.secret_key = secret_key + self.passphrase = passphrase + self.testnet = testnet + self.default_type = default_type + self.margin_mode = margin_mode + self._ccxt_options = ccxt_options or {} + + # Track leverage settings per symbol to avoid redundant calls + self._leverage_cache: Dict[str, float] = {} + self._margin_mode_cache: Dict[str, str] = {} + + # Exchange instance (lazy-initialized) + self._exchange: Optional[ccxt.Exchange] = None + + async def _get_exchange(self) -> ccxt.Exchange: + """Get or create the CCXT exchange instance.""" + if self._exchange is not None: + return self._exchange + + # Get exchange class by name + try: + exchange_class = getattr(ccxt, self.exchange_id) + except AttributeError: + raise ValueError( + f"Exchange '{self.exchange_id}' not supported by CCXT. " + f"Available: {', '.join(ccxt.exchanges)}" + ) + + # Build configuration + config = { + "apiKey": self.api_key, + "secret": self.secret_key, + "enableRateLimit": True, # Respect rate limits + "options": { + "defaultType": self.default_type, + **self._ccxt_options, + }, + } + + # Add passphrase if provided (required for OKX) + if self.passphrase: + config["password"] = self.passphrase + + # Create exchange instance + self._exchange = exchange_class(config) + + # Enable sandbox/testnet mode if requested + if self.testnet: + self._exchange.set_sandbox_mode(True) + + # Load markets + try: + await self._exchange.load_markets() + except Exception as e: + raise RuntimeError( + f"Failed to load markets for {self.exchange_id}: {e}" + ) from e + + return self._exchange + + def _normalize_symbol(self, symbol: str, market_type: Optional[str] = None) -> str: + """Normalize symbol format for CCXT. + + Examples: + BTC-USD -> BTC/USD (spot) + BTC-USDT -> BTC/USDT:USDT (USDT futures) + ETH-USD -> ETH/USD:USD (USD futures) + + Args: + symbol: Symbol in format 'BTC-USD', 'BTC-USDT', etc. + market_type: Optional market type override ('spot', 'future', 'swap') + + Returns: + Normalized CCXT symbol + """ + mtype = market_type or self.default_type + + # Replace dash with slash + base_symbol = symbol.replace("-", "/") + + # For futures/swap, append settlement currency + if mtype in ("future", "swap"): + # If symbol is like BTC/USDT, make it BTC/USDT:USDT + if ":" not in base_symbol: + parts = base_symbol.split("/") + if len(parts) == 2: + base_symbol = f"{parts[0]}/{parts[1]}:{parts[1]}" + + return base_symbol + + async def _setup_leverage( + self, symbol: str, leverage: Optional[float], exchange: ccxt.Exchange + ) -> None: + """Set leverage for a symbol if needed and supported. + + Args: + symbol: CCXT normalized symbol + leverage: Desired leverage (None means 1x) + exchange: CCXT exchange instance + """ + if leverage is None: + leverage = 1.0 + + # Check if already set to avoid redundant calls + if self._leverage_cache.get(symbol) == leverage: + return + + # Check if exchange supports setting leverage + if not exchange.has.get("setLeverage"): + return + + try: + await exchange.set_leverage(int(leverage), symbol) + self._leverage_cache[symbol] = leverage + except Exception as e: + # Some exchanges don't support leverage on certain symbols + # Log but don't fail the trade + print(f"Warning: Could not set leverage for {symbol}: {e}") + + async def _setup_margin_mode(self, symbol: str, exchange: ccxt.Exchange) -> None: + """Set margin mode for a symbol if needed and supported. + + Args: + symbol: CCXT normalized symbol + exchange: CCXT exchange instance + """ + # Check if already set + if self._margin_mode_cache.get(symbol) == self.margin_mode: + return + + # Check if exchange supports setting margin mode + if not exchange.has.get("setMarginMode"): + return + + try: + await exchange.set_margin_mode(self.margin_mode, symbol) + self._margin_mode_cache[symbol] = self.margin_mode + except Exception as e: + # Log but don't fail + print(f"Warning: Could not set margin mode for {symbol}: {e}") + + async def execute( + self, + instructions: List[TradeInstruction], + market_snapshot: Optional[Dict[str, float]] = None, + ) -> List[TxResult]: + """Execute trade instructions on the real exchange via CCXT. + + Args: + instructions: List of trade instructions to execute + market_snapshot: Optional market snapshot (not used for real execution) + + Returns: + List of transaction results with fill details + """ + if not instructions: + return [] + + exchange = await self._get_exchange() + results: List[TxResult] = [] + + for inst in instructions: + try: + result = await self._execute_single(inst, exchange) + results.append(result) + except Exception as e: + # Create error result for failed instruction + results.append( + TxResult( + instruction_id=inst.instruction_id, + instrument=inst.instrument, + side=inst.side, + requested_qty=float(inst.quantity), + filled_qty=0.0, + status=TxStatus.ERROR, + reason=str(e), + meta=inst.meta, + ) + ) + + return results + + async def _execute_single( + self, inst: TradeInstruction, exchange: ccxt.Exchange + ) -> TxResult: + """Execute a single trade instruction. + + Args: + inst: Trade instruction to execute + exchange: CCXT exchange instance + + Returns: + Transaction result with execution details + """ + # Normalize symbol for CCXT + symbol = self._normalize_symbol(inst.instrument.symbol) + + # Setup leverage and margin mode + await self._setup_leverage(symbol, inst.leverage, exchange) + await self._setup_margin_mode(symbol, exchange) + + # Map instruction to CCXT parameters + side = "buy" if inst.side == TradeSide.BUY else "sell" + order_type = "limit" if inst.price_mode == PriceMode.LIMIT else "market" + amount = float(inst.quantity) + price = float(inst.limit_price) if inst.limit_price else None + + # Build order params + params = {} + if inst.meta: + # Pass through any exchange-specific parameters + params.update(inst.meta) + + # Create order + try: + order = await exchange.create_order( + symbol=symbol, + type=order_type, + side=side, + amount=amount, + price=price, + params=params, + ) + except Exception as e: + raise RuntimeError(f"Failed to create order for {symbol}: {e}") from e + + # Parse order response + filled_qty = float(order.get("filled", 0.0)) + avg_price = float(order.get("average") or 0.0) + fee_cost = 0.0 + + # Extract fee information + if "fee" in order and order["fee"]: + fee_info = order["fee"] + fee_cost = float(fee_info.get("cost", 0.0)) + + # Calculate slippage if applicable + slippage_bps = None + if avg_price and inst.limit_price and inst.price_mode == PriceMode.LIMIT: + expected = float(inst.limit_price) + slippage = abs(avg_price - expected) / expected * 10000.0 + slippage_bps = slippage + + # Determine status + status = TxStatus.FILLED + if filled_qty < amount * 0.99: # Allow 1% tolerance + status = TxStatus.PARTIAL + if filled_qty == 0: + status = TxStatus.REJECTED + + return TxResult( + instruction_id=inst.instruction_id, + instrument=inst.instrument, + side=inst.side, + requested_qty=amount, + filled_qty=filled_qty, + avg_exec_price=avg_price if avg_price > 0 else None, + slippage_bps=slippage_bps, + fee_cost=fee_cost if fee_cost > 0 else None, + leverage=inst.leverage, + status=status, + reason=order.get("status") if status != TxStatus.FILLED else None, + meta={ + "order_id": order.get("id"), + "exchange_symbol": symbol, + **(inst.meta or {}), + }, + ) + + async def close(self) -> None: + """Close the exchange connection and cleanup resources.""" + if self._exchange is not None: + await self._exchange.close() + self._exchange = None + + async def fetch_balance(self) -> Dict: + """Fetch account balance from exchange. + + Returns: + Balance dictionary with free, used, and total amounts per currency + """ + exchange = await self._get_exchange() + return await exchange.fetch_balance() + + async def fetch_positions(self, symbols: Optional[List[str]] = None) -> List[Dict]: + """Fetch current positions from exchange. + + Args: + symbols: Optional list of symbols to fetch positions for + + Returns: + List of position dictionaries + """ + exchange = await self._get_exchange() + + # Check if exchange supports fetching positions + if not exchange.has.get("fetchPositions"): + return [] + + # Normalize symbols if provided + normalized_symbols = None + if symbols: + normalized_symbols = [self._normalize_symbol(s) for s in symbols] + + try: + positions = await exchange.fetch_positions(normalized_symbols) + return positions + except Exception as e: + print(f"Warning: Could not fetch positions: {e}") + return [] + + async def cancel_order(self, order_id: str, symbol: str) -> Dict: + """Cancel an open order. + + Args: + order_id: Order ID to cancel + symbol: Symbol of the order + + Returns: + Cancellation result dictionary + """ + exchange = await self._get_exchange() + normalized_symbol = self._normalize_symbol(symbol) + return await exchange.cancel_order(order_id, normalized_symbol) + + async def fetch_open_orders(self, symbol: Optional[str] = None) -> List[Dict]: + """Fetch open orders from exchange. + + Args: + symbol: Optional symbol to filter orders + + Returns: + List of open order dictionaries + """ + exchange = await self._get_exchange() + normalized_symbol = self._normalize_symbol(symbol) if symbol else None + return await exchange.fetch_open_orders(normalized_symbol) + + def __repr__(self) -> str: + mode = "testnet" if self.testnet else "live" + return ( + f"CCXTExecutionGateway(exchange={self.exchange_id}, " + f"type={self.default_type}, margin={self.margin_mode}, mode={mode})" + ) + + +async def create_ccxt_gateway( + exchange_id: str, + api_key: str, + secret_key: str, + passphrase: Optional[str] = None, + testnet: bool = False, + market_type: str = "spot", + margin_mode: str = "isolated", + **ccxt_options, +) -> CCXTExecutionGateway: + """Factory function to create and initialize a CCXT execution gateway. + + Args: + exchange_id: Exchange identifier (e.g., 'binance', 'okx', 'bybit') + api_key: API key for authentication + secret_key: Secret key for authentication + passphrase: Optional passphrase (required for OKX) + testnet: Whether to use testnet/sandbox mode + market_type: Market type ('spot', 'future', 'swap') + margin_mode: Margin mode ('isolated' or 'cross') + **ccxt_options: Additional CCXT exchange options + + Returns: + Initialized CCXT execution gateway + + Example: + >>> gateway = await create_ccxt_gateway( + ... exchange_id='binance', + ... api_key='YOUR_KEY', + ... secret_key='YOUR_SECRET', + ... market_type='swap', # For perpetual futures + ... margin_mode='isolated', + ... testnet=True + ... ) + """ + gateway = CCXTExecutionGateway( + exchange_id=exchange_id, + api_key=api_key, + secret_key=secret_key, + passphrase=passphrase, + testnet=testnet, + default_type=market_type, + margin_mode=margin_mode, + ccxt_options=ccxt_options, + ) + + # Pre-load markets to validate connection + await gateway._get_exchange() + + return gateway diff --git a/python/valuecell/agents/strategy_agent/execution/factory.py b/python/valuecell/agents/strategy_agent/execution/factory.py new file mode 100644 index 000000000..06c12dbdf --- /dev/null +++ b/python/valuecell/agents/strategy_agent/execution/factory.py @@ -0,0 +1,89 @@ +"""Factory for creating execution gateways based on configuration.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ..models import ExchangeConfig + +from .ccxt_trading import CCXTExecutionGateway +from .interfaces import ExecutionGateway +from .paper_trading import PaperExecutionGateway + + +async def create_execution_gateway(config: ExchangeConfig) -> ExecutionGateway: + """Create an execution gateway based on exchange configuration. + + Args: + config: Exchange configuration with trading mode and credentials + + Returns: + ExecutionGateway instance (paper or real CCXT gateway) + + Raises: + ValueError: If configuration is invalid for the requested trading mode + """ + from ..models import TradingMode + + # Virtual/paper trading mode + if config.trading_mode == TradingMode.VIRTUAL: + return PaperExecutionGateway(fee_bps=config.fee_bps) + + # Live trading mode requires exchange credentials + if config.trading_mode == TradingMode.LIVE: + if not config.exchange_id: + raise ValueError( + "exchange_id is required for live trading mode. " + "Please specify an exchange (e.g., 'binance', 'okx', 'bybit')" + ) + + if not config.api_key or not config.secret_key: + raise ValueError( + f"API credentials are required for live trading on {config.exchange_id}. " + "Please provide api_key and secret_key in ExchangeConfig." + ) + + # Create CCXT gateway with full configuration + gateway = CCXTExecutionGateway( + exchange_id=config.exchange_id, + api_key=config.api_key, + secret_key=config.secret_key, + passphrase=config.passphrase, + testnet=config.testnet, + default_type=config.market_type.value, + margin_mode=config.margin_mode.value, + ) + + # Initialize exchange connection + await gateway._get_exchange() + + return gateway + + raise ValueError(f"Unsupported trading mode: {config.trading_mode}") + + +def create_execution_gateway_sync(config: ExchangeConfig) -> ExecutionGateway: + """Synchronous version that returns paper gateway or raises for live mode. + + Use this when you need a gateway immediately without async initialization. + For live trading, use the async create_execution_gateway instead. + + Args: + config: Exchange configuration + + Returns: + ExecutionGateway instance + + Raises: + RuntimeError: If live trading is requested (requires async initialization) + """ + from ..models import TradingMode + + if config.trading_mode == TradingMode.VIRTUAL: + return PaperExecutionGateway(fee_bps=config.fee_bps) + + raise RuntimeError( + "Live trading gateway requires async initialization. " + "Use 'await create_execution_gateway(config)' instead." + ) diff --git a/python/valuecell/agents/strategy_agent/models.py b/python/valuecell/agents/strategy_agent/models.py index 87d271185..b8e559062 100644 --- a/python/valuecell/agents/strategy_agent/models.py +++ b/python/valuecell/agents/strategy_agent/models.py @@ -108,11 +108,27 @@ def _fill_defaults(cls, data): return values +class MarketType(str, Enum): + """Market type for trading.""" + + SPOT = "spot" + FUTURE = "future" + SWAP = "swap" # Perpetual futures + + +class MarginMode(str, Enum): + """Margin mode for leverage trading.""" + + ISOLATED = "isolated" # Isolated margin (逐仓) + CROSS = "cross" # Cross margin (全仓) + + class ExchangeConfig(BaseModel): """Exchange configuration for trading.""" exchange_id: Optional[str] = Field( - default=None, description="Exchange identifier (e.g., 'okx', 'binance')" + default=None, + description="Exchange identifier (e.g., 'okx', 'binance', 'bybit')", ) trading_mode: TradingMode = Field( default=TradingMode.VIRTUAL, description="Trading mode for this strategy" @@ -123,6 +139,26 @@ class ExchangeConfig(BaseModel): secret_key: Optional[str] = Field( default=None, description="Exchange secret key (required for live trading)" ) + passphrase: Optional[str] = Field( + default=None, + description="API passphrase (required for some exchanges like OKX)", + ) + testnet: bool = Field( + default=False, description="Use testnet/sandbox mode for testing" + ) + market_type: MarketType = Field( + default=MarketType.SPOT, + description="Market type: spot, future (delivery), or swap (perpetual)", + ) + margin_mode: MarginMode = Field( + default=MarginMode.ISOLATED, + description="Margin mode: isolated (逐仓) or cross (全仓)", + ) + fee_bps: float = Field( + default=10.0, + description="Trading fee in basis points (default 10 bps = 0.1%) for paper trading", + gt=0, + ) class TradingConfig(BaseModel): diff --git a/python/valuecell/agents/strategy_agent/runtime.py b/python/valuecell/agents/strategy_agent/runtime.py index ed29c683e..b066ed194 100644 --- a/python/valuecell/agents/strategy_agent/runtime.py +++ b/python/valuecell/agents/strategy_agent/runtime.py @@ -7,9 +7,10 @@ from .core import DecisionCycleResult, DefaultDecisionCoordinator from .data.market import SimpleMarketDataSource from .decision.composer import LlmComposer -from .execution.paper_trading import PaperExecutionGateway +from .execution.factory import create_execution_gateway, create_execution_gateway_sync +from .execution.interfaces import ExecutionGateway from .features.simple import SimpleFeatureComputer -from .models import Constraints, UserRequest +from .models import Constraints, TradingMode, UserRequest from .portfolio.in_memory import InMemoryPortfolioService from .trading_history.digest import RollingDigestBuilder from .trading_history.recorder import InMemoryHistoryRecorder @@ -74,7 +75,27 @@ async def run_cycle(self) -> DecisionCycleResult: return await self.coordinator.run_once() -def create_strategy_runtime(request: UserRequest) -> StrategyRuntime: +def create_strategy_runtime( + request: UserRequest, + execution_gateway: Optional[ExecutionGateway] = None, +) -> StrategyRuntime: + """Create a strategy runtime with synchronous initialization. + + Note: This function only supports paper trading by default. For live trading, + use create_strategy_runtime_async() instead, which properly initializes + the CCXT exchange connection. + + Args: + request: User request with strategy configuration + execution_gateway: Optional pre-initialized execution gateway. + If None, will be created based on request.exchange_config. + + Returns: + StrategyRuntime instance + + Raises: + RuntimeError: If live trading is requested without providing a gateway + """ strategy_id = generate_uuid("strategy") initial_capital = request.trading_config.initial_capital or 0.0 constraints = Constraints( @@ -97,7 +118,16 @@ def create_strategy_runtime(request: UserRequest) -> StrategyRuntime: ) feature_computer = SimpleFeatureComputer() composer = LlmComposer(request=request) - execution_gateway = PaperExecutionGateway() + + # Create execution gateway if not provided + if execution_gateway is None: + if request.exchange_config.trading_mode == TradingMode.LIVE: + raise RuntimeError( + "Live trading requires async initialization. " + "Use create_strategy_runtime_async() or provide a pre-initialized gateway." + ) + execution_gateway = create_execution_gateway_sync(request.exchange_config) + history_recorder = InMemoryHistoryRecorder() digest_builder = RollingDigestBuilder() @@ -119,3 +149,42 @@ def create_strategy_runtime(request: UserRequest) -> StrategyRuntime: strategy_id=strategy_id, coordinator=coordinator, ) + + +async def create_strategy_runtime_async(request: UserRequest) -> StrategyRuntime: + """Create a strategy runtime with async initialization (supports live trading). + + This function properly initializes CCXT exchange connections for live trading. + It can also be used for paper trading. + + Args: + request: User request with strategy configuration + + Returns: + StrategyRuntime instance with initialized execution gateway + + Example: + >>> request = UserRequest( + ... exchange_config=ExchangeConfig( + ... exchange_id='binance', + ... trading_mode=TradingMode.LIVE, + ... api_key='YOUR_KEY', + ... secret_key='YOUR_SECRET', + ... market_type=MarketType.SWAP, + ... margin_mode=MarginMode.ISOLATED, + ... testnet=True, + ... ), + ... trading_config=TradingConfig( + ... symbols=['BTC-USDT', 'ETH-USDT'], + ... initial_capital=10000.0, + ... max_leverage=10.0, + ... max_positions=5, + ... ) + ... ) + >>> runtime = await create_strategy_runtime_async(request) + """ + # Create execution gateway asynchronously + execution_gateway = await create_execution_gateway(request.exchange_config) + + # Use the sync function with the pre-initialized gateway + return create_strategy_runtime(request, execution_gateway=execution_gateway) From f35a9af1c8c7c94412573fc8922b762efc871e49 Mon Sep 17 00:00:00 2001 From: hazeone <709547807@qq.com> Date: Sat, 8 Nov 2025 16:37:49 +0800 Subject: [PATCH 2/4] add okx filter --- .../strategy-items/create-strategy-modal.tsx | 42 +++++++++++++++++++ frontend/src/types/strategy.ts | 1 + 2 files changed, 43 insertions(+) diff --git a/frontend/src/app/agent/components/strategy-items/create-strategy-modal.tsx b/frontend/src/app/agent/components/strategy-items/create-strategy-modal.tsx index 3d521d986..3b2721e8a 100644 --- a/frontend/src/app/agent/components/strategy-items/create-strategy-modal.tsx +++ b/frontend/src/app/agent/components/strategy-items/create-strategy-modal.tsx @@ -55,6 +55,7 @@ const step2Schema = z exchange_id: z.string(), api_key: z.string(), secret_key: z.string(), + passphrase: z.string(), // Required string, but can be empty for non-OKX exchanges }) .superRefine((data, ctx) => { // Only validate exchange credentials when live trading is selected @@ -86,6 +87,15 @@ const step2Schema = z }); } } + + // OKX requires passphrase + if (data.exchange_id === "okx" && !data.passphrase?.trim()) { + ctx.addIssue({ + code: "custom", + message: "Password is required for OKX", + path: ["passphrase"], + }); + } } // Virtual trading mode: no validation needed for exchange fields }); @@ -214,6 +224,7 @@ const CreateStrategyModal: FC = ({ children }) => { exchange_id: "okx", api_key: "", secret_key: "", + passphrase: "", }, validators: { onSubmit: step2Schema, @@ -543,6 +554,37 @@ const CreateStrategyModal: FC = ({ children }) => { )} + + {/* Password field - only shown for OKX */} + + {(exchangeField) => + exchangeField.state.value === "okx" ? ( + + {(field) => ( + + + Password + + + field.handleChange(e.target.value) + } + onBlur={field.handleBlur} + placeholder="Enter Password (Required for OKX)" + /> + {field.state.meta.errors.length > + 0 && ( + + )} + + )} + + ) : null + } + )} diff --git a/frontend/src/types/strategy.ts b/frontend/src/types/strategy.ts index 5fea58cc7..622c7bbc1 100644 --- a/frontend/src/types/strategy.ts +++ b/frontend/src/types/strategy.ts @@ -61,6 +61,7 @@ export interface CreateStrategyRequest { trading_mode: "live" | "virtual"; api_key?: string; secret_key?: string; + passphrase?: string; // Required for some exchanges like OKX }; // Trading Strategy Configuration From 1ee816fbfceb4a0b235ec49c43f77d124fbd3f0d Mon Sep 17 00:00:00 2001 From: hazeone <709547807@qq.com> Date: Sat, 8 Nov 2025 19:50:07 +0800 Subject: [PATCH 3/4] fix async api --- python/valuecell/agents/strategy_agent/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/valuecell/agents/strategy_agent/agent.py b/python/valuecell/agents/strategy_agent/agent.py index 7687b816d..0baa482ba 100644 --- a/python/valuecell/agents/strategy_agent/agent.py +++ b/python/valuecell/agents/strategy_agent/agent.py @@ -16,7 +16,7 @@ StrategyStatusContent, UserRequest, ) -from .runtime import create_strategy_runtime +from .runtime import create_strategy_runtime_async class StrategyAgent(BaseAgent): @@ -122,7 +122,7 @@ async def stream( yield streaming.done() return - runtime = create_strategy_runtime(request) + runtime = await create_strategy_runtime_async(request) strategy_id = runtime.strategy_id logger.info( "Created runtime for strategy_id={} conversation={} task={}", From 5c2e717fbff4ed3adb2f636c7e0642d3bcecfdb9 Mon Sep 17 00:00:00 2001 From: hazeone <709547807@qq.com> Date: Sat, 8 Nov 2025 22:13:13 +0800 Subject: [PATCH 4/4] fix trading history and positions --- .../valuecell/agents/strategy_agent/core.py | 29 +++++++++++++ .../strategy_agent/decision/composer.py | 34 ++++++++++++--- .../strategy_agent/execution/ccxt_trading.py | 43 +++++++++++++++++++ 3 files changed, 100 insertions(+), 6 deletions(-) diff --git a/python/valuecell/agents/strategy_agent/core.py b/python/valuecell/agents/strategy_agent/core.py index 758919eb9..9a8b82f59 100644 --- a/python/valuecell/agents/strategy_agent/core.py +++ b/python/valuecell/agents/strategy_agent/core.py @@ -5,6 +5,8 @@ from datetime import datetime, timezone from typing import Callable, Dict, List +from loguru import logger + from valuecell.utils.uuid import generate_uuid from .data.interfaces import MarketDataSource @@ -24,6 +26,7 @@ TradeSide, TradeType, TxResult, + TxStatus, UserRequest, ) from .portfolio.interfaces import PortfolioService @@ -148,10 +151,27 @@ async def run_once(self) -> DecisionCycleResult: ) instructions = await self._composer.compose(context) + logger.info(f"🔍 Composer returned {len(instructions)} instructions") + for idx, inst in enumerate(instructions): + logger.info( + f" 📝 Instruction {idx}: {inst.instrument.symbol} {inst.side.value} qty={inst.quantity}" + ) + # Execute instructions via async gateway to obtain execution results + logger.info( + f"🚀 Calling execution_gateway.execute() with {len(instructions)} instructions" + ) + logger.info( + f" ExecutionGateway type: {type(self._execution_gateway).__name__}" + ) tx_results = await self._execution_gateway.execute( instructions, market_snapshot ) + logger.info(f"✅ ExecutionGateway returned {len(tx_results)} results") + for idx, tx in enumerate(tx_results): + logger.info( + f" 📊 TxResult {idx}: {tx.instrument.symbol} status={tx.status.value} filled_qty={tx.filled_qty}" + ) trades = self._create_trades(tx_results, compose_id, timestamp_ms) self._portfolio_service.apply_trades(trades, market_snapshot) @@ -197,7 +217,16 @@ def _create_trades( pre_view = None for tx in tx_results: + # Skip failed or rejected trades - only create history entries for successful fills + # (including partial fills which may still have filled_qty > 0) + if tx.status in (TxStatus.ERROR, TxStatus.REJECTED): + continue + qty = float(tx.filled_qty or 0.0) + # Skip trades with zero filled quantity + if qty == 0: + continue + price = float(tx.avg_exec_price or 0.0) notional = (price * qty) if price and qty else None # Immediate realized effect: fees are costs (negative PnL). Slippage already baked into exec price. diff --git a/python/valuecell/agents/strategy_agent/decision/composer.py b/python/valuecell/agents/strategy_agent/decision/composer.py index 550f1c7df..295138bcb 100644 --- a/python/valuecell/agents/strategy_agent/decision/composer.py +++ b/python/valuecell/agents/strategy_agent/decision/composer.py @@ -194,6 +194,7 @@ def _normalize_quantity( qty = quantity # Step 1: per-order filters (step size, min notional, max order qty) + logger.debug(f"_normalize_quantity Step 1: {symbol} qty={qty} before filters") qty = self._apply_quantity_filters( symbol, qty, @@ -203,13 +204,11 @@ def _normalize_quantity( constraints.min_notional, price_map, ) + logger.debug(f"_normalize_quantity Step 1: {symbol} qty={qty} after filters") if qty <= self._quantity_precision: - logger.debug( - "Post-filter quantity for {} is {} <= precision {} -> skipping", - symbol, - qty, - self._quantity_precision, + logger.warning( + f"Post-filter quantity for {symbol} is {qty} <= precision {self._quantity_precision} -> returning 0" ) return 0.0, 0.0 @@ -402,6 +401,7 @@ def _count_active(pos_map: Dict[str, float]) -> int: quantity = abs(delta) # Normalize quantity through all guardrails + logger.debug(f"Before normalize: {symbol} quantity={quantity}") quantity, consumed_bp = self._normalize_quantity( symbol, quantity, @@ -413,8 +413,14 @@ def _count_active(pos_map: Dict[str, float]) -> int: projected_gross, price_map, ) + logger.debug( + f"After normalize: {symbol} quantity={quantity}, consumed_bp={consumed_bp}" + ) if quantity <= self._quantity_precision: + logger.warning( + f"SKIPPED: {symbol} quantity={quantity} <= precision={self._quantity_precision} after normalization" + ) continue # Update projected positions for subsequent guardrails @@ -529,24 +535,40 @@ def _apply_quantity_filters( market_snapshot: Dict[str, float], ) -> float: qty = quantity + logger.debug(f"Filtering {symbol}: initial qty={qty}") if max_order_qty is not None: qty = min(qty, float(max_order_qty)) + logger.debug(f"After max_order_qty filter: qty={qty}") if quantity_step > 0: qty = math.floor(qty / quantity_step) * quantity_step + logger.debug(f"After quantity_step filter: qty={qty}") if qty <= 0: + logger.warning(f"FILTERED: {symbol} qty={qty} <= 0") return 0.0 if qty < min_trade_qty: + logger.warning( + f"FILTERED: {symbol} qty={qty} < min_trade_qty={min_trade_qty}" + ) return 0.0 if min_notional is not None: price = market_snapshot.get(symbol) if price is None: + logger.warning(f"FILTERED: {symbol} no price in market_snapshot") return 0.0 - if qty * price < float(min_notional): + notional = qty * price + if notional < float(min_notional): + logger.warning( + f"FILTERED: {symbol} notional={notional:.4f} < min_notional={min_notional}" + ) return 0.0 + logger.debug( + f"Passed min_notional check: notional={notional:.4f} >= {min_notional}" + ) + logger.debug(f"Final qty for {symbol}: {qty}") return qty diff --git a/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py b/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py index 3216b6673..c4c2bd186 100644 --- a/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py +++ b/python/valuecell/agents/strategy_agent/execution/ccxt_trading.py @@ -9,9 +9,11 @@ from __future__ import annotations +import asyncio from typing import Dict, List, Optional import ccxt.async_support as ccxt +from loguru import logger from ..models import ( PriceMode, @@ -215,12 +217,19 @@ async def execute( List of transaction results with fill details """ if not instructions: + logger.warning("⚠️ CCXTExecutionGateway: No instructions to execute") return [] + logger.info( + f"💰 CCXTExecutionGateway: Executing {len(instructions)} instructions" + ) exchange = await self._get_exchange() results: List[TxResult] = [] for inst in instructions: + logger.info( + f" 📤 Processing {inst.instrument.symbol} {inst.side.value} qty={inst.quantity}" + ) try: result = await self._execute_single(inst, exchange) results.append(result) @@ -274,6 +283,9 @@ async def _execute_single( # Create order try: + logger.info( + f" 🔨 Creating {order_type} order: {side} {amount} {symbol} @ {price if price else 'market'}" + ) order = await exchange.create_order( symbol=symbol, type=order_type, @@ -282,14 +294,45 @@ async def _execute_single( price=price, params=params, ) + logger.info( + f" ✓ Order created: id={order.get('id')}, status={order.get('status')}, filled={order.get('filled')}" + ) except Exception as e: + logger.error(f" ❌ ERROR creating order for {symbol}: {e}") raise RuntimeError(f"Failed to create order for {symbol}: {e}") from e + # For market orders, wait for fill and fetch final order status + # Many exchanges don't immediately return filled quantities for market orders + if order_type == "market": + order_id = order.get("id") + if order_id and exchange.has.get("fetchOrder"): + try: + # Wait a short time for market order to fill + logger.info( + f" ⏳ Waiting 0.5s for market order {order_id} to fill..." + ) + await asyncio.sleep(0.5) + + # Fetch updated order status + order = await exchange.fetch_order(order_id, symbol) + logger.info( + f" 📈 Order status after fetch: filled={order.get('filled')}, average={order.get('average')}, status={order.get('status')}" + ) + except Exception as e: + # If fetch fails, use original order response + logger.warning( + f" ⚠️ Could not fetch order status for {symbol}: {e}" + ) + # Parse order response filled_qty = float(order.get("filled", 0.0)) avg_price = float(order.get("average") or 0.0) fee_cost = 0.0 + logger.info( + f" 📊 Final parsed: filled_qty={filled_qty}, avg_price={avg_price}" + ) + # Extract fee information if "fee" in order and order["fee"]: fee_info = order["fee"]