diff --git a/python/configs/agent_cards/auto_trading_agent.json b/python/configs/agent_cards/auto_trading_agent.json new file mode 100644 index 000000000..a1dedb730 --- /dev/null +++ b/python/configs/agent_cards/auto_trading_agent.json @@ -0,0 +1,175 @@ +{ + "name": "AutoTradingAgent", + "display_name": "Auto Trading Agent", + "url": "http://localhost:10003/", + "description": "Automated cryptocurrency trading agent with real-time technical analysis, position management, and continuous monitoring. Supports multiple crypto assets and AI-powered trading strategies using various LLM models via OpenRouter.", + "capabilities": { + "streaming": true, + "push_notifications": true + }, + "skills": [{ + "id": "auto_trading_setup", + "name": "Setup Auto Trading", + "description": "Configure the automated trading agent with initial capital, crypto symbols, risk parameters, and AI model selection.", + "examples": [ + "Setup trading with $100,000 for BTC-USD and ETH-USD using DeepSeek model", + "Configure auto trader with $50,000 capital, trade BTC-USD, ETH-USD, SOL-USD with 1.5% risk", + "Setup trading agent with default settings for Bitcoin" + ], + "tags": [ + "setup", + "configuration", + "initialization" + ] + }, + { + "id": "auto_trading_notify", + "name": "Live Trading Monitoring", + "description": "Start continuous automated trading with real-time technical analysis, trade execution notifications, and portfolio updates.", + "examples": [ + "Start automated trading for BTC-USD with Claude model", + "Monitor and trade ETH-USD and SOL-USD using GPT-4 analysis", + "Begin live trading with portfolio notifications" + ], + "tags": [ + "trading", + "monitoring", + "real-time", + "notifications" + ] + }, + { + "id": "technical_analysis", + "name": "Technical Analysis", + "description": "Calculate and analyze technical indicators including MACD, RSI, EMA, and Bollinger Bands for trading decisions.", + "examples": [ + "Analyze BTC-USD technical indicators", + "Show MACD and RSI signals for ETH-USD", + "Calculate Bollinger Bands for SOL-USD" + ], + "tags": [ + "analysis", + "indicators", + "signals" + ] + }, + { + "id": "position_management", + "name": "Position Management", + "description": "Manage trading positions with risk controls, automatic entry/exit, and portfolio tracking.", + "examples": [ + "Show current positions and portfolio value", + "What is my P&L on open trades?", + "Display portfolio summary" + ], + "tags": [ + "positions", + "portfolio", + "risk-management" + ] + } + ], + "enabled": true, + "provider": { + "organization": "ValueCell", + "url": "https://github.com/valuecell" + }, + "metadata": { + "version": "1.0.0", + "author": "ValueCell Team", + "tags": [ + "trading", + "crypto", + "automation", + "technical-analysis", + "ai-trading", + "real-time" + ], + "supported_crypto_symbols": [ + "BTC-USD", + "ETH-USD", + "USDT-USD", + "BNB-USD", + "XRP-USD", + "SOL-USD", + "USDC-USD", + "TRX-USD", + "DOGE-USD", + "ADA-USD", + "MATIC-USD", + "WBTC-USD", + "LINK-USD", + "XLM-USD", + "BCH-USD", + "SUI-USD", + "HBAR-USD", + "AVAX-USD", + "TON-USD", + "LTC-USD", + "MNT-USD", + "NEAR-USD", + "FLOW-USD", + "OKB-USD", + "APE-USD", + "ATOM-USD", + "QNT-USD", + "BSV-USD", + "EOS-USD", + "NEO-USD", + "DYDX-USD", + "CRV-USD", + "KLAY-USD", + "MINA-USD", + "CHZ-USD", + "AXS-USD", + "FTM-USD", + "IMX-USD", + "ZEC-USD", + "ALGO-USD", + "XDC-USD", + "ICP-USD", + "BTG-USD", + "BLZ-USD", + "ENJ-USD", + "KSM-USD", + "VGX-USD", + "RUNE-USD", + "BAT-USD", + "THETA-USD", + "QTUM-USD" + ], + "technical_indicators": [ + "MACD (12/26/9)", + "RSI (14)", + "EMA (12/26/50)", + "Bollinger Bands (20, 2Οƒ)" + ], + "supported_models": [ + "deepseek/deepseek-v3.1-terminus", + "openai/gpt-5", + "anthropic/claude-sonnet-4.5", + "x-ai/grok-4", + "qwen/qwen-max", + "google/gemini-2.5-pro", + "meta-llama/llama-3.1-70b-instruct" + ], + "trading_features": [ + "Long and Short positions", + "Risk-based position sizing", + "Maximum position limits", + "Real-time notifications", + "Portfolio value tracking", + "Multi-symbol trading" + ], + "notification_types": [ + "Trade execution (open/close)", + "Portfolio value updates", + "Technical signal alerts" + ], + "risk_parameters": { + "default_risk_per_trade": 0.02, + "default_max_positions": 3, + "default_check_interval": 60 + } + } +} \ No newline at end of file diff --git a/python/scripts/launch.py b/python/scripts/launch.py index 9eaa4055c..bcd09eb61 100644 --- a/python/scripts/launch.py +++ b/python/scripts/launch.py @@ -30,7 +30,12 @@ } TRADING_AGENTS_NAME = "TradingAgents" RESEARCH_AGENT_NAME = "ResearchAgent" -AGENTS = list(MAP_NAME_ANALYST.keys()) + [TRADING_AGENTS_NAME, RESEARCH_AGENT_NAME] +AUTO_TRADING_AGENT_NAME = "AutoTradingAgent" +AGENTS = list(MAP_NAME_ANALYST.keys()) + [ + TRADING_AGENTS_NAME, + RESEARCH_AGENT_NAME, + AUTO_TRADING_AGENT_NAME, +] PROJECT_DIR = Path(__file__).resolve().parent.parent.parent PYTHON_DIR = PROJECT_DIR / "python" @@ -54,6 +59,9 @@ MAP_NAME_COMMAND[RESEARCH_AGENT_NAME] = ( f"uv run --env-file {ENV_PATH_STR} -m valuecell.agents.research_agent" ) +MAP_NAME_COMMAND[AUTO_TRADING_AGENT_NAME] = ( + f"uv run --env-file {ENV_PATH_STR} -m valuecell.agents.auto_trading_agent" +) BACKEND_COMMAND = ( f"cd {PYTHON_DIR_STR} && uv run --env-file {ENV_PATH_STR} -m valuecell.server.main" ) diff --git a/python/valuecell/agents/auto_trading_agent/__init__.py b/python/valuecell/agents/auto_trading_agent/__init__.py new file mode 100644 index 000000000..6587d296b --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/__init__.py @@ -0,0 +1,66 @@ +"""Auto Trading Agent - Modular architecture for automated crypto trading + +Modules: +- agent: Main AutoTradingAgent orchestrator +- models: Data models and enumerations +- position_manager: Position and cash management +- market_data: Technical analysis and indicator retrieval +- trade_recorder: Trade history and statistics +- trading_executor: High-level trade execution facade +- technical_analysis: Backward-compatible technical analysis interface +- portfolio_decision_manager: Portfolio-level decision making +- formatters: Message formatting utilities +- constants: Configuration constants +""" + +from .agent import AutoTradingAgent +from .market_data import MarketDataProvider, SignalGenerator +from .models import ( + AutoTradingConfig, + CashManagement, + PortfolioValueSnapshot, + Position, + PositionHistorySnapshot, + TechnicalIndicators, + TradeAction, + TradeHistoryRecord, + TradeType, + TradingRequest, +) +from .portfolio_decision_manager import ( + AssetAnalysis, + PortfolioDecision, + PortfolioDecisionManager, +) +from .position_manager import PositionManager +from .technical_analysis import AISignalGenerator, TechnicalAnalyzer +from .trade_recorder import TradeRecorder +from .trading_executor import TradingExecutor + +__all__ = [ + # Main agent + "AutoTradingAgent", + # Core modules + "TradingExecutor", + "PositionManager", + "TradeRecorder", + "MarketDataProvider", + "SignalGenerator", + "PortfolioDecisionManager", + # Models + "AutoTradingConfig", + "TradingRequest", + "Position", + "CashManagement", + "TechnicalIndicators", + "TradeHistoryRecord", + "PositionHistorySnapshot", + "PortfolioValueSnapshot", + "TradeAction", + "TradeType", + "AssetAnalysis", + "PortfolioDecision", + # Utilities + "TechnicalAnalyzer", + "AISignalGenerator", +] diff --git a/python/valuecell/agents/auto_trading_agent/__main__.py b/python/valuecell/agents/auto_trading_agent/__main__.py new file mode 100644 index 000000000..d919d266a --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/__main__.py @@ -0,0 +1,11 @@ +"""Main entry point for auto trading agent""" + +import asyncio + +from valuecell.core.agent.decorator import create_wrapped_agent + +from .agent import AutoTradingAgent + +if __name__ == "__main__": + agent = create_wrapped_agent(AutoTradingAgent) + asyncio.run(agent.serve()) diff --git a/python/valuecell/agents/auto_trading_agent/agent.py b/python/valuecell/agents/auto_trading_agent/agent.py new file mode 100644 index 000000000..4b808b835 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/agent.py @@ -0,0 +1,832 @@ +"""Main auto trading agent implementation with multi-instance support""" + +import asyncio +import json +import logging +import os +from datetime import datetime, timezone +from typing import Any, AsyncGenerator, Dict, Optional + +from agno.agent import Agent +from agno.models.openrouter import OpenRouter + +from valuecell.core.agent.responses import streaming +from valuecell.core.types import ( + BaseAgent, + ComponentType, + FilteredCardPushNotificationComponentData, + FilteredLineChartComponentData, + StreamResponse, +) + +from .constants import ( + DEFAULT_AGENT_MODEL, + DEFAULT_CHECK_INTERVAL, +) +from .formatters import MessageFormatter +from .models import ( + AutoTradingConfig, + TradingRequest, +) +from .portfolio_decision_manager import ( + AssetAnalysis, + PortfolioDecisionManager, +) +from .technical_analysis import AISignalGenerator, TechnicalAnalyzer +from .trading_executor import TradingExecutor + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class AutoTradingAgent(BaseAgent): + """ + Automated crypto trading agent with technical analysis and position management. + Supports multiple trading instances per session with independent configurations. + """ + + def __init__(self): + super().__init__() + + # Configuration + self.parser_model_id = os.getenv("TRADING_PARSER_MODEL_ID", DEFAULT_AGENT_MODEL) + + # Multi-instance state management + # Structure: {session_id: {instance_id: TradingInstanceData}} + self.trading_instances: Dict[str, Dict[str, Dict[str, Any]]] = {} + + try: + # Parser agent for natural language query parsing + self.parser_agent = Agent( + model=OpenRouter(id=self.parser_model_id), + output_schema=TradingRequest, + markdown=True, + ) + logger.info("Auto Trading Agent initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize Auto Trading Agent: {e}") + raise + + def _generate_instance_id(self, task_id: str) -> str: + """Generate unique instance ID""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + return f"trade_{timestamp}_{task_id[:8]}" + + async def _parse_trading_request(self, query: str) -> TradingRequest: + """ + Parse natural language query to extract trading parameters + + Args: + query: User's natural language query + + Returns: + TradingRequest object with parsed parameters + """ + try: + parse_prompt = f""" + Parse the following user query and extract auto trading configuration parameters: + + User query: "{query}" + + Please identify: + 1. crypto_symbols: List of cryptocurrency symbols to trade (e.g., BTC-USD, ETH-USD, SOL-USD) + - If user mentions "Bitcoin", extract as "BTC-USD" + - If user mentions "Ethereum", extract as "ETH-USD" + - If user mentions "Solana", extract as "SOL-USD" + - Always use format: SYMBOL-USD + 2. initial_capital: Initial trading capital in USD (default: 100000 if not specified) + 3. use_ai_signals: Whether to use AI-enhanced signals (default: true) + 4. agent_model: Model ID for trading decisions (default: DEFAULT_AGENT_MODEL) + + Examples: + - "Trade Bitcoin and Ethereum with $50000" -> {{"crypto_symbols": ["BTC-USD", "ETH-USD"], "initial_capital": 50000, "use_ai_signals": true}} + - "Start auto trading BTC-USD" -> {{"crypto_symbols": ["BTC-USD"], "initial_capital": 100000, "use_ai_signals": true}} + - "Trade BTC with AI signals" -> {{"crypto_symbols": ["BTC-USD"], "initial_capital": 100000, "use_ai_signals": true}} + - "Trade BTC with AI signals using DeepSeek model" -> {{"crypto_symbols": ["BTC-USD"], "initial_capital": 100000, "use_ai_signals": true, "agent_model": "deepseek/deepseek-v3.1-terminus"}} + - "Trade Bitcoin, SOL, Eth and DOGE with 100000 capital, using x-ai/grok-4 model" -> {{"crypto_symbols": ["BTC-USD", "SOL-USD", "ETH-USD", "DOGE-USD"], "initial_capital": 100000, "use_ai_signals": true, "agent_model": "x-ai/grok-4"}} + """ + + response = await self.parser_agent.arun(parse_prompt) + trading_request = response.content + + logger.info(f"Parsed trading request: {trading_request}") + return trading_request + + except Exception as e: + logger.error(f"Failed to parse trading request: {e}") + raise ValueError( + f"Could not parse trading configuration from query: {query}" + ) + + def _initialize_ai_signal_generator( + self, config: AutoTradingConfig + ) -> Optional[AISignalGenerator]: + """Initialize AI signal generator if configured""" + if not config.use_ai_signals: + return None + + try: + api_key = config.openrouter_api_key or os.getenv("OPENROUTER_API_KEY") + if not api_key: + logger.warning("OpenRouter API key not provided, AI signals disabled") + return None + + llm_client = OpenRouter( + id=config.agent_model, + api_key=api_key, + ) + return AISignalGenerator(llm_client) + + except Exception as e: + logger.error(f"Failed to initialize AI signal generator: {e}") + return None + + def _get_instance_status_component_data( + self, session_id: str, instance_id: str + ) -> str: + """ + Generate portfolio status report in rich text format + + Returns: + Formatted portfolio details as markdown string + """ + if session_id not in self.trading_instances: + return "" + + if instance_id not in self.trading_instances[session_id]: + return "" + + instance = self.trading_instances[session_id][instance_id] + executor: TradingExecutor = instance["executor"] + config: AutoTradingConfig = instance["config"] + + # Get comprehensive portfolio summary + portfolio_summary = executor.get_portfolio_summary() + + # Calculate overall statistics + total_pnl = portfolio_summary["portfolio"]["total_pnl"] + pnl_pct = portfolio_summary["portfolio"]["pnl_percentage"] + portfolio_value = portfolio_summary["portfolio"]["total_value"] + available_cash = portfolio_summary["cash"]["available"] + + # Build rich text output + output = [] + + # Header + output.append(f"# πŸ“Š Trading Portfolio Status - {instance_id}") + output.append("\n**Instance Configuration**") + output.append(f"- Model: `{config.agent_model}`") + output.append(f"- Symbols: {', '.join(config.crypto_symbols)}") + output.append( + f"- Status: {'🟒 Active' if instance['active'] else 'πŸ”΄ Stopped'}" + ) + + # Portfolio Summary Section + output.append("\n## πŸ’° Portfolio Summary") + output.append("\n**Overall Performance**") + output.append(f"- Initial Capital: `${config.initial_capital:,.2f}`") + output.append(f"- Current Value: `${portfolio_value:,.2f}`") + + pnl_emoji = "🟒" if total_pnl >= 0 else "πŸ”΄" + pnl_sign = "+" if total_pnl >= 0 else "" + output.append( + f"- Total P&L: {pnl_emoji} **{pnl_sign}${total_pnl:,.2f}** ({pnl_sign}{pnl_pct:.2f}%)" + ) + + output.append("\n**Cash Position**") + output.append(f"- Available Cash: `${available_cash:,.2f}`") + + # Current Positions Section + output.append(f"\n## πŸ“ˆ Current Positions ({len(executor.positions)})") + + if executor.positions: + output.append( + "\n| Symbol | Type | Quantity | Avg Price | Current Price | Position Value | Unrealized P&L |" + ) + output.append( + "|--------|------|----------|-----------|---------------|----------------|----------------|" + ) + + for symbol, pos in executor.positions.items(): + try: + import yfinance as yf + + ticker = yf.Ticker(symbol) + current_price = ticker.history(period="1d", interval="1m")[ + "Close" + ].iloc[-1] + + # Calculate unrealized P&L + if pos.trade_type.value == "long": + unrealized_pnl = (current_price - pos.entry_price) * abs( + pos.quantity + ) + position_value = abs(pos.quantity) * current_price + else: + unrealized_pnl = (pos.entry_price - current_price) * abs( + pos.quantity + ) + position_value = pos.notional + unrealized_pnl + + # Format row + pnl_emoji = "🟒" if unrealized_pnl >= 0 else "πŸ”΄" + pnl_sign = "+" if unrealized_pnl >= 0 else "" + + output.append( + f"| **{symbol}** | {pos.trade_type.value.upper()} | " + f"{abs(pos.quantity):.4f} | ${pos.entry_price:,.2f} | " + f"${current_price:,.2f} | ${position_value:,.2f} | " + f"{pnl_emoji} {pnl_sign}${unrealized_pnl:,.2f} |" + ) + + except Exception as e: + logger.warning(f"Failed to get price for {symbol}: {e}") + # Fallback display with entry price only + output.append( + f"| **{symbol}** | {pos.trade_type.value.upper()} | " + f"{abs(pos.quantity):.4f} | ${pos.entry_price:,.2f} | " + f"N/A | ${pos.notional:,.2f} | N/A |" + ) + else: + output.append("\n*No open positions*") + + component_data = FilteredCardPushNotificationComponentData( + title=f"{config.agent_model} Portfolio Status", + data="\n".join(output), + filters=[config.agent_model], + table_title="Portfolio Detail", + create_time=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + ) + return component_data.model_dump_json() + + def _get_session_portfolio_chart_data(self, session_id: str) -> str: + """ + Generate FilteredLineChartComponentData for all instances in a session + + Data format: + [ + ['Time', 'model1', 'model2', 'model3'], + ['2025-10-21 10:00:00', 100000, 50000, 30000], + ['2025-10-21 10:01:00', 100234, 50123, 30045], + ... + ] + + Returns: + JSON string of FilteredLineChartComponentData + """ + if session_id not in self.trading_instances: + return "" + + # Collect portfolio value history from all instances + # Group by timestamp and model + timestamp_data = {} # {timestamp_str: {model_id: value}} + model_ids = [] + + for instance_id, instance in self.trading_instances[session_id].items(): + executor: TradingExecutor = instance["executor"] + config: AutoTradingConfig = instance["config"] + model_id = config.agent_model + + if model_id not in model_ids: + model_ids.append(model_id) + + portfolio_history = executor.get_portfolio_history() + + for snapshot in portfolio_history: + # Format timestamp as string + timestamp_str = snapshot.timestamp.strftime("%Y-%m-%d %H:%M:%S") + + if timestamp_str not in timestamp_data: + timestamp_data[timestamp_str] = {} + + timestamp_data[timestamp_str][model_id] = snapshot.total_value + + if not timestamp_data: + return "" + + # Build data array + # First row: ['Time', 'model1', 'model2', ...] + data_array = [["Time"] + model_ids] + + # Data rows: ['timestamp', value1, value2, ...] + for timestamp_str in sorted(timestamp_data.keys()): + row = [timestamp_str] + for model_id in model_ids: + # Use 0 if no data for this model at this timestamp + value = timestamp_data[timestamp_str].get(model_id, 0) + row.append(value) + data_array.append(row) + + component_data = FilteredLineChartComponentData( + title=f"Portfolio Value History - Session {session_id[:8]}", + data=json.dumps(data_array), + create_time=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + ) + + return component_data.model_dump_json() + + async def _handle_stop_command( + self, session_id: str, query: str + ) -> AsyncGenerator[StreamResponse, None]: + """Handle stop command for trading instances""" + query_lower = query.lower().strip() + + # Check if specific instance_id is provided + instance_id = None + if "instance_id:" in query_lower or "instance:" in query_lower: + # Extract instance_id + parts = query.split(":") + if len(parts) >= 2: + instance_id = parts[1].strip() + + if session_id not in self.trading_instances: + yield streaming.message_chunk( + "⚠️ No active trading instances found in this session.\n" + ) + return + + if instance_id: + # Stop specific instance + if instance_id in self.trading_instances[session_id]: + self.trading_instances[session_id][instance_id]["active"] = False + executor = self.trading_instances[session_id][instance_id]["executor"] + portfolio_value = executor.get_portfolio_value() + + yield streaming.message_chunk( + f"πŸ›‘ **Trading Instance Stopped**\n\n" + f"Instance ID: `{instance_id}`\n" + f"Final Portfolio Value: ${portfolio_value:,.2f}\n" + f"Open Positions: {len(executor.positions)}\n\n" + ) + else: + yield streaming.message_chunk( + f"⚠️ Instance ID '{instance_id}' not found.\n" + ) + else: + # Stop all instances in this session + count = 0 + for inst_id in self.trading_instances[session_id]: + self.trading_instances[session_id][inst_id]["active"] = False + count += 1 + + yield streaming.message_chunk( + f"πŸ›‘ **All Trading Instances Stopped**\n\n" + f"Stopped {count} instance(s) in session: {session_id[:8]}\n\n" + ) + + async def _handle_status_command( + self, session_id: str + ) -> AsyncGenerator[StreamResponse, None]: + """Handle status query command""" + if ( + session_id not in self.trading_instances + or not self.trading_instances[session_id] + ): + yield streaming.message_chunk( + "⚠️ No trading instances found in this session.\n" + ) + return + + status_message = f"πŸ“Š **Session Status** - {session_id[:8]}\n\n" + status_message += ( + f"**Total Instances:** {len(self.trading_instances[session_id])}\n\n" + ) + + for instance_id, instance in self.trading_instances[session_id].items(): + executor: TradingExecutor = instance["executor"] + config: AutoTradingConfig = instance["config"] + + status = "🟒 Active" if instance["active"] else "πŸ”΄ Stopped" + portfolio_value = executor.get_portfolio_value() + total_pnl = portfolio_value - config.initial_capital + + status_message += ( + f"**Instance:** `{instance_id}` {status}\n" + f"- Model: {config.agent_model}\n" + f"- Symbols: {', '.join(config.crypto_symbols)}\n" + f"- Portfolio Value: ${portfolio_value:,.2f}\n" + f"- P&L: ${total_pnl:,.2f}\n" + f"- Open Positions: {len(executor.positions)}\n" + f"- Total Trades: {len(executor.get_trade_history())}\n" + f"- Checks: {instance['check_count']}\n\n" + ) + + yield streaming.message_chunk(status_message) + + # Send session-level portfolio chart + chart_data = self._get_session_portfolio_chart_data(session_id) + if chart_data: + yield streaming.component_generator(chart_data, "line_chart") + + async def stream( + self, + query: str, + session_id: str, + task_id: str, + dependencies: Optional[Dict] = None, + ) -> AsyncGenerator[StreamResponse, None]: + """ + Process trading requests and manage multiple trading instances per session. + + Args: + query: User's natural language query + session_id: Session ID + task_id: Task ID + dependencies: Optional dependencies + + Yields: + StreamResponse: Trading setup, execution updates, and data visualizations + """ + try: + logger.info( + f"Processing auto trading request - session: {session_id}, task: {task_id}" + ) + + query_lower = query.lower().strip() + + # Handle stop commands + if any( + cmd in query_lower for cmd in ["stop", "pause", "halt", "停歒", "ζš‚εœ"] + ): + async for response in self._handle_stop_command(session_id, query): + yield response + return + + # Handle status query commands + if any(cmd in query_lower for cmd in ["status", "summary", "ηŠΆζ€", "ζ‘˜θ¦"]): + async for response in self._handle_status_command(session_id): + yield response + return + + # Parse natural language query to extract trading configuration + yield streaming.message_chunk("πŸ” **Parsing trading request...**\n\n") + + try: + trading_request = await self._parse_trading_request(query) + logger.info(f"Parsed request: {trading_request}") + except Exception as e: + logger.error(f"Failed to parse trading request: {e}") + yield streaming.failed( + "**Parse Error**: Could not parse trading configuration from your query. " + "Please specify cryptocurrency symbols (e.g., 'Trade Bitcoin and Ethereum')." + ) + return + + # Generate unique instance ID + instance_id = self._generate_instance_id(task_id) + + # Create full configuration + config = AutoTradingConfig( + initial_capital=trading_request.initial_capital or 100000, + crypto_symbols=trading_request.crypto_symbols, + use_ai_signals=trading_request.use_ai_signals or False, + agent_model=trading_request.agent_model or DEFAULT_AGENT_MODEL, + ) + + # Initialize executor + executor = TradingExecutor(config) + + # Initialize AI signal generator if enabled + ai_signal_generator = self._initialize_ai_signal_generator(config) + + # Initialize session structure if needed + if session_id not in self.trading_instances: + self.trading_instances[session_id] = {} + + # Store instance + self.trading_instances[session_id][instance_id] = { + "instance_id": instance_id, + "config": config, + "executor": executor, + "ai_signal_generator": ai_signal_generator, + "active": True, + "created_at": datetime.now(), + "check_count": 0, + "last_check": None, + } + + # Display configuration + ai_status = "βœ… Enabled" if config.use_ai_signals else "❌ Disabled" + config_message = ( + f"βœ… **Trading Instance Created**\n\n" + f"**Instance ID:** `{instance_id}`\n" + f"**Session ID:** `{session_id[:8]}`\n" + f"**Active Instances in Session:** {len(self.trading_instances[session_id])}\n\n" + f"**Configuration:**\n" + f"- Trading Symbols: {', '.join(config.crypto_symbols)}\n" + f"- Initial Capital: ${config.initial_capital:,.2f}\n" + f"- Check Interval: {config.check_interval}s (1 minute)\n" + f"- Risk Per Trade: {config.risk_per_trade * 100:.1f}%\n" + f"- Max Positions: {config.max_positions}\n" + f"- Analysis Model: {config.agent_model}\n" + f"- AI Signals: {ai_status}\n\n" + f"πŸš€ **Starting continuous trading...**\n" + f"This instance will run continuously until stopped.\n\n" + ) + + yield streaming.message_chunk(config_message) + + # Get instance reference + instance = self.trading_instances[session_id][instance_id] + + # Send initial portfolio snapshot + portfolio_value = executor.get_portfolio_value() + executor.snapshot_portfolio(datetime.now()) + + initial_portfolio_msg = FilteredCardPushNotificationComponentData( + title=f"{config.agent_model} Portfolio", + data=f"πŸ’° **Initial Portfolio**\nTotal Value: ${portfolio_value:,.2f}\nAvailable Capital: ${executor.current_capital:,.2f}\n", + filters=[config.agent_model], + table_title="Portfolio Detail", + create_time=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + ) + yield streaming.component_generator( + initial_portfolio_msg.model_dump_json(), + ComponentType.FILTERED_CARD_PUSH_NOTIFICATION, + ) + + # Set check interval + check_interval = DEFAULT_CHECK_INTERVAL + + # Main trading loop + yield streaming.message_chunk("πŸ“ˆ **Starting monitoring loop...**\n\n") + + while instance["active"]: + try: + # Update check info + instance["check_count"] += 1 + instance["last_check"] = datetime.now() + check_count = instance["check_count"] + + logger.info( + f"Trading check #{check_count} for instance {instance_id}" + ) + + yield streaming.message_chunk( + f"\n{'=' * 50}\n" + f"πŸ”„ **Check #{check_count}** - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Instance: `{instance_id}`\n" + f"{'=' * 50}\n\n" + ) + + # Phase 1: Collect analysis for all symbols + yield streaming.message_chunk( + "πŸ“Š **Phase 1: Analyzing all assets...**\n\n" + ) + + # Initialize portfolio manager with LLM client for AI-powered decisions + llm_client = None + if ai_signal_generator and ai_signal_generator.llm_client: + llm_client = ai_signal_generator.llm_client + + portfolio_manager = PortfolioDecisionManager(config, llm_client) + + for symbol in config.crypto_symbols: + # Calculate indicators + indicators = TechnicalAnalyzer.calculate_indicators(symbol) + + if indicators is None: + logger.warning(f"Skipping {symbol} - insufficient data") + yield streaming.message_chunk( + f"⚠️ Skipping {symbol} - insufficient data\n\n" + ) + continue + + # Generate technical signal + technical_action, technical_trade_type = ( + TechnicalAnalyzer.generate_signal(indicators) + ) + + # Generate AI signal if enabled + ai_action, ai_trade_type, ai_reasoning, ai_confidence = ( + None, + None, + None, + None, + ) + + if ai_signal_generator: + ai_signal = await ai_signal_generator.get_signal(indicators) + if ai_signal: + ( + ai_action, + ai_trade_type, + ai_reasoning, + ai_confidence, + ) = ai_signal + logger.info( + f"AI signal for {symbol}: {ai_action.value} {ai_trade_type.value} " + f"(confidence: {ai_confidence}%)" + ) + + # Create asset analysis + asset_analysis = AssetAnalysis( + symbol=symbol, + indicators=indicators, + technical_action=technical_action, + technical_trade_type=technical_trade_type, + ai_action=ai_action, + ai_trade_type=ai_trade_type, + ai_reasoning=ai_reasoning, + ai_confidence=ai_confidence, + ) + + # Add to portfolio manager + portfolio_manager.add_asset_analysis(asset_analysis) + + # Display individual asset analysis + yield streaming.message_chunk( + MessageFormatter.format_market_analysis_notification( + symbol, + indicators, + asset_analysis.recommended_action, + asset_analysis.recommended_trade_type, + executor.positions, + ai_reasoning, + ) + ) + + # Phase 2: Make portfolio-level decision + yield streaming.message_chunk( + "\n" + "=" * 50 + "\n" + "🎯 **Phase 2: Portfolio Decision Making...**\n" + + "=" * 50 + + "\n\n" + ) + + # Get portfolio summary + portfolio_summary = portfolio_manager.get_portfolio_summary() + yield streaming.message_chunk(portfolio_summary + "\n") + + # Make coordinated decision (async call for AI analysis) + portfolio_decision = ( + await portfolio_manager.make_portfolio_decision( + current_positions=executor.positions, + available_cash=executor.get_current_capital(), + total_portfolio_value=executor.get_portfolio_value(), + ) + ) + + # Display decision reasoning + portfolio_decision_msg = FilteredCardPushNotificationComponentData( + title=f"{config.agent_model} Analysis", + data=f"πŸ’° **Portfolio Decision Reasoning**\n{portfolio_decision.reasoning}\n", + filters=[config.agent_model], + table_title="Market Analysis", + create_time=datetime.now(timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ), + ) + yield streaming.component_generator( + portfolio_decision_msg.model_dump_json(), + ComponentType.FILTERED_CARD_PUSH_NOTIFICATION, + ) + + # Phase 3: Execute approved trades + if portfolio_decision.trades_to_execute: + yield streaming.message_chunk( + "\n" + "=" * 50 + "\n" + f"⚑ **Phase 3: Executing {len(portfolio_decision.trades_to_execute)} trade(s)...**\n" + + "=" * 50 + + "\n\n" + ) + + for ( + symbol, + action, + trade_type, + ) in portfolio_decision.trades_to_execute: + # Get indicators for this symbol + asset_analysis = portfolio_manager.asset_analyses.get( + symbol + ) + if not asset_analysis: + continue + + # Execute trade + trade_details = executor.execute_trade( + symbol, action, trade_type, asset_analysis.indicators + ) + + if trade_details: + # Send trade notification + trade_message_text = ( + MessageFormatter.format_trade_notification( + trade_details, config.agent_model + ) + ) + trade_message = FilteredCardPushNotificationComponentData( + title=f"{config.agent_model} Trade", + data=f"πŸ’° **Trade Executed:**\n{trade_message_text}\n", + filters=[config.agent_model], + table_title="Trade Detail", + create_time=datetime.now(timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ), + ) + yield streaming.component_generator( + trade_message.model_dump_json(), + ComponentType.FILTERED_CARD_PUSH_NOTIFICATION, + ) + else: + trade_message = FilteredCardPushNotificationComponentData( + title=f"{config.agent_model} Trade", + data=f"πŸ’° **Trade Failed:** Could not execute {action.value} " + f"{trade_type.value} on {symbol}\n", + filters=[config.agent_model], + table_title="Trade Detail", + create_time=datetime.now(timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ), + ) + yield streaming.component_generator( + trade_message.model_dump_json(), + ComponentType.FILTERED_CARD_PUSH_NOTIFICATION, + ) + + # Take snapshots + timestamp = datetime.now() + executor.snapshot_positions(timestamp) + executor.snapshot_portfolio(timestamp) + + # Send portfolio update + portfolio_value = executor.get_portfolio_value() + total_pnl = portfolio_value - config.initial_capital + + portfolio_msg = ( + f"πŸ’° **Portfolio Update**\n" + f"Time: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Total Value: ${portfolio_value:,.2f}\n" + f"P&L: ${total_pnl:,.2f}\n" + f"Open Positions: {len(executor.positions)}\n" + f"Available Capital: ${executor.current_capital:,.2f}\n" + ) + + if executor.positions: + portfolio_msg += "\n**Open Positions:**\n" + for symbol, pos in executor.positions.items(): + try: + import yfinance as yf + + ticker = yf.Ticker(symbol) + current_price = ticker.history( + period="1d", interval="1m" + )["Close"].iloc[-1] + if pos.trade_type.value == "long": + current_pnl = ( + current_price - pos.entry_price + ) * abs(pos.quantity) + else: + current_pnl = ( + pos.entry_price - current_price + ) * abs(pos.quantity) + pnl_emoji = "🟒" if current_pnl >= 0 else "πŸ”΄" + portfolio_msg += f"- {symbol}: {pos.trade_type.value.upper()} @ ${pos.entry_price:,.2f} {pnl_emoji} P&L: ${current_pnl:,.2f}\n" + except Exception as e: + logger.warning( + f"Failed to calculate P&L for {symbol}: {e}" + ) + portfolio_msg += f"- {symbol}: {pos.trade_type.value.upper()} @ ${pos.entry_price:,.2f}\n" + + yield streaming.message_chunk(portfolio_msg + "\n") + + component_data = self._get_instance_status_component_data( + session_id, instance_id + ) + if component_data: + yield streaming.component_generator( + component_data, + ComponentType.FILTERED_CARD_PUSH_NOTIFICATION, + ) + + chart_data = self._get_session_portfolio_chart_data(session_id) + if chart_data: + yield streaming.component_generator( + chart_data, ComponentType.FILTERED_LINE_CHART + ) + + # Wait for next check interval + logger.info(f"Waiting {check_interval}s until next check...") + yield streaming.message_chunk( + f"⏳ Waiting {check_interval} seconds until next check...\n\n" + ) + await asyncio.sleep(check_interval) + + except Exception as e: + logger.error(f"Error during trading cycle: {e}") + yield streaming.message_chunk( + f"⚠️ **Error during trading cycle**: {str(e)}\n" + f"Continuing with next check...\n\n" + ) + await asyncio.sleep(check_interval) + + except Exception as e: + logger.error(f"Critical error in stream method: {e}") + yield streaming.failed(f"Critical error: {str(e)}") + finally: + # Mark instance as inactive but keep data for history + if session_id in self.trading_instances: + if instance_id in self.trading_instances[session_id]: + self.trading_instances[session_id][instance_id]["active"] = False + logger.info(f"Stopped instance: {instance_id}") diff --git a/python/valuecell/agents/auto_trading_agent/constants.py b/python/valuecell/agents/auto_trading_agent/constants.py new file mode 100644 index 000000000..0a32bee0a --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/constants.py @@ -0,0 +1,11 @@ +"""Constants for auto trading agent""" + +# Limits +MAX_SYMBOLS = 10 +DEFAULT_CHECK_INTERVAL = 60 # 1 minute in seconds + +# Default configuration values +DEFAULT_INITIAL_CAPITAL = 100000 +DEFAULT_RISK_PER_TRADE = 0.02 +DEFAULT_MAX_POSITIONS = 3 +DEFAULT_AGENT_MODEL = "deepseek/deepseek-v3.1-terminus" diff --git a/python/valuecell/agents/auto_trading_agent/exchanges/__init__.py b/python/valuecell/agents/auto_trading_agent/exchanges/__init__.py new file mode 100644 index 000000000..b2df842f9 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/exchanges/__init__.py @@ -0,0 +1,20 @@ +"""Exchange adapters for different trading platforms + +This module provides adapters for various cryptocurrency exchanges, +allowing the AutoTradingAgent to trade on both paper (simulated) and live (real) exchanges. + +Adapters: +- ExchangeBase: Abstract base class defining the exchange interface +- PaperTrading: Simulated trading (default) +- BinanceExchange: Live trading on Binance (requires API keys) +""" + +from .base_exchange import ExchangeBase, ExchangeType, OrderStatus +from .paper_trading import PaperTrading + +__all__ = [ + "ExchangeBase", + "ExchangeType", + "OrderStatus", + "PaperTrading", +] diff --git a/python/valuecell/agents/auto_trading_agent/exchanges/base_exchange.py b/python/valuecell/agents/auto_trading_agent/exchanges/base_exchange.py new file mode 100644 index 000000000..4346f2393 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/exchanges/base_exchange.py @@ -0,0 +1,418 @@ +"""Abstract base class for exchange adapters""" + +import logging +from abc import ABC, abstractmethod +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + +from ..models import TradeType + +logger = logging.getLogger(__name__) + + +class ExchangeType(str, Enum): + """Supported exchange types""" + + PAPER = "paper" # Simulated trading + BINANCE = "binance" # Binance exchange + BYBIT = "bybit" # Bybit exchange (future support) + COINBASE = "coinbase" # Coinbase (future support) + + +class OrderStatus(str, Enum): + """Order execution status""" + + PENDING = "pending" + PARTIALLY_FILLED = "partially_filled" + FILLED = "filled" + CANCELLED = "cancelled" + REJECTED = "rejected" + EXPIRED = "expired" + + +class Order: + """Represents a single order""" + + def __init__( + self, + order_id: str, + symbol: str, + side: str, # "buy" or "sell" + quantity: float, + price: float, + order_type: str = "limit", # "limit", "market", etc. + trade_type: Optional[TradeType] = None, + ): + self.order_id = order_id + self.symbol = symbol + self.side = side + self.quantity = quantity + self.price = price + self.order_type = order_type + self.trade_type = trade_type + self.status = OrderStatus.PENDING + self.filled_quantity = 0.0 + self.filled_price = 0.0 + self.created_at = datetime.now() + self.updated_at = datetime.now() + + def to_dict(self) -> Dict[str, Any]: + """Convert order to dictionary""" + return { + "order_id": self.order_id, + "symbol": self.symbol, + "side": self.side, + "quantity": self.quantity, + "price": self.price, + "order_type": self.order_type, + "status": self.status.value, + "filled_quantity": self.filled_quantity, + "filled_price": self.filled_price, + "created_at": self.created_at.isoformat(), + } + + +class ExchangeBase(ABC): + """ + Abstract base class for exchange adapters. + + All exchange implementations (Binance, Bybit, etc.) must inherit from this + class and implement all abstract methods. + """ + + def __init__(self, exchange_type: ExchangeType): + """ + Initialize exchange adapter. + + Args: + exchange_type: Type of exchange (PAPER, BINANCE, etc.) + """ + self.exchange_type = exchange_type + self.is_connected = False + self.orders: Dict[str, Order] = {} + self.order_history: List[Order] = [] + + # ============ Connection Management ============ + + @abstractmethod + async def connect(self) -> bool: + """ + Connect to exchange (authenticate, validate credentials). + + Returns: + True if connection successful + """ + pass + + @abstractmethod + async def disconnect(self) -> bool: + """ + Disconnect from exchange gracefully. + + Returns: + True if disconnection successful + """ + pass + + @abstractmethod + async def validate_connection(self) -> bool: + """ + Validate that connection is still active and valid. + + Returns: + True if connection is valid + """ + pass + + # ============ Account Information ============ + + @abstractmethod + async def get_balance(self) -> Dict[str, float]: + """ + Get account balances across all assets. + + Returns: + Dictionary mapping asset symbols to balances + Example: {"USDT": 100000, "BTC": 1.5} + """ + pass + + @abstractmethod + async def get_asset_balance(self, asset: str) -> float: + """ + Get balance for a specific asset. + + Args: + asset: Asset symbol (e.g., "USDT", "BTC") + + Returns: + Available balance + """ + pass + + # ============ Market Data ============ + + @abstractmethod + async def get_current_price(self, symbol: str) -> float: + """ + Get current market price for a symbol. + + Args: + symbol: Trading symbol (e.g., "BTCUSDT") + + Returns: + Current price + """ + pass + + @abstractmethod + async def get_24h_ticker(self, symbol: str) -> Dict[str, Any]: + """ + Get 24-hour ticker data. + + Args: + symbol: Trading symbol + + Returns: + Dictionary with price, volume, change data + """ + pass + + # ============ Order Management ============ + + @abstractmethod + async def place_order( + self, + symbol: str, + side: str, + quantity: float, + price: Optional[float] = None, + order_type: str = "limit", + **kwargs, + ) -> Order: + """ + Place a new order. + + Args: + symbol: Trading symbol (e.g., "BTCUSDT") + side: "buy" or "sell" + quantity: Order quantity + price: Order price (None for market orders) + order_type: "limit" or "market" + **kwargs: Exchange-specific parameters + + Returns: + Order object with order_id + """ + pass + + @abstractmethod + async def cancel_order(self, symbol: str, order_id: str) -> bool: + """ + Cancel an open order. + + Args: + symbol: Trading symbol + order_id: Order ID to cancel + + Returns: + True if cancellation successful + """ + pass + + @abstractmethod + async def get_order_status(self, symbol: str, order_id: str) -> OrderStatus: + """ + Get status of a specific order. + + Args: + symbol: Trading symbol + order_id: Order ID + + Returns: + Order status + """ + pass + + @abstractmethod + async def get_open_orders(self, symbol: Optional[str] = None) -> List[Order]: + """ + Get all open orders. + + Args: + symbol: Optional symbol to filter by + + Returns: + List of open Order objects + """ + pass + + @abstractmethod + async def get_order_history( + self, symbol: Optional[str] = None, limit: int = 100 + ) -> List[Order]: + """ + Get order history. + + Args: + symbol: Optional symbol to filter by + limit: Maximum number of orders to return + + Returns: + List of Order objects + """ + pass + + # ============ Position Management ============ + + @abstractmethod + async def get_open_positions( + self, symbol: Optional[str] = None + ) -> Dict[str, Dict[str, Any]]: + """ + Get all open positions. + + Args: + symbol: Optional symbol to filter by + + Returns: + Dictionary with position details + Example: { + "BTC": { + "quantity": 1.5, + "entry_price": 45000, + "current_price": 46000, + "unrealized_pnl": 1500 + } + } + """ + pass + + @abstractmethod + async def get_position_details(self, symbol: str) -> Optional[Dict[str, Any]]: + """ + Get details for a specific position. + + Args: + symbol: Trading symbol + + Returns: + Position details or None if no position + """ + pass + + # ============ Trade Execution ============ + + @abstractmethod + async def execute_buy( + self, + symbol: str, + quantity: float, + price: Optional[float] = None, + **kwargs, + ) -> Optional[Order]: + """ + Execute a buy order. + + Args: + symbol: Trading symbol + quantity: Amount to buy + price: Price (None for market order) + **kwargs: Exchange-specific parameters + + Returns: + Order object or None if execution failed + """ + pass + + @abstractmethod + async def execute_sell( + self, + symbol: str, + quantity: float, + price: Optional[float] = None, + **kwargs, + ) -> Optional[Order]: + """ + Execute a sell order. + + Args: + symbol: Trading symbol + quantity: Amount to sell + price: Price (None for market order) + **kwargs: Exchange-specific parameters + + Returns: + Order object or None if execution failed + """ + pass + + # ============ Utilities ============ + + @abstractmethod + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to exchange format. + + Args: + symbol: Symbol to normalize (e.g., "BTC-USD") + + Returns: + Exchange-formatted symbol (e.g., "BTCUSDT" for Binance) + """ + pass + + @abstractmethod + async def get_fee_tier(self) -> Dict[str, float]: + """ + Get current trading fee tier. + + Returns: + Dictionary with maker/taker fees + """ + pass + + @abstractmethod + async def get_trading_limits(self, symbol: str) -> Dict[str, float]: + """ + Get trading limits for a symbol. + + Args: + symbol: Trading symbol + + Returns: + Dictionary with min/max quantities, precision, etc. + """ + pass + + # ============ Error Handling ============ + + async def handle_order_rejection(self, order: Order, reason: str) -> bool: + """ + Handle order rejection (cleanup, logging, etc.). + + Args: + order: Rejected order + reason: Rejection reason + + Returns: + True if handled successfully + """ + logger.warning(f"Order {order.order_id} rejected: {reason}") + order.status = OrderStatus.REJECTED + return True + + async def handle_connection_error(self, error: Exception) -> bool: + """ + Handle connection errors. + + Args: + error: Connection error + + Returns: + True if handled, False if critical + """ + logger.error(f"Connection error: {error}") + self.is_connected = False + return False diff --git a/python/valuecell/agents/auto_trading_agent/exchanges/binance_exchange.py b/python/valuecell/agents/auto_trading_agent/exchanges/binance_exchange.py new file mode 100644 index 000000000..548899a95 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/exchanges/binance_exchange.py @@ -0,0 +1,533 @@ +"""Binance exchange adapter for live trading + +This adapter connects to Binance API for real trading on live accounts. +Requires: API key and secret from Binance account settings. + +WARNING: Real money trading - handle with care! +""" + +import logging +from typing import Any, Dict, List, Optional + +from .base_exchange import ExchangeBase, ExchangeType, Order, OrderStatus + +logger = logging.getLogger(__name__) + + +class BinanceExchange(ExchangeBase): + """ + Binance exchange adapter for live trading. + + Features (TODO - Future Implementation): + - Connect to Binance API + - Execute real trades + - Monitor real-time positions + - Handle Binance-specific errors + - Support spot and margin trading + + WARNING: This implementation is for architecture design only. + Real implementation requires proper error handling, rate limiting, and security measures. + """ + + def __init__(self, api_key: str, api_secret: str, testnet: bool = False): + """ + Initialize Binance exchange adapter. + + Args: + api_key: Binance API key + api_secret: Binance API secret + testnet: Use testnet for testing (default: False) + + Note: + - testnet=True connects to https://testnet.binance.vision (for testing) + - testnet=False connects to https://api.binance.com (real trading!) + """ + super().__init__(ExchangeType.BINANCE) + self.api_key = api_key + self.api_secret = api_secret + self.testnet = testnet + + # TODO: Initialize Binance client + # self.client = BinanceClientAsync(api_key, api_secret) + # if testnet: + # self.client.API_URL = "https://testnet.binance.vision" + + logger.warning( + f"BinanceExchange initialized in {'TESTNET' if testnet else 'LIVE'} mode. " + "TODO: Implement real API connections." + ) + + # ============ Connection Management ============ + + async def connect(self) -> bool: + """ + Connect to Binance API. + + TODO: Implementation + - Validate API credentials + - Check API rate limits + - Verify account status + + Returns: + True if connection successful + """ + logger.info("[TODO] Connecting to Binance API...") + # self.is_connected = await self.client.ping() + self.is_connected = True + return self.is_connected + + async def disconnect(self) -> bool: + """ + Disconnect from Binance API gracefully. + + TODO: Implementation + - Close websocket connections + - Clean up resources + + Returns: + True if disconnection successful + """ + logger.info("[TODO] Disconnecting from Binance API...") + self.is_connected = False + return True + + async def validate_connection(self) -> bool: + """ + Validate that connection is still active. + + TODO: Implementation + - Ping Binance API + - Check if credentials are still valid + + Returns: + True if connection is valid + """ + logger.info("[TODO] Validating Binance connection...") + return self.is_connected + + # ============ Account Information ============ + + async def get_balance(self) -> Dict[str, float]: + """ + Get account balances from Binance. + + TODO: Implementation + - Fetch account info from Binance + - Parse balances for each asset + - Filter out zero balances + + Returns: + Dictionary mapping asset -> balance + Example: {"USDT": 100000.0, "BTC": 1.5} + """ + logger.info("[TODO] Fetching balances from Binance...") + return {"USDT": 100000.0} # Placeholder + + async def get_asset_balance(self, asset: str) -> float: + """ + Get balance for a specific asset. + + TODO: Implementation + - Query Binance for specific asset + - Return available balance + + Args: + asset: Asset symbol (e.g., "USDT", "BTC") + + Returns: + Available balance + """ + logger.info(f"[TODO] Fetching {asset} balance from Binance...") + return 0.0 # Placeholder + + # ============ Market Data ============ + + async def get_current_price(self, symbol: str) -> float: + """ + Get current market price from Binance. + + TODO: Implementation + - Query latest price from Binance + - Handle rate limits + - Cache results if needed + + Args: + symbol: Trading symbol (e.g., "BTCUSDT") + + Returns: + Current price + """ + logger.info(f"[TODO] Fetching price for {symbol} from Binance...") + return 0.0 # Placeholder + + async def get_24h_ticker(self, symbol: str) -> Dict[str, Any]: + """ + Get 24-hour ticker data from Binance. + + TODO: Implementation + - Query Binance 24h ticker + - Parse response + - Calculate changes + + Args: + symbol: Trading symbol + + Returns: + Ticker data dictionary + """ + logger.info(f"[TODO] Fetching 24h ticker for {symbol} from Binance...") + return {} # Placeholder + + # ============ Order Management ============ + + async def place_order( + self, + symbol: str, + side: str, + quantity: float, + price: Optional[float] = None, + order_type: str = "limit", + **kwargs, + ) -> Order: + """ + Place an order on Binance. + + TODO: Implementation + - Validate parameters + - Send order to Binance + - Handle order confirmation + - Return Order object with order_id from Binance + + Args: + symbol: Trading symbol (e.g., "BTCUSDT") + side: "buy" or "sell" + quantity: Order quantity + price: Limit price (None for market orders) + order_type: "limit" or "market" + **kwargs: Binance-specific parameters + + Returns: + Order object with Binance order_id + """ + logger.info( + f"[TODO] Placing {order_type} order on Binance: " + f"{side} {quantity} {symbol} @ ${price or 'market'}" + ) + + # This is a placeholder - real implementation would: + # response = await self.client.create_order( + # symbol=symbol, + # side=side.upper(), + # type=order_type.upper(), + # quantity=quantity, + # price=price, + # ) + # return Order(order_id=response['orderId'], ...) + + return Order( + order_id="binance_placeholder", + symbol=symbol, + side=side, + quantity=quantity, + price=price or 0.0, + order_type=order_type, + ) + + async def cancel_order(self, symbol: str, order_id: str) -> bool: + """ + Cancel an order on Binance. + + TODO: Implementation + - Send cancel request to Binance + - Verify cancellation + - Handle errors + + Args: + symbol: Trading symbol + order_id: Binance order ID + + Returns: + True if cancellation successful + """ + logger.info(f"[TODO] Cancelling order {order_id} on Binance...") + return False # Placeholder + + async def get_order_status(self, symbol: str, order_id: str) -> OrderStatus: + """ + Get order status from Binance. + + TODO: Implementation + - Query Binance for order status + - Map Binance status to OrderStatus enum + + Args: + symbol: Trading symbol + order_id: Binance order ID + + Returns: + Order status + """ + logger.info(f"[TODO] Fetching status for order {order_id} from Binance...") + return OrderStatus.PENDING # Placeholder + + async def get_open_orders(self, symbol: Optional[str] = None) -> List[Order]: + """ + Get open orders from Binance. + + TODO: Implementation + - Query Binance for open orders + - Parse each order into Order objects + - Filter by symbol if provided + + Args: + symbol: Optional symbol filter + + Returns: + List of open Order objects + """ + logger.info(f"[TODO] Fetching open orders from Binance (symbol={symbol})...") + return [] # Placeholder + + async def get_order_history( + self, symbol: Optional[str] = None, limit: int = 100 + ) -> List[Order]: + """ + Get order history from Binance. + + TODO: Implementation + - Query Binance for closed orders + - Parse into Order objects + - Respect limit parameter + + Args: + symbol: Optional symbol filter + limit: Maximum orders to return + + Returns: + List of Order objects + """ + logger.info( + f"[TODO] Fetching order history from Binance " + f"(symbol={symbol}, limit={limit})..." + ) + return [] # Placeholder + + # ============ Position Management ============ + + async def get_open_positions( + self, symbol: Optional[str] = None + ) -> Dict[str, Dict[str, Any]]: + """ + Get open positions from Binance account. + + TODO: Implementation + - Query account balances + - Filter non-zero balances (excluding USDT) + - Calculate current price for each + - Calculate unrealized P&L + + Args: + symbol: Optional symbol filter + + Returns: + Dictionary of positions with details + """ + logger.info(f"[TODO] Fetching open positions from Binance (symbol={symbol})...") + return {} # Placeholder + + async def get_position_details(self, symbol: str) -> Optional[Dict[str, Any]]: + """ + Get details for a specific position. + + TODO: Implementation + - Query position data + - Calculate current value + - Calculate unrealized P&L + + Args: + symbol: Trading symbol + + Returns: + Position details or None + """ + logger.info(f"[TODO] Fetching position details for {symbol} from Binance...") + return None # Placeholder + + # ============ Trade Execution ============ + + async def execute_buy( + self, + symbol: str, + quantity: float, + price: Optional[float] = None, + **kwargs, + ) -> Optional[Order]: + """ + Execute a buy order on Binance. + + TODO: Implementation + - Check balance + - Place market or limit order + - Monitor fill status + - Return filled Order + + Args: + symbol: Trading symbol + quantity: Amount to buy + price: Price (None for market order) + **kwargs: Additional parameters + + Returns: + Filled Order or None if failed + """ + logger.info( + f"[TODO] Executing BUY on Binance: {quantity} {symbol} @ ${price or 'market'}" + ) + return None # Placeholder + + async def execute_sell( + self, + symbol: str, + quantity: float, + price: Optional[float] = None, + **kwargs, + ) -> Optional[Order]: + """ + Execute a sell order on Binance. + + TODO: Implementation + - Check position exists + - Place market or limit order + - Monitor fill status + - Calculate P&L + - Return filled Order + + Args: + symbol: Trading symbol + quantity: Amount to sell + price: Price (None for market order) + **kwargs: Additional parameters + + Returns: + Filled Order or None if failed + """ + logger.info( + f"[TODO] Executing SELL on Binance: {quantity} {symbol} @ ${price or 'market'}" + ) + return None # Placeholder + + # ============ Utilities ============ + + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to Binance format. + + Args: + symbol: Original symbol (e.g., "BTC-USD") + + Returns: + Binance format (e.g., "BTCUSDT") + """ + return symbol.replace("-USD", "USDT").replace("-USDT", "USDT") + + async def get_fee_tier(self) -> Dict[str, float]: + """ + Get current trading fee tier from Binance. + + TODO: Implementation + - Query user trading fees + - Handle VIP tiers + - Return maker/taker fees + + Returns: + Fee dictionary with maker/taker fees + """ + logger.info("[TODO] Fetching fee tier from Binance...") + # Default Binance fees + return {"maker": 0.001, "taker": 0.001} + + async def get_trading_limits(self, symbol: str) -> Dict[str, float]: + """ + Get trading limits for a symbol on Binance. + + TODO: Implementation + - Query symbol filters + - Parse lot size filter + - Parse min notional filter + - Return all limits + + Args: + symbol: Trading symbol + + Returns: + Dictionary with trading limits + """ + logger.info(f"[TODO] Fetching trading limits for {symbol} from Binance...") + return { + "min_quantity": 0.0001, + "max_quantity": 1000000, + "quantity_precision": 8, + "min_notional": 10.0, + } + + # ============ WebSocket Subscriptions (Future) ============ + + async def subscribe_to_ticker(self, symbol: str, callback) -> bool: + """ + Subscribe to real-time ticker updates via WebSocket. + + TODO: Future implementation + - Connect to Binance WebSocket + - Subscribe to ticker stream + - Call callback on each update + - Handle reconnection + + Args: + symbol: Trading symbol + callback: Callback function for updates + + Returns: + True if subscription successful + """ + logger.info(f"[TODO] Subscribing to ticker updates for {symbol}...") + return False + + async def subscribe_to_trades(self, symbol: str, callback) -> bool: + """ + Subscribe to real-time trade updates via WebSocket. + + TODO: Future implementation + - Connect to Binance WebSocket + - Subscribe to trades stream + - Call callback on each trade + + Args: + symbol: Trading symbol + callback: Callback function for updates + + Returns: + True if subscription successful + """ + logger.info(f"[TODO] Subscribing to trade updates for {symbol}...") + return False + + # ============ Error Handling ============ + + async def handle_api_error(self, error: Dict[str, Any]) -> bool: + """ + Handle API errors from Binance. + + TODO: Implementation + - Parse Binance error codes + - Determine severity (warning vs critical) + - Log appropriately + - Take corrective action if needed + + Args: + error: Error response from Binance + + Returns: + True if error was handled, False if critical + """ + logger.error(f"[TODO] Handling Binance API error: {error}") + return False diff --git a/python/valuecell/agents/auto_trading_agent/exchanges/paper_trading.py b/python/valuecell/agents/auto_trading_agent/exchanges/paper_trading.py new file mode 100644 index 000000000..5ebc92c06 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/exchanges/paper_trading.py @@ -0,0 +1,501 @@ +"""Paper trading (simulated) exchange adapter""" + +import logging +import uuid +from typing import Any, Dict, List, Optional + +import yfinance as yf + +from .base_exchange import ExchangeBase, ExchangeType, Order, OrderStatus + +logger = logging.getLogger(__name__) + + +class PaperTrading(ExchangeBase): + """ + Simulated trading on paper (no real money, no real orders). + + Used for backtesting and strategy development without risking real capital. + """ + + def __init__(self, initial_balance: float = 100000.0): + """ + Initialize paper trading exchange. + + Args: + initial_balance: Starting capital for simulated trading + """ + super().__init__(ExchangeType.PAPER) + self.initial_balance = initial_balance + self.balance = initial_balance + self.positions: Dict[str, Dict[str, Any]] = {} # {symbol: position_data} + self.is_connected = True + + # ============ Connection Management ============ + + async def connect(self) -> bool: + """Paper trading is always connected""" + self.is_connected = True + logger.info("Paper trading connected (simulated)") + return True + + async def disconnect(self) -> bool: + """Disconnect paper trading""" + self.is_connected = False + logger.info("Paper trading disconnected") + return True + + async def validate_connection(self) -> bool: + """Paper trading is always valid""" + return self.is_connected + + # ============ Account Information ============ + + async def get_balance(self) -> Dict[str, float]: + """ + Get simulated account balances. + + Returns: + Dictionary with USDT and other assets + """ + balances = {"USDT": self.balance} + # Add positions as assets + for symbol, pos_data in self.positions.items(): + asset = symbol.replace("USDT", "") + balances[asset] = pos_data["quantity"] + return balances + + async def get_asset_balance(self, asset: str) -> float: + """ + Get balance for a specific asset. + + Args: + asset: Asset symbol + + Returns: + Available balance + """ + if asset == "USDT": + return self.balance + + # Check if we have a position + for symbol, pos_data in self.positions.items(): + if symbol.startswith(asset): + return pos_data["quantity"] + + return 0.0 + + # ============ Market Data ============ + + async def get_current_price(self, symbol: str) -> float: + """ + Get current simulated price from yfinance. + + Args: + symbol: Trading symbol in exchange format + + Returns: + Current price + """ + try: + # Convert exchange format back to ticker format + ticker_symbol = self._denormalize_symbol(symbol) + ticker = yf.Ticker(ticker_symbol) + data = ticker.history(period="1d", interval="1m") + if data.empty: + logger.warning(f"No price data for {symbol}") + return 0.0 + return float(data["Close"].iloc[-1]) + except Exception as e: + logger.error(f"Failed to get price for {symbol}: {e}") + return 0.0 + + async def get_24h_ticker(self, symbol: str) -> Dict[str, Any]: + """ + Get 24-hour ticker data. + + Args: + symbol: Trading symbol + + Returns: + Ticker data dictionary + """ + try: + ticker_symbol = self._denormalize_symbol(symbol) + ticker = yf.Ticker(ticker_symbol) + data = ticker.history(period="1d", interval="1h") + + if data.empty: + return {} + + return { + "symbol": symbol, + "current_price": float(data["Close"].iloc[-1]), + "24h_high": float(data["High"].iloc[-24:].max()), + "24h_low": float(data["Low"].iloc[-24:].min()), + "24h_volume": float(data["Volume"].iloc[-24:].sum()), + "24h_change": float( + (data["Close"].iloc[-1] - data["Close"].iloc[0]) + / data["Close"].iloc[0] + * 100 + ), + } + except Exception as e: + logger.error(f"Failed to get 24h ticker for {symbol}: {e}") + return {} + + # ============ Order Management ============ + + async def place_order( + self, + symbol: str, + side: str, + quantity: float, + price: Optional[float] = None, + order_type: str = "limit", + **kwargs, + ) -> Order: + """ + Place a simulated order. + + Args: + symbol: Trading symbol + side: "buy" or "sell" + quantity: Order quantity + price: Order price (None for market) + order_type: "limit" or "market" + **kwargs: Additional parameters + + Returns: + Order object + """ + order_id = str(uuid.uuid4())[:8] + + # Get current price if market order + if price is None or order_type == "market": + price = await self.get_current_price(symbol) + + order = Order( + order_id=order_id, + symbol=symbol, + side=side.lower(), + quantity=quantity, + price=price, + order_type=order_type, + ) + + # Immediately fill market orders + if order_type == "market": + await self._fill_order(order) + + self.orders[order_id] = order + logger.info( + f"Order placed: {order_id} - {side} {quantity} {symbol} @ ${price:.2f}" + ) + return order + + async def cancel_order(self, symbol: str, order_id: str) -> bool: + """ + Cancel an order (for paper trading, just mark as cancelled). + + Args: + symbol: Trading symbol + order_id: Order ID to cancel + + Returns: + True if successful + """ + if order_id in self.orders: + self.orders[order_id].status = OrderStatus.CANCELLED + logger.info(f"Order cancelled: {order_id}") + return True + return False + + async def get_order_status(self, symbol: str, order_id: str) -> OrderStatus: + """ + Get order status. + + Args: + symbol: Trading symbol + order_id: Order ID + + Returns: + Order status + """ + if order_id in self.orders: + return self.orders[order_id].status + return OrderStatus.EXPIRED + + async def get_open_orders(self, symbol: Optional[str] = None) -> List[Order]: + """ + Get open orders. + + Args: + symbol: Optional symbol filter + + Returns: + List of open orders + """ + open_orders = [ + o + for o in self.orders.values() + if o.status in [OrderStatus.PENDING, OrderStatus.PARTIALLY_FILLED] + ] + if symbol: + open_orders = [o for o in open_orders if o.symbol == symbol] + return open_orders + + async def get_order_history( + self, symbol: Optional[str] = None, limit: int = 100 + ) -> List[Order]: + """ + Get order history. + + Args: + symbol: Optional symbol filter + limit: Max orders to return + + Returns: + Order history + """ + history = self.order_history + if symbol: + history = [o for o in history if o.symbol == symbol] + return history[-limit:] + + # ============ Position Management ============ + + async def get_open_positions( + self, symbol: Optional[str] = None + ) -> Dict[str, Dict[str, Any]]: + """ + Get open positions. + + Args: + symbol: Optional symbol filter + + Returns: + Dictionary of positions + """ + if symbol: + if symbol in self.positions: + return {symbol: self.positions[symbol]} + return {} + return self.positions.copy() + + async def get_position_details(self, symbol: str) -> Optional[Dict[str, Any]]: + """ + Get details for a specific position. + + Args: + symbol: Trading symbol + + Returns: + Position details or None + """ + return self.positions.get(symbol) + + # ============ Trade Execution ============ + + async def execute_buy( + self, + symbol: str, + quantity: float, + price: Optional[float] = None, + **kwargs, + ) -> Optional[Order]: + """ + Execute a buy order. + + Args: + symbol: Trading symbol + quantity: Amount to buy + price: Price (None for market) + **kwargs: Additional parameters + + Returns: + Order or None + """ + # Get price + if price is None: + price = await self.get_current_price(symbol) + + notional = quantity * price + + # Check balance + if notional > self.balance: + logger.warning( + f"Insufficient balance for buy: need ${notional:.2f}, have ${self.balance:.2f}" + ) + return None + + # Place and fill order + order = await self.place_order(symbol, "buy", quantity, price, "market") + return order + + async def execute_sell( + self, + symbol: str, + quantity: float, + price: Optional[float] = None, + **kwargs, + ) -> Optional[Order]: + """ + Execute a sell order. + + Args: + symbol: Trading symbol + quantity: Amount to sell + price: Price (None for market) + **kwargs: Additional parameters + + Returns: + Order or None + """ + # Check if we have the position + if symbol not in self.positions: + logger.warning(f"No position to sell for {symbol}") + return None + + if self.positions[symbol]["quantity"] < quantity: + logger.warning( + f"Insufficient position: have {self.positions[symbol]['quantity']}, " + f"trying to sell {quantity}" + ) + return None + + # Get price + if price is None: + price = await self.get_current_price(symbol) + + # Place and fill order + order = await self.place_order(symbol, "sell", quantity, price, "market") + return order + + # ============ Utilities ============ + + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to paper trading format. + + Args: + symbol: Original symbol (e.g., "BTC-USD") + + Returns: + Normalized symbol (e.g., "BTCUSDT") + """ + return symbol.replace("-USD", "USDT").replace("-USDT", "USDT") + + def _denormalize_symbol(self, symbol: str) -> str: + """ + Convert from exchange format back to yfinance format. + + Args: + symbol: Exchange format (e.g., "BTCUSDT") + + Returns: + yfinance format (e.g., "BTC-USD") + """ + return symbol.replace("USDT", "-USD") + + async def get_fee_tier(self) -> Dict[str, float]: + """ + Paper trading has no fees. + + Returns: + Fee dictionary + """ + return {"maker": 0.0, "taker": 0.0} + + async def get_trading_limits(self, symbol: str) -> Dict[str, float]: + """ + Get trading limits (paper trading has no limits). + + Args: + symbol: Trading symbol + + Returns: + Limits dictionary + """ + return { + "min_quantity": 0.0001, + "max_quantity": 1000000, + "quantity_precision": 8, + "min_notional": 1.0, + } + + # ============ Private Methods ============ + + async def _fill_order(self, order: Order) -> bool: + """ + Fill an order (update balance, positions). + + Args: + order: Order to fill + + Returns: + True if filled successfully + """ + try: + if order.side == "buy": + notional = order.quantity * order.price + self.balance -= notional + + # Update position + if order.symbol in self.positions: + self.positions[order.symbol]["quantity"] += order.quantity + # Update entry price (average) + old_notional = self.positions[order.symbol]["entry_price"] * ( + self.positions[order.symbol]["quantity"] - order.quantity + ) + total_notional = old_notional + notional + total_quantity = self.positions[order.symbol]["quantity"] + self.positions[order.symbol]["entry_price"] = ( + total_notional / total_quantity + ) + else: + self.positions[order.symbol] = { + "quantity": order.quantity, + "entry_price": order.price, + "entry_time": order.created_at, + } + + order.filled_quantity = order.quantity + order.filled_price = order.price + order.status = OrderStatus.FILLED + + elif order.side == "sell": + notional = order.quantity * order.price + self.balance += notional + + # Update position + if order.symbol in self.positions: + self.positions[order.symbol]["quantity"] -= order.quantity + if self.positions[order.symbol]["quantity"] <= 0: + del self.positions[order.symbol] + + order.filled_quantity = order.quantity + order.filled_price = order.price + order.status = OrderStatus.FILLED + + self.order_history.append(order) + logger.info(f"Order filled: {order.order_id} - {order.status.value}") + return True + + except Exception as e: + logger.error(f"Failed to fill order: {e}") + return False + + async def reset(self, initial_balance: float): + """ + Reset paper trading to initial state. + + Args: + initial_balance: New starting balance + """ + self.initial_balance = initial_balance + self.balance = initial_balance + self.positions.clear() + self.orders.clear() + self.order_history.clear() + logger.info(f"Paper trading reset with balance: ${initial_balance:,.2f}") diff --git a/python/valuecell/agents/auto_trading_agent/formatters.py b/python/valuecell/agents/auto_trading_agent/formatters.py new file mode 100644 index 000000000..9e2e9a8eb --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/formatters.py @@ -0,0 +1,237 @@ +"""Formatting utilities for notifications and messages""" + +import json +import logging +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +from .models import Position, TechnicalIndicators, TradeAction, TradeType + +logger = logging.getLogger(__name__) + + +class MessageFormatter: + """Formats various messages and notifications""" + + @staticmethod + def format_trade_notification( + trade_details: Dict[str, Any], agent_name: str = "AutoTrading" + ) -> str: + """ + Format trade details into a notification message + + Args: + trade_details: Trade execution details + agent_name: Name of the agent + + Returns: + Formatted notification message + """ + try: + symbol = trade_details["symbol"] + action = trade_details["action"] + trade_type = trade_details["trade_type"] + timestamp = trade_details["timestamp"] + + if action == "opened": + message = ( + f"{agent_name} opened a {trade_type} position on {symbol}!\n" + f"{timestamp.strftime('%m/%d, %I:%M %p')}\n" + f"Price: ${trade_details['entry_price']:,.2f}\n" + f"Quantity: {trade_details['quantity']:.4f}\n" + f"Notional: ${trade_details['notional']:,.2f}" + ) + else: # closed + hours = int(trade_details["holding_time"].total_seconds() // 3600) + minutes = int( + (trade_details["holding_time"].total_seconds() % 3600) // 60 + ) + pnl = trade_details["pnl"] + pnl_sign = "+" if pnl >= 0 else "" + + message = ( + f"{agent_name} completed a {trade_type} trade on {symbol}!\n" + f"{timestamp.strftime('%m/%d, %I:%M %p')}\n" + f"Price: ${trade_details['entry_price']:,.2f} β†’ ${trade_details['exit_price']:,.2f}\n" + f"Quantity: {trade_details['quantity']:.4f}\n" + f"Notional: ${trade_details['entry_notional']:,.2f} β†’ ${trade_details['exit_notional']:,.2f}\n" + f"Holding time: {hours}H {minutes}M\n" + f"Net P&L: {pnl_sign}${pnl:,.2f}" + ) + + return message + + except Exception as e: + logger.error(f"Failed to format trade notification: {e}") + return "Trade executed" + + @staticmethod + def format_portfolio_notification( + portfolio_value: float, + positions_count: int, + current_capital: float, + agent_model: str, + session_id: str, + portfolio_history: list, + ) -> tuple[str, Optional[str]]: + """ + Format portfolio value notification for chart rendering + + Args: + portfolio_value: Current portfolio value + positions_count: Number of open positions + current_capital: Available capital + agent_model: Agent model name + session_id: Current session ID + portfolio_history: Historical portfolio data + + Returns: + Tuple of (display message, chart data JSON) + """ + try: + timestamp = datetime.now(timezone.utc) + + # Append to history + portfolio_history.append( + {"timestamp": timestamp.isoformat(), "value": portfolio_value} + ) + + # Create chart data payload + chart_data = { + "id": f"AutoTradingAgent-{agent_model}", + "filters": [ + {"dimension": "Time", "gte": timestamp.isoformat()}, + {"dimension": "Model", "=": agent_model}, + ], + "data": {"Portfolio": portfolio_value}, + } + + display_message = ( + f"πŸ’° Portfolio Update\n" + f"Time: {timestamp.strftime('%m/%d, %I:%M %p UTC')}\n" + f"Total Value: ${portfolio_value:,.2f}\n" + f"Open Positions: {positions_count}\n" + f"Available Capital: ${current_capital:,.2f}" + ) + + return display_message, json.dumps(chart_data) + + except Exception as e: + logger.error(f"Failed to format portfolio notification: {e}") + return "Portfolio update failed", None + + @staticmethod + def format_market_analysis_notification( + symbol: str, + indicators: TechnicalIndicators, + action: TradeAction, + trade_type: TradeType, + positions: Dict[str, Position], + ai_reasoning: Optional[str] = None, + ) -> str: + """ + Format market analysis notification including HOLD decisions + + Args: + symbol: Trading symbol + indicators: Technical indicators + action: Recommended action + trade_type: Trade type + positions: Current positions dictionary + ai_reasoning: AI reasoning if available + + Returns: + Formatted analysis message + """ + try: + timestamp = datetime.now(timezone.utc) + + # Format action with emoji + action_emoji = { + TradeAction.BUY: "🟒", + TradeAction.SELL: "πŸ”΄", + TradeAction.HOLD: "⏸️", + } + + message = ( + f"πŸ“Š **Market Analysis - {symbol}**\n" + f"Time: {timestamp.strftime('%m/%d, %I:%M %p UTC')}\n\n" + f"**Current Price:** ${indicators.close_price:,.2f}\n" + f"**Decision:** {action_emoji.get(action, '')} {action.value.upper()}" + ) + + if action != TradeAction.HOLD: + message += f" ({trade_type.value.upper()})" + + message += "\n\n**Technical Indicators:**\n" + + # Add MACD + if indicators.macd is not None and indicators.macd_signal is not None: + macd_signal = ( + "🟒 Bullish" + if indicators.macd > indicators.macd_signal + else "πŸ”΄ Bearish" + ) + message += f"- MACD: {indicators.macd:.4f} / Signal: {indicators.macd_signal:.4f} ({macd_signal})\n" + + # Add RSI + if indicators.rsi is not None: + rsi_signal = ( + "🟒 Oversold" + if indicators.rsi < 30 + else ("πŸ”΄ Overbought" if indicators.rsi > 70 else "βšͺ Neutral") + ) + message += f"- RSI: {indicators.rsi:.2f} ({rsi_signal})\n" + + # Add EMAs + if indicators.ema_12 is not None and indicators.ema_26 is not None: + ema_signal = ( + "🟒 Bullish" + if indicators.ema_12 > indicators.ema_26 + else "πŸ”΄ Bearish" + ) + message += f"- EMA 12/26: ${indicators.ema_12:,.2f} / ${indicators.ema_26:,.2f} ({ema_signal})\n" + + # Add Bollinger Bands + if indicators.bb_upper is not None and indicators.bb_lower is not None: + if indicators.close_price > indicators.bb_upper: + bb_signal = "πŸ”΄ Above Upper Band" + elif indicators.close_price < indicators.bb_lower: + bb_signal = "🟒 Below Lower Band" + else: + bb_signal = "βšͺ Within Bands" + message += f"- Bollinger Bands: ${indicators.bb_lower:,.2f} - ${indicators.bb_upper:,.2f} ({bb_signal})\n" + + # Add AI reasoning if available + if ai_reasoning: + message += f"\n**AI Analysis:**\n{ai_reasoning}\n" + + # Add current position info if exists + if symbol in positions: + pos = positions[symbol] + current_pnl = 0 + if pos.trade_type == TradeType.LONG: + current_pnl = (indicators.close_price - pos.entry_price) * abs( + pos.quantity + ) + else: + current_pnl = (pos.entry_price - indicators.close_price) * abs( + pos.quantity + ) + + pnl_emoji = "🟒" if current_pnl >= 0 else "πŸ”΄" + message += ( + f"\n**Current Position:**\n" + f"- Type: {pos.trade_type.value.upper()}\n" + f"- Entry: ${pos.entry_price:,.2f}\n" + f"- Quantity: {abs(pos.quantity):.4f}\n" + f"- Unrealized P&L: {pnl_emoji} ${current_pnl:,.2f}\n" + ) + else: + message += f"\n**Current Position:** No open position for {symbol}\n\n" + + return message + + except Exception as e: + logger.error(f"Failed to format market analysis notification: {e}") + return f"Market analysis for {symbol}" diff --git a/python/valuecell/agents/auto_trading_agent/market_data.py b/python/valuecell/agents/auto_trading_agent/market_data.py new file mode 100644 index 000000000..7bce5f807 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/market_data.py @@ -0,0 +1,269 @@ +"""Market data and technical indicator retrieval - from a trader's perspective""" + +import logging +from datetime import datetime, timezone +from typing import Dict, Optional + +import pandas as pd +import yfinance as yf + +from .models import TechnicalIndicators + +logger = logging.getLogger(__name__) + + +class MarketDataProvider: + """ + Fetches and caches market data. + + A trader typically thinks about: + 1. "What's the current price?" + 2. "What are the technical indicators telling me?" + 3. "Is there enough volume for good execution?" + """ + + def __init__(self, cache_ttl_seconds: int = 60): + """ + Initialize market data provider with optional caching. + + Args: + cache_ttl_seconds: Time to live for cached data + """ + self.cache_ttl_seconds = cache_ttl_seconds + self._cache: Dict[str, tuple] = {} # {symbol: (data, timestamp)} + + def get_current_price(self, symbol: str) -> Optional[float]: + """ + Get current market price for a symbol. + + Args: + symbol: Trading symbol (e.g., BTC-USD) + + Returns: + Current price or None if fetch fails + """ + try: + ticker = yf.Ticker(symbol) + data = ticker.history(period="1d", interval="1m") + if data.empty: + logger.warning(f"No data available for {symbol}") + return None + return float(data["Close"].iloc[-1]) + except Exception as e: + logger.error(f"Failed to get current price for {symbol}: {e}") + return None + + def calculate_indicators( + self, symbol: str, period: str = "5d", interval: str = "1m" + ) -> Optional[TechnicalIndicators]: + """ + Calculate all technical indicators for a symbol. + + Args: + symbol: Trading symbol + period: Data period (default: 5 days for intraday trading) + interval: Data interval (default: 1 minute) + + Returns: + TechnicalIndicators object or None if calculation fails + """ + try: + # Fetch data from yfinance + ticker = yf.Ticker(symbol) + df = ticker.history(period=period, interval=interval) + + if df.empty or len(df) < 50: + logger.warning(f"Insufficient data for {symbol}: {len(df)} bars") + return None + + # Calculate all indicators + self._calculate_moving_averages(df) + self._calculate_macd(df) + self._calculate_rsi(df) + self._calculate_bollinger_bands(df) + + # Get latest values + return self._extract_latest_indicators(df, symbol) + + except Exception as e: + logger.error(f"Failed to calculate indicators for {symbol}: {e}") + return None + + @staticmethod + def _calculate_moving_averages(df: pd.DataFrame): + """Calculate exponential moving averages""" + df["ema_12"] = df["Close"].ewm(span=12, adjust=False).mean() + df["ema_26"] = df["Close"].ewm(span=26, adjust=False).mean() + df["ema_50"] = df["Close"].ewm(span=50, adjust=False).mean() + + @staticmethod + def _calculate_macd(df: pd.DataFrame): + """Calculate MACD and signal line""" + df["ema_12"] = df["Close"].ewm(span=12, adjust=False).mean() + df["ema_26"] = df["Close"].ewm(span=26, adjust=False).mean() + df["macd"] = df["ema_12"] - df["ema_26"] + df["macd_signal"] = df["macd"].ewm(span=9, adjust=False).mean() + df["macd_histogram"] = df["macd"] - df["macd_signal"] + + @staticmethod + def _calculate_rsi(df: pd.DataFrame, period: int = 14): + """Calculate Relative Strength Index""" + delta = df["Close"].diff() + gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() + rs = gain / loss + df["rsi"] = 100 - (100 / (1 + rs)) + + @staticmethod + def _calculate_bollinger_bands( + df: pd.DataFrame, period: int = 20, std_dev: float = 2 + ): + """Calculate Bollinger Bands""" + df["bb_middle"] = df["Close"].rolling(window=period).mean() + bb_std = df["Close"].rolling(window=period).std() + df["bb_upper"] = df["bb_middle"] + (bb_std * std_dev) + df["bb_lower"] = df["bb_middle"] - (bb_std * std_dev) + + @staticmethod + def _extract_latest_indicators( + df: pd.DataFrame, symbol: str + ) -> TechnicalIndicators: + """Extract latest indicator values from dataframe""" + latest = df.iloc[-1] + + def safe_float(value): + """Safely convert to float, handling NaN""" + return float(value) if pd.notna(value) else None + + return TechnicalIndicators( + symbol=symbol, + timestamp=datetime.now(timezone.utc), + close_price=float(latest["Close"]), + volume=float(latest["Volume"]), + macd=safe_float(latest.get("macd")), + macd_signal=safe_float(latest.get("macd_signal")), + macd_histogram=safe_float(latest.get("macd_histogram")), + rsi=safe_float(latest.get("rsi")), + ema_12=safe_float(latest.get("ema_12")), + ema_26=safe_float(latest.get("ema_26")), + ema_50=safe_float(latest.get("ema_50")), + bb_upper=safe_float(latest.get("bb_upper")), + bb_middle=safe_float(latest.get("bb_middle")), + bb_lower=safe_float(latest.get("bb_lower")), + ) + + +class SignalGenerator: + """ + Generates trading signals from technical indicators. + + A trader's signal logic: + 1. When to buy? (Entry signals) + 2. When to sell? (Exit signals) + 3. How confident am I? + """ + + from .models import TradeAction, TradeType + + @staticmethod + def generate_signal( + indicators: TechnicalIndicators, + ) -> tuple["SignalGenerator.TradeAction", "SignalGenerator.TradeType"]: + """ + Generate trading signal based on technical indicators. + + Uses a combination of: + - MACD for trend direction + - RSI for momentum/exhaustion + - Bollinger Bands for volatility and support/resistance + + Args: + indicators: Technical indicators for analysis + + Returns: + Tuple of (TradeAction, TradeType) + """ + from .models import TradeAction, TradeType + + try: + # Check if we have all required indicators + if ( + indicators.macd is None + or indicators.macd_signal is None + or indicators.rsi is None + ): + return (TradeAction.HOLD, TradeType.LONG) + + # Analyze trend direction + macd_bullish = indicators.macd > indicators.macd_signal + macd_bearish = indicators.macd < indicators.macd_signal + + # Analyze momentum + rsi_oversold = indicators.rsi < 30 + rsi_overbought = indicators.rsi > 70 + + # Entry signals: Look for mean-reversion opportunities with trend confirmation + # Long signal: MACD bullish + RSI showing oversold + if macd_bullish and rsi_oversold: + return (TradeAction.BUY, TradeType.LONG) + + # Short signal: MACD bearish + RSI showing overbought + if macd_bearish and rsi_overbought: + return (TradeAction.BUY, TradeType.SHORT) + + # Exit signals: Close positions when momentum reverses + # Exit long: MACD turns bearish or RSI gets overbought + if macd_bearish or rsi_overbought: + return (TradeAction.SELL, TradeType.LONG) + + # Exit short: MACD turns bullish or RSI gets oversold + if macd_bullish or rsi_oversold: + return (TradeAction.SELL, TradeType.SHORT) + + return (TradeAction.HOLD, TradeType.LONG) + + except Exception as e: + logger.error(f"Failed to generate signal: {e}") + return (TradeAction.HOLD, TradeType.LONG) + + @staticmethod + def get_signal_strength(indicators: TechnicalIndicators) -> Dict[str, float]: + """ + Get quantitative strength of signals. + + Returns: + Dictionary with various signal strength indicators (0-100) + """ + strength = {} + + # MACD strength (0-100) + if indicators.macd is not None and indicators.macd_signal is not None: + macd_diff = indicators.macd - indicators.macd_signal + # Normalize to 0-100 scale (assuming typical range) + strength["macd"] = min(100, max(0, 50 + (macd_diff * 100))) + else: + strength["macd"] = 50 # Neutral + + # RSI strength (already 0-100) + if indicators.rsi is not None: + strength["rsi"] = indicators.rsi + else: + strength["rsi"] = 50 # Neutral + + # Distance from Bollinger Bands (0-100) + if ( + indicators.bb_lower is not None + and indicators.bb_upper is not None + and indicators.bb_middle is not None + ): + band_range = indicators.bb_upper - indicators.bb_lower + if band_range > 0: + # Distance from middle: 0 = at lower band, 100 = at upper band + distance = (indicators.close_price - indicators.bb_lower) / band_range + strength["bollinger"] = min(100, max(0, distance * 100)) + else: + strength["bollinger"] = 50 + else: + strength["bollinger"] = 50 # Neutral + + return strength diff --git a/python/valuecell/agents/auto_trading_agent/models.py b/python/valuecell/agents/auto_trading_agent/models.py new file mode 100644 index 000000000..573709507 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/models.py @@ -0,0 +1,243 @@ +"""Data models and enumerations for auto trading agent""" + +from datetime import datetime +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, Field, field_validator + +from .constants import ( + DEFAULT_AGENT_MODEL, + DEFAULT_INITIAL_CAPITAL, + DEFAULT_MAX_POSITIONS, + DEFAULT_RISK_PER_TRADE, + MAX_SYMBOLS, +) + + +class TradeAction(str, Enum): + """Trade action enumeration""" + + BUY = "buy" + SELL = "sell" + HOLD = "hold" + + +class TradeType(str, Enum): + """Trade type enumeration""" + + LONG = "long" + SHORT = "short" + + +class TradingRequest(BaseModel): + """Auto trading request model for parsing natural language queries""" + + crypto_symbols: List[str] = Field( + ..., + description="List of crypto symbols to trade (e.g., ['BTC-USD', 'ETH-USD'])", + ) + initial_capital: Optional[float] = Field( + default=DEFAULT_INITIAL_CAPITAL, + description="Initial capital for trading in USD", + gt=0, + ) + use_ai_signals: Optional[bool] = Field( + default=False, + description="Whether to use AI-enhanced trading signals", + ) + agent_model: Optional[str] = Field( + default=DEFAULT_AGENT_MODEL, + description="Model ID for trading decisions", + ) + + @field_validator("crypto_symbols") + @classmethod + def validate_symbols(cls, v): + if not v or len(v) == 0: + raise ValueError("At least one crypto symbol is required") + if len(v) > MAX_SYMBOLS: + raise ValueError(f"Maximum {MAX_SYMBOLS} symbols allowed") + # Normalize symbols to uppercase + return [s.upper() for s in v] + + +class AutoTradingConfig(BaseModel): + """Configuration for auto trading agent""" + + initial_capital: float = Field(..., description="Initial capital for trading", gt=0) + crypto_symbols: List[str] = Field( + ..., + description="List of crypto symbols to trade (max 10)", + max_length=MAX_SYMBOLS, + ) + check_interval: int = Field( + default=60, + description="Check interval in seconds", + gt=0, + ) + risk_per_trade: float = Field( + default=DEFAULT_RISK_PER_TRADE, + description="Risk per trade as percentage of capital", + gt=0, + lt=1, + ) + max_positions: int = Field( + default=DEFAULT_MAX_POSITIONS, + description="Maximum number of concurrent positions", + gt=0, + ) + agent_model: str = Field( + default=DEFAULT_AGENT_MODEL, + description="OpenRouter model ID for AI-enhanced trading decisions", + ) + use_ai_signals: bool = Field( + default=False, + description="Whether to use AI model for enhanced signal generation", + ) + openrouter_api_key: Optional[str] = Field( + default=None, + description="OpenRouter API key for AI model access", + ) + + @field_validator("crypto_symbols") + @classmethod + def validate_symbols(cls, v): + if not v or len(v) == 0: + raise ValueError("At least one crypto symbol is required") + if len(v) > MAX_SYMBOLS: + raise ValueError(f"Maximum {MAX_SYMBOLS} symbols allowed") + # Normalize symbols to uppercase + return [s.upper() for s in v] + + +class Position(BaseModel): + """Trading position model""" + + symbol: str + entry_price: float + quantity: float + entry_time: datetime + trade_type: TradeType + notional: float + + +class CashManagement(BaseModel): + """Cash management tracking""" + + total_cash: float = Field(..., description="Total available cash for trading") + initial_cash: float = Field(..., description="Initial cash allocated") + reserved_cash: float = Field( + default=0, description="Cash reserved for pending positions" + ) + available_cash: float = Field( + ..., description="Available cash for new trades (total_cash - reserved_cash)" + ) + cash_in_trades: float = Field( + default=0, description="Cash currently deployed in open positions" + ) + + class Config: + """Pydantic config""" + + frozen = False + + +class TechnicalIndicators(BaseModel): + """Technical indicators for a symbol""" + + symbol: str + timestamp: datetime + close_price: float + volume: float + macd: Optional[float] = None + macd_signal: Optional[float] = None + macd_histogram: Optional[float] = None + rsi: Optional[float] = None + ema_12: Optional[float] = None + ema_26: Optional[float] = None + ema_50: Optional[float] = None + bb_upper: Optional[float] = None + bb_middle: Optional[float] = None + bb_lower: Optional[float] = None + + +class TradeHistoryRecord(BaseModel): + """Single trade execution history record""" + + timestamp: datetime = Field(..., description="Trade execution timestamp") + symbol: str = Field(..., description="Trading symbol") + action: str = Field(..., description="Trade action: opened or closed") + trade_type: str = Field(..., description="Trade type: long or short") + price: float = Field(..., description="Execution price") + quantity: float = Field(..., description="Trade quantity") + notional: float = Field(..., description="Trade notional value") + pnl: Optional[float] = Field(None, description="P&L for closed positions") + portfolio_value_after: float = Field( + ..., description="Portfolio value after this trade" + ) + cash_after: float = Field(..., description="Available cash after this trade") + + +class PositionHistorySnapshot(BaseModel): + """Position snapshot at a point in time""" + + timestamp: datetime = Field(..., description="Snapshot timestamp") + symbol: str = Field(..., description="Trading symbol") + quantity: float = Field(..., description="Position quantity") + entry_price: float = Field(..., description="Entry price") + current_price: float = Field(..., description="Current market price") + trade_type: str = Field(..., description="Trade type: long or short") + unrealized_pnl: float = Field(..., description="Unrealized P&L") + notional: float = Field(..., description="Position notional value") + + +class PortfolioValueSnapshot(BaseModel): + """Portfolio value snapshot at a point in time""" + + timestamp: datetime = Field(..., description="Snapshot timestamp") + total_value: float = Field(..., description="Total portfolio value") + cash: float = Field(..., description="Available cash") + cash_in_trades: float = Field( + ..., description="Cash currently deployed in positions" + ) + positions_value: float = Field(..., description="Value of open positions") + positions_count: int = Field(..., description="Number of open positions") + total_pnl: float = Field(..., description="Total unrealized P&L") + + +class TradingInstanceData(BaseModel): + """Complete data for a trading instance""" + + instance_id: str = Field(..., description="Unique instance ID") + session_id: str = Field(..., description="Session ID") + config: AutoTradingConfig = Field(..., description="Trading configuration") + created_at: datetime = Field(..., description="Instance creation time") + active: bool = Field(..., description="Whether instance is active") + + # Historical data + trade_history: List[TradeHistoryRecord] = Field( + default_factory=list, description="All trade executions" + ) + position_history: List[PositionHistorySnapshot] = Field( + default_factory=list, description="Position snapshots over time" + ) + portfolio_history: List[PortfolioValueSnapshot] = Field( + default_factory=list, description="Portfolio value over time" + ) + + # Current state + current_positions: List[Position] = Field( + default_factory=list, description="Current open positions" + ) + current_capital: float = Field(..., description="Current available capital") + current_portfolio_value: float = Field( + ..., description="Current total portfolio value" + ) + + # Statistics + check_count: int = Field(default=0, description="Number of market checks performed") + last_check_time: Optional[datetime] = Field( + None, description="Last market check time" + ) + total_trades: int = Field(default=0, description="Total number of trades executed") diff --git a/python/valuecell/agents/auto_trading_agent/portfolio_decision_manager.py b/python/valuecell/agents/auto_trading_agent/portfolio_decision_manager.py new file mode 100644 index 000000000..337cc8094 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/portfolio_decision_manager.py @@ -0,0 +1,686 @@ +"""Portfolio-level decision manager using AI for coordinated multi-asset trading decisions""" + +import logging +from datetime import datetime +from typing import Dict, List, Optional, Tuple + +from agno.agent import Agent +from pydantic import BaseModel, Field + +from .models import ( + AutoTradingConfig, + Position, + TechnicalIndicators, + TradeAction, + TradeType, +) + +logger = logging.getLogger(__name__) + + +class AssetAnalysis: + """Analysis result for a single asset""" + + def __init__( + self, + symbol: str, + indicators: TechnicalIndicators, + technical_action: TradeAction, + technical_trade_type: TradeType, + ai_action: Optional[TradeAction] = None, + ai_trade_type: Optional[TradeType] = None, + ai_reasoning: Optional[str] = None, + ai_confidence: Optional[float] = None, + ): + self.symbol = symbol + self.indicators = indicators + self.technical_action = technical_action + self.technical_trade_type = technical_trade_type + self.ai_action = ai_action + self.ai_trade_type = ai_trade_type + self.ai_reasoning = ai_reasoning + self.ai_confidence = ai_confidence + + # Final recommendation (AI takes precedence if available) + self.recommended_action = ai_action or technical_action + self.recommended_trade_type = ai_trade_type or technical_trade_type + + @property + def current_price(self) -> float: + """Get current price from indicators""" + return self.indicators.close_price + + def to_dict(self) -> Dict: + """Convert analysis to dictionary for prompt construction""" + return { + "symbol": self.symbol, + "current_price": self.current_price, + "volume": self.indicators.volume, + "technical_indicators": { + "macd": self.indicators.macd, + "macd_signal": self.indicators.macd_signal, + "macd_histogram": self.indicators.macd_histogram, + "rsi": self.indicators.rsi, + "ema_12": self.indicators.ema_12, + "ema_26": self.indicators.ema_26, + "ema_50": self.indicators.ema_50, + "bb_upper": self.indicators.bb_upper, + "bb_middle": self.indicators.bb_middle, + "bb_lower": self.indicators.bb_lower, + }, + "technical_signal": { + "action": self.technical_action.value, + "trade_type": self.technical_trade_type.value, + }, + "ai_signal": { + "action": self.ai_action.value if self.ai_action else None, + "trade_type": self.ai_trade_type.value if self.ai_trade_type else None, + "reasoning": self.ai_reasoning, + "confidence": self.ai_confidence, + } + if self.ai_action + else None, + } + + +class TradeDecision(BaseModel): + """Single trade decision""" + + symbol: str = Field(..., description="Trading symbol") + action: str = Field(..., description="BUY, SELL, or HOLD") + trade_type: str = Field(..., description="LONG or SHORT") + priority: int = Field(..., description="Priority score 1-100") + reasoning: str = Field(..., description="Reasoning for this trade decision") + + +class PortfolioDecisionSchema(BaseModel): + """AI-generated portfolio decision schema""" + + overall_market_sentiment: str = Field( + ..., description="Overall market sentiment: BULLISH, BEARISH, or NEUTRAL" + ) + portfolio_risk_assessment: str = Field( + ..., description="Assessment of current portfolio risk: LOW, MEDIUM, or HIGH" + ) + recommended_trades: List[TradeDecision] = Field( + default_factory=list, + description="List of recommended trades in priority order (max 3)", + ) + portfolio_strategy: str = Field( + ..., + description="Overall portfolio strategy: AGGRESSIVE_GROWTH, BALANCED, DEFENSIVE, or HOLD", + ) + risk_warnings: List[str] = Field( + default_factory=list, description="Any risk warnings or concerns" + ) + reasoning: str = Field( + ..., description="Comprehensive reasoning for the portfolio decision" + ) + + +class PortfolioDecision: + """Portfolio-level trading decision""" + + def __init__(self): + self.trades_to_execute: List[Tuple[str, TradeAction, TradeType]] = [] + self.reasoning: str = "" + self.risk_level: float = 0.0 # 0-1 scale + self.market_sentiment: str = "neutral" + self.portfolio_strategy: str = "balanced" + self.risk_warnings: List[str] = [] + + +class PortfolioDecisionManager: + """ + AI-powered portfolio-level decision manager that considers all assets holistically. + + This manager: + 1. Collects analysis for all assets in the portfolio + 2. Uses LLM to analyze portfolio state, risk, and market conditions + 3. Makes coordinated trading decisions based on AI reasoning + 4. Provides transparent reasoning for all decisions + """ + + def __init__(self, config: AutoTradingConfig, llm_client=None): + """ + Initialize portfolio decision manager. + + Args: + config: Trading configuration + llm_client: OpenRouter LLM client for portfolio analysis + """ + self.config = config + self.llm_client = llm_client + self.asset_analyses: Dict[str, AssetAnalysis] = {} + + def add_asset_analysis(self, analysis: AssetAnalysis): + """ + Add analysis for a single asset. + + Args: + analysis: Asset analysis result + """ + self.asset_analyses[analysis.symbol] = analysis + logger.info( + f"Added analysis for {analysis.symbol}: " + f"{analysis.recommended_action.value} {analysis.recommended_trade_type.value}" + ) + + def clear_analyses(self): + """Clear all asset analyses for a new decision cycle""" + self.asset_analyses.clear() + + async def make_portfolio_decision( + self, + current_positions: Dict[str, Position], + available_cash: float, + total_portfolio_value: float, + ) -> PortfolioDecision: + """ + Make AI-powered portfolio-level trading decision. + + Args: + current_positions: Current open positions + available_cash: Available cash for trading + total_portfolio_value: Total portfolio value + + Returns: + PortfolioDecision with AI-coordinated trading actions + """ + decision = PortfolioDecision() + + if not self.asset_analyses: + decision.reasoning = "No asset analyses available" + return decision + + # Calculate basic portfolio metrics + portfolio_metrics = self._calculate_portfolio_metrics( + current_positions, available_cash, total_portfolio_value + ) + + # Use AI to make portfolio decision if available + if self.llm_client: + try: + ai_decision = await self._get_ai_portfolio_decision( + current_positions, + portfolio_metrics, + available_cash, + total_portfolio_value, + ) + decision = self._convert_ai_decision(ai_decision, current_positions) + except Exception as e: + logger.error(f"Failed to get AI portfolio decision: {e}") + # Fallback to rule-based decision + decision = self._make_rule_based_decision( + current_positions, + portfolio_metrics, + available_cash, + total_portfolio_value, + ) + else: + # Fallback to rule-based decision + decision = self._make_rule_based_decision( + current_positions, + portfolio_metrics, + available_cash, + total_portfolio_value, + ) + + return decision + + async def _get_ai_portfolio_decision( + self, + current_positions: Dict[str, Position], + portfolio_metrics: Dict, + available_cash: float, + total_portfolio_value: float, + ) -> PortfolioDecisionSchema: + """ + Use LLM to analyze portfolio and make trading decisions. + + Returns: + AI-generated portfolio decision + """ + # Construct comprehensive prompt + prompt = self._build_portfolio_analysis_prompt( + current_positions, portfolio_metrics, available_cash, total_portfolio_value + ) + + # Create agent with structured output + agent = Agent( + model=self.llm_client, + output_schema=PortfolioDecisionSchema, + markdown=False, + ) + + # Get AI decision + response = await agent.arun(prompt) + ai_decision = response.content + + logger.info( + f"AI Portfolio Decision: {ai_decision.portfolio_strategy}, " + f"Sentiment: {ai_decision.overall_market_sentiment}, " + f"Trades: {len(ai_decision.recommended_trades)}" + ) + + return ai_decision + + def _build_portfolio_analysis_prompt( + self, + current_positions: Dict[str, Position], + portfolio_metrics: Dict, + available_cash: float, + total_portfolio_value: float, + ) -> str: + """Build comprehensive prompt for portfolio analysis""" + + # Current time + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # Portfolio state section + prompt_parts = [ + "You are an expert portfolio manager for cryptocurrency trading. Analyze the following portfolio state and asset analyses to make coordinated trading decisions.", + "", + f"=== ANALYSIS TIME: {current_time} ===", + "", + "=== PORTFOLIO STATE ===", + f"Total Portfolio Value: ${total_portfolio_value:,.2f}", + f"Available Cash: ${available_cash:,.2f} ({portfolio_metrics['cash_ratio'] * 100:.1f}%)", + f"Cash in Positions: ${total_portfolio_value - available_cash:,.2f}", + f"Open Positions: {portfolio_metrics['position_count']}/{self.config.max_positions}", + f"Risk Per Trade: {self.config.risk_per_trade * 100:.1f}%", + f"Max Positions Allowed: {self.config.max_positions}", + "", + ] + + # Current positions section + if current_positions: + prompt_parts.append("=== CURRENT POSITIONS ===") + for symbol, position in current_positions.items(): + if symbol in self.asset_analyses: + current_price = self.asset_analyses[symbol].current_price + position_value = abs(position.quantity) * current_price + + if position.trade_type == TradeType.LONG: + unrealized_pnl = (current_price - position.entry_price) * abs( + position.quantity + ) + else: + unrealized_pnl = (position.entry_price - current_price) * abs( + position.quantity + ) + + pnl_pct = ( + (unrealized_pnl / position.notional * 100) + if position.notional > 0 + else 0 + ) + concentration = ( + (position_value / total_portfolio_value * 100) + if total_portfolio_value > 0 + else 0 + ) + + prompt_parts.extend( + [ + f"\n{symbol}:", + f" Type: {position.trade_type.value.upper()}", + f" Entry Price: ${position.entry_price:,.2f}", + f" Current Price: ${current_price:,.2f}", + f" Quantity: {abs(position.quantity):.4f}", + f" Position Value: ${position_value:,.2f}", + f" Unrealized P&L: ${unrealized_pnl:,.2f} ({pnl_pct:+.2f}%)", + f" Portfolio Concentration: {concentration:.1f}%", + ] + ) + prompt_parts.append("") + else: + prompt_parts.extend( + [ + "=== CURRENT POSITIONS ===", + "No open positions", + "", + ] + ) + + # Asset analyses section + prompt_parts.append("=== ASSET ANALYSES ===") + for symbol, analysis in self.asset_analyses.items(): + indicators = analysis.indicators + prompt_parts.extend( + [ + f"\n{symbol}:", + f" Current Price: ${analysis.current_price:,.2f}", + f" Volume: {indicators.volume:,.0f}", + "", + " Technical Indicators:", + ] + ) + + # MACD + if indicators.macd is not None and indicators.macd_signal is not None: + macd_trend = ( + "BULLISH" if indicators.macd > indicators.macd_signal else "BEARISH" + ) + prompt_parts.append( + f" - MACD: {indicators.macd:.4f} / Signal: {indicators.macd_signal:.4f} ({macd_trend})" + ) + + # RSI + if indicators.rsi is not None: + if indicators.rsi < 30: + rsi_status = "OVERSOLD (Potential Buy)" + elif indicators.rsi > 70: + rsi_status = "OVERBOUGHT (Potential Sell)" + else: + rsi_status = "NEUTRAL" + prompt_parts.append(f" - RSI: {indicators.rsi:.2f} ({rsi_status})") + + # EMAs + if indicators.ema_12 is not None and indicators.ema_26 is not None: + ema_trend = ( + "BULLISH" if indicators.ema_12 > indicators.ema_26 else "BEARISH" + ) + prompt_parts.append( + f" - EMA 12/26: ${indicators.ema_12:,.2f} / ${indicators.ema_26:,.2f} ({ema_trend})" + ) + + # Bollinger Bands + if indicators.bb_upper is not None and indicators.bb_lower is not None: + if analysis.current_price > indicators.bb_upper: + bb_status = "ABOVE UPPER BAND (Overbought)" + elif analysis.current_price < indicators.bb_lower: + bb_status = "BELOW LOWER BAND (Oversold)" + else: + bb_status = "WITHIN BANDS" + prompt_parts.append( + f" - Bollinger Bands: ${indicators.bb_lower:,.2f} - ${indicators.bb_upper:,.2f} ({bb_status})" + ) + + prompt_parts.append("") + prompt_parts.append(" Technical Analysis Signal:") + prompt_parts.append( + f" - Action: {analysis.technical_action.value.upper()}" + ) + if analysis.technical_action != TradeAction.HOLD: + prompt_parts.append( + f" - Type: {analysis.technical_trade_type.value.upper()}" + ) + + # AI signal if available + if analysis.ai_action: + prompt_parts.append("") + prompt_parts.append(" AI-Enhanced Signal:") + prompt_parts.append(f" - Action: {analysis.ai_action.value.upper()}") + if analysis.ai_action != TradeAction.HOLD: + prompt_parts.append( + f" - Type: {analysis.ai_trade_type.value.upper()}" + ) + if analysis.ai_confidence: + prompt_parts.append( + f" - Confidence: {analysis.ai_confidence:.0f}%" + ) + if analysis.ai_reasoning: + prompt_parts.append(f" - Reasoning: {analysis.ai_reasoning}") + + # Current position status + if symbol in current_positions: + prompt_parts.append( + f" ⚠️ CURRENTLY HOLDING: {current_positions[symbol].trade_type.value.upper()} position" + ) + else: + prompt_parts.append(" ℹ️ No current position") + + prompt_parts.append("") + + # Risk management constraints + prompt_parts.extend( + [ + "", + "=== RISK MANAGEMENT CONSTRAINTS ===", + f"- Maximum {self.config.max_positions} concurrent positions allowed", + "- Maximum 3 trades per decision cycle", + f"- Risk per trade: {self.config.risk_per_trade * 100:.1f}% of available cash", + "- Avoid single asset concentration >40% of portfolio", + "- Prioritize closing losing positions if risk is high", + "- Maintain minimum 10% cash reserve", + "", + ] + ) + + # Decision instructions + prompt_parts.extend( + [ + "=== YOUR TASK ===", + "As a professional portfolio manager, analyze:", + "1. Overall market sentiment across all assets", + "2. Current portfolio risk level and concentration", + "3. Individual asset signals (both technical and AI)", + "4. Correlation and diversification opportunities", + "5. Risk/reward of each potential trade", + "", + "Then provide:", + "- overall_market_sentiment: BULLISH, BEARISH, or NEUTRAL", + "- portfolio_risk_assessment: LOW, MEDIUM, or HIGH", + "- recommended_trades: Up to 3 trades in priority order", + " * For each trade: symbol, action (BUY/SELL/HOLD), trade_type (LONG/SHORT), priority (1-100), reasoning", + " * Prioritize closing positions (SELL) over opening new ones if risk is high", + " * Only recommend BUY if we have room and cash available", + "- portfolio_strategy: AGGRESSIVE_GROWTH, BALANCED, DEFENSIVE, or HOLD", + "- risk_warnings: List any concerns (concentration, volatility, etc.)", + "- reasoning: Comprehensive explanation of your portfolio-level decision", + "", + "Important considerations:", + "- Consider the portfolio as a whole, not just individual assets", + "- Balance risk and opportunity across all positions", + "- Prioritize capital preservation when risk is high", + "- Consider taking profits on winning positions", + "- Cut losses on losing positions if trend has reversed", + "- Ensure diversification and avoid over-concentration", + "", + ] + ) + + return "\n".join(prompt_parts) + + def _convert_ai_decision( + self, + ai_decision: PortfolioDecisionSchema, + current_positions: Dict[str, Position], + ) -> PortfolioDecision: + """Convert AI decision schema to PortfolioDecision""" + decision = PortfolioDecision() + + # Set metadata + decision.market_sentiment = ai_decision.overall_market_sentiment.lower() + decision.portfolio_strategy = ai_decision.portfolio_strategy.lower() + decision.risk_warnings = ai_decision.risk_warnings + decision.reasoning = ai_decision.reasoning + + # Map risk assessment to risk level + risk_map = {"LOW": 0.3, "MEDIUM": 0.6, "HIGH": 0.9} + decision.risk_level = risk_map.get( + ai_decision.portfolio_risk_assessment.upper(), 0.6 + ) + + # Convert trades + for trade in ai_decision.recommended_trades: + try: + action = TradeAction(trade.action.lower()) + trade_type = TradeType(trade.trade_type.lower()) + + # Validate trade + if action == TradeAction.BUY and trade.symbol in current_positions: + logger.warning( + f"Skipping BUY for {trade.symbol} - position already exists" + ) + continue + + if action == TradeAction.SELL and trade.symbol not in current_positions: + logger.warning( + f"Skipping SELL for {trade.symbol} - no position to close" + ) + continue + + if action == TradeAction.SELL and trade.symbol in current_positions: + # Verify trade type matches + if current_positions[trade.symbol].trade_type != trade_type: + logger.warning( + f"Skipping SELL for {trade.symbol} - trade type mismatch " + f"(have {current_positions[trade.symbol].trade_type.value}, " + f"trying to close {trade_type.value})" + ) + continue + + if action != TradeAction.HOLD: + decision.trades_to_execute.append( + (trade.symbol, action, trade_type) + ) + + except Exception as e: + logger.error( + f"Failed to convert trade decision for {trade.symbol}: {e}" + ) + + return decision + + def _make_rule_based_decision( + self, + current_positions: Dict[str, Position], + portfolio_metrics: Dict, + available_cash: float, + total_portfolio_value: float, + ) -> PortfolioDecision: + """Fallback rule-based decision making""" + decision = PortfolioDecision() + + # Simple rule-based logic + max_trades = 3 + trades_added = 0 + + # Prioritize selling losing positions + for symbol, position in current_positions.items(): + if trades_added >= max_trades: + break + + if symbol in self.asset_analyses: + analysis = self.asset_analyses[symbol] + current_price = analysis.current_price + + # Calculate P&L + if position.trade_type == TradeType.LONG: + pnl = (current_price - position.entry_price) * abs( + position.quantity + ) + else: + pnl = (position.entry_price - current_price) * abs( + position.quantity + ) + + # Close losing positions if analysis suggests exit + if pnl < 0 and analysis.recommended_action == TradeAction.SELL: + decision.trades_to_execute.append( + (symbol, TradeAction.SELL, position.trade_type) + ) + trades_added += 1 + + # Add new positions if we have room and strong signals + for symbol, analysis in self.asset_analyses.items(): + if trades_added >= max_trades: + break + + if ( + symbol not in current_positions + and analysis.recommended_action == TradeAction.BUY + ): + if portfolio_metrics["position_count"] < self.config.max_positions: + decision.trades_to_execute.append( + (symbol, TradeAction.BUY, analysis.recommended_trade_type) + ) + trades_added += 1 + + decision.reasoning = ( + f"Rule-based decision: {len(decision.trades_to_execute)} trades selected" + ) + decision.risk_level = portfolio_metrics.get("risk_level", 0.5) + + return decision + + def _calculate_portfolio_metrics( + self, + current_positions: Dict[str, Position], + available_cash: float, + total_portfolio_value: float, + ) -> Dict: + """Calculate basic portfolio metrics""" + metrics = { + "position_count": len(current_positions), + "cash_ratio": ( + available_cash / total_portfolio_value + if total_portfolio_value > 0 + else 0 + ), + "risk_level": 0.0, + "concentration": {}, + "max_concentration": 0.0, + } + + # Calculate concentration + for symbol, position in current_positions.items(): + if symbol in self.asset_analyses: + current_value = ( + abs(position.quantity) * self.asset_analyses[symbol].current_price + ) + concentration = ( + current_value / total_portfolio_value + if total_portfolio_value > 0 + else 0 + ) + metrics["concentration"][symbol] = concentration + metrics["max_concentration"] = max( + metrics["max_concentration"], concentration + ) + + # Calculate risk level + concentration_risk = metrics["max_concentration"] * 0.4 + cash_risk = (1 - metrics["cash_ratio"]) * 0.3 + position_count_risk = ( + min(metrics["position_count"] / self.config.max_positions, 1.0) * 0.3 + ) + metrics["risk_level"] = concentration_risk + cash_risk + position_count_risk + + return metrics + + def get_portfolio_summary(self) -> str: + """Get summary of current portfolio analysis""" + if not self.asset_analyses: + return "No asset analyses available" + + summary = ( + f"**Portfolio Analysis Summary** ({len(self.asset_analyses)} assets)\n\n" + ) + + for symbol, analysis in self.asset_analyses.items(): + summary += ( + f"**{symbol}:**\n" + f"- Price: ${analysis.current_price:,.2f}\n" + f"- Technical Signal: {analysis.technical_action.value.upper()}" + ) + if analysis.technical_action != TradeAction.HOLD: + summary += f" ({analysis.technical_trade_type.value.upper()})" + summary += "\n" + + if analysis.ai_action: + summary += f"- AI Signal: {analysis.ai_action.value.upper()}" + if analysis.ai_action != TradeAction.HOLD: + summary += f" ({analysis.ai_trade_type.value.upper()})" + if analysis.ai_confidence: + summary += f" - Confidence: {analysis.ai_confidence:.0f}%" + summary += "\n" + + if analysis.ai_reasoning: + summary += f"- AI Reasoning: {analysis.ai_reasoning}\n" + + summary += "\n" + + return summary diff --git a/python/valuecell/agents/auto_trading_agent/position_manager.py b/python/valuecell/agents/auto_trading_agent/position_manager.py new file mode 100644 index 000000000..f40252e11 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/position_manager.py @@ -0,0 +1,315 @@ +"""Position and cash management module - from a trader's perspective""" + +import logging +from datetime import datetime +from typing import Dict, Optional, Tuple + +import yfinance as yf + +from .models import ( + CashManagement, + PortfolioValueSnapshot, + Position, + PositionHistorySnapshot, + TradeType, +) + +logger = logging.getLogger(__name__) + + +class PositionManager: + """ + Manages all trading positions and cash from a trader's perspective. + + A trader typically thinks about: + 1. "How much cash do I have available?" + 2. "What positions am I currently holding?" + 3. "What's my P&L on each position?" + 4. "How much total capital is deployed?" + """ + + def __init__(self, initial_capital: float): + """ + Initialize position manager with initial capital. + + Args: + initial_capital: Total capital available for trading + """ + self.initial_capital = initial_capital + + # Current state + self._positions: Dict[str, Position] = {} # symbol -> Position + self._cash_management = CashManagement( + total_cash=initial_capital, + initial_cash=initial_capital, + available_cash=initial_capital, + cash_in_trades=0.0, + ) + + # Historical snapshots for analysis + self._position_history: list[PositionHistorySnapshot] = [] + self._portfolio_history: list[PortfolioValueSnapshot] = [] + + # ============ Cash Management Section ============ + + def get_cash_status(self) -> CashManagement: + """Get current cash management status""" + return self._cash_management.model_copy() + + def get_available_cash(self) -> float: + """Get available cash for new trades""" + return self._cash_management.available_cash + + def get_total_cash_deployed(self) -> float: + """Get total cash currently deployed in positions""" + return self._cash_management.cash_in_trades + + def allocate_cash(self, amount: float) -> bool: + """ + Allocate cash for a new position. + + Args: + amount: Amount to allocate + + Returns: + True if allocation successful, False if insufficient cash + """ + if amount > self._cash_management.available_cash: + logger.warning( + f"Insufficient cash: requested {amount}, " + f"available {self._cash_management.available_cash}" + ) + return False + + self._cash_management.available_cash -= amount + self._cash_management.cash_in_trades += amount + return True + + def release_cash(self, amount: float, pnl: float = 0.0): + """ + Release cash from a closed position (including P&L). + + Args: + amount: Initial position notional + pnl: Profit/loss from the position + """ + self._cash_management.cash_in_trades -= amount + self._cash_management.total_cash += pnl + self._cash_management.available_cash = ( + self._cash_management.total_cash - self._cash_management.cash_in_trades + ) + + # ============ Position Management Section ============ + + def open_position(self, symbol: str, position: Position) -> bool: + """ + Open a new position. + + Args: + symbol: Trading symbol + position: Position object + + Returns: + True if position opened successfully + """ + if symbol in self._positions: + logger.warning(f"Position already exists for {symbol}") + return False + + # Allocate cash for this position + if not self.allocate_cash(position.notional): + return False + + self._positions[symbol] = position + logger.info(f"Opened {position.trade_type.value} position on {symbol}") + return True + + def close_position(self, symbol: str) -> Optional[Position]: + """ + Close an existing position. + + Args: + symbol: Trading symbol + + Returns: + Closed position or None if not found + """ + if symbol not in self._positions: + logger.warning(f"No position found for {symbol}") + return None + + position = self._positions.pop(symbol) + logger.info(f"Closed {position.trade_type.value} position on {symbol}") + return position + + def get_position(self, symbol: str) -> Optional[Position]: + """Get position for a specific symbol""" + return self._positions.get(symbol) + + def get_all_positions(self) -> Dict[str, Position]: + """Get all current positions""" + return self._positions.copy() + + def get_positions_count(self) -> int: + """Get number of current open positions""" + return len(self._positions) + + # ============ Portfolio Valuation Section ============ + + def calculate_position_pnl(self, position: Position, current_price: float) -> float: + """ + Calculate unrealized P&L for a position. + + Args: + position: Position object + current_price: Current market price + + Returns: + Unrealized P&L amount + """ + if position.trade_type == TradeType.LONG: + # Long: profit when price goes up + return (current_price - position.entry_price) * abs(position.quantity) + else: + # Short: profit when price goes down + return (position.entry_price - current_price) * abs(position.quantity) + + def calculate_portfolio_value(self) -> Tuple[float, float, float]: + """ + Calculate total portfolio value with breakdown. + + Returns: + Tuple of (total_value, positions_value, total_pnl) + """ + total_value = self._cash_management.total_cash + positions_value = 0.0 + total_pnl = 0.0 + + for symbol, position in self._positions.items(): + try: + ticker = yf.Ticker(symbol) + current_price = ticker.history(period="1d", interval="1m")[ + "Close" + ].iloc[-1] + + # Calculate unrealized P&L + pnl = self.calculate_position_pnl(position, current_price) + total_pnl += pnl + + # Calculate position value + if position.trade_type == TradeType.LONG: + pos_value = abs(position.quantity) * current_price + else: + pos_value = position.notional + pnl + + positions_value += pos_value + total_value += pnl + + except Exception as e: + logger.warning(f"Failed to get price for {symbol}: {e}") + # Fallback to notional + positions_value += position.notional + + return total_value, positions_value, total_pnl + + def get_portfolio_summary(self) -> Dict: + """ + Get complete portfolio summary from trader's perspective. + + Returns: + Dictionary with all portfolio information + """ + total_value, positions_value, total_pnl = self.calculate_portfolio_value() + + return { + "cash": { + "available": self._cash_management.available_cash, + "deployed": self._cash_management.cash_in_trades, + "total": self._cash_management.total_cash, + }, + "positions": { + "count": self.get_positions_count(), + "total_value": positions_value, + }, + "portfolio": { + "total_value": total_value, + "total_pnl": total_pnl, + "pnl_percentage": (total_pnl / self.initial_capital * 100) + if self.initial_capital > 0 + else 0, + }, + } + + # ============ History Tracking Section ============ + + def snapshot_positions(self, timestamp: datetime): + """ + Take a snapshot of all positions at a point in time. + + Args: + timestamp: Snapshot timestamp + """ + for symbol, position in self._positions.items(): + try: + ticker = yf.Ticker(symbol) + current_price = ticker.history(period="1d", interval="1m")[ + "Close" + ].iloc[-1] + + unrealized_pnl = self.calculate_position_pnl(position, current_price) + + snapshot = PositionHistorySnapshot( + timestamp=timestamp, + symbol=symbol, + quantity=position.quantity, + entry_price=position.entry_price, + current_price=current_price, + trade_type=position.trade_type.value, + unrealized_pnl=unrealized_pnl, + notional=position.notional, + ) + self._position_history.append(snapshot) + + except Exception as e: + logger.warning(f"Failed to snapshot position for {symbol}: {e}") + + def snapshot_portfolio(self, timestamp: datetime): + """ + Take a snapshot of the entire portfolio. + + Args: + timestamp: Snapshot timestamp + """ + total_value, positions_value, total_pnl = self.calculate_portfolio_value() + + snapshot = PortfolioValueSnapshot( + timestamp=timestamp, + total_value=total_value, + cash=self._cash_management.available_cash, + cash_in_trades=self._cash_management.cash_in_trades, + positions_value=positions_value, + positions_count=self.get_positions_count(), + total_pnl=total_pnl, + ) + self._portfolio_history.append(snapshot) + + def get_position_history(self) -> list[PositionHistorySnapshot]: + """Get all position history snapshots""" + return self._position_history.copy() + + def get_portfolio_history(self) -> list[PortfolioValueSnapshot]: + """Get all portfolio history snapshots""" + return self._portfolio_history.copy() + + def reset(self, initial_capital: float): + """Reset to initial state""" + self.initial_capital = initial_capital + self._positions.clear() + self._cash_management = CashManagement( + total_cash=initial_capital, + initial_cash=initial_capital, + available_cash=initial_capital, + cash_in_trades=0.0, + ) + self._position_history.clear() + self._portfolio_history.clear() diff --git a/python/valuecell/agents/auto_trading_agent/technical_analysis.py b/python/valuecell/agents/auto_trading_agent/technical_analysis.py new file mode 100644 index 000000000..cc8b29c31 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/technical_analysis.py @@ -0,0 +1,184 @@ +"""Technical analysis and signal generation (refactored)""" + +import json +import logging +from typing import Optional + +from agno.agent import Agent + +from .market_data import MarketDataProvider, SignalGenerator +from .models import TechnicalIndicators, TradeAction, TradeType + +logger = logging.getLogger(__name__) + + +class TechnicalAnalyzer: + """ + Static interface for technical analysis (backward compatible). + + Now delegates to MarketDataProvider internally. + """ + + _market_data_provider = MarketDataProvider() + + @staticmethod + def calculate_indicators( + symbol: str, period: str = "5d", interval: str = "1m" + ) -> Optional[TechnicalIndicators]: + """ + Calculate technical indicators using yfinance data. + + Args: + symbol: Trading symbol (e.g., BTC-USD) + period: Data period + interval: Data interval + + Returns: + TechnicalIndicators object or None if calculation fails + """ + return TechnicalAnalyzer._market_data_provider.calculate_indicators( + symbol, period, interval + ) + + @staticmethod + def generate_signal( + indicators: TechnicalIndicators, + ) -> tuple[TradeAction, TradeType]: + """ + Generate trading signal based on technical indicators. + + Args: + indicators: Technical indicators for analysis + + Returns: + Tuple of (TradeAction, TradeType) + """ + return SignalGenerator.generate_signal(indicators) + + +class AISignalGenerator: + """AI-enhanced signal generation using LLM""" + + def __init__(self, llm_client): + """ + Initialize AI signal generator + + Args: + llm_client: OpenRouter client instance + """ + self.llm_client = llm_client + + async def get_signal( + self, indicators: TechnicalIndicators + ) -> Optional[tuple[TradeAction, TradeType, str, float]]: + """ + Get AI-enhanced trading signal using OpenRouter model + + Args: + indicators: Technical indicators for analysis + + Returns: + Tuple of (TradeAction, TradeType, reasoning, confidence) or None if AI not available + """ + if not self.llm_client: + return None + + try: + # Create analysis prompt with proper formatting + macd_str = ( + f"{indicators.macd:.4f}" if indicators.macd is not None else "N/A" + ) + macd_signal_str = ( + f"{indicators.macd_signal:.4f}" + if indicators.macd_signal is not None + else "N/A" + ) + macd_histogram_str = ( + f"{indicators.macd_histogram:.4f}" + if indicators.macd_histogram is not None + else "N/A" + ) + rsi_str = f"{indicators.rsi:.2f}" if indicators.rsi is not None else "N/A" + ema_12_str = ( + f"${indicators.ema_12:,.2f}" if indicators.ema_12 is not None else "N/A" + ) + ema_26_str = ( + f"${indicators.ema_26:,.2f}" if indicators.ema_26 is not None else "N/A" + ) + ema_50_str = ( + f"${indicators.ema_50:,.2f}" if indicators.ema_50 is not None else "N/A" + ) + bb_upper_str = ( + f"${indicators.bb_upper:,.2f}" + if indicators.bb_upper is not None + else "N/A" + ) + bb_middle_str = ( + f"${indicators.bb_middle:,.2f}" + if indicators.bb_middle is not None + else "N/A" + ) + bb_lower_str = ( + f"${indicators.bb_lower:,.2f}" + if indicators.bb_lower is not None + else "N/A" + ) + + prompt = f"""You are an expert crypto trading analyst. Analyze the following technical indicators for {indicators.symbol} and provide a trading recommendation. + +Current Market Data: +- Symbol: {indicators.symbol} +- Price: ${indicators.close_price:,.2f} +- Volume: {indicators.volume:,.0f} + +Technical Indicators: +- MACD: {macd_str} +- MACD Signal: {macd_signal_str} +- MACD Histogram: {macd_histogram_str} +- RSI: {rsi_str} +- EMA 12: {ema_12_str} +- EMA 26: {ema_26_str} +- EMA 50: {ema_50_str} +- BB Upper: {bb_upper_str} +- BB Middle: {bb_middle_str} +- BB Lower: {bb_lower_str} + +Based on these indicators, provide: +1. Action: BUY, SELL, or HOLD +2. Type: LONG or SHORT (if BUY) +3. Confidence: 0-100% +4. Reasoning: Brief explanation (1-2 sentences) + +Format your response as JSON: +{{"action": "BUY|SELL|HOLD", "type": "LONG|SHORT", "confidence": 0-100, "reasoning": "explanation"}}""" + + agent = Agent(model=self.llm_client, markdown=False) + response = await agent.arun(prompt) + + # Parse response + content = response.content.strip() + # Extract JSON from markdown code blocks if present + if "```json" in content: + content = content.split("```json")[1].split("```")[0].strip() + elif "```" in content: + content = content.split("```")[1].split("```")[0].strip() + + result = json.loads(content) + + action = TradeAction(result["action"].lower()) + trade_type = ( + TradeType(result["type"].lower()) if result["type"] else TradeType.LONG + ) + reasoning = result["reasoning"] + confidence = float(result.get("confidence", 75.0)) + + logger.info( + f"AI Signal for {indicators.symbol}: {action.value} {trade_type.value} " + f"(confidence: {confidence}%) - {reasoning}" + ) + + return (action, trade_type, reasoning, confidence) + + except Exception as e: + logger.error(f"Failed to get AI trading signal: {e}") + return None diff --git a/python/valuecell/agents/auto_trading_agent/trade_recorder.py b/python/valuecell/agents/auto_trading_agent/trade_recorder.py new file mode 100644 index 000000000..1443e2bab --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/trade_recorder.py @@ -0,0 +1,286 @@ +"""Trade recording and history management - from a trader's perspective""" + +import logging +from datetime import datetime, timedelta +from typing import Dict, List + +from .models import TradeHistoryRecord + +logger = logging.getLogger(__name__) + + +class TradeRecorder: + """ + Records and analyzes all trading activity. + + A trader typically wants to know: + 1. "What have I traded?" + 2. "What's my win rate?" + 3. "What's my average win/loss?" + 4. "Which symbols are most profitable?" + """ + + def __init__(self): + """Initialize trade recorder""" + self._trades: List[TradeHistoryRecord] = [] + + def record_trade(self, trade_record: TradeHistoryRecord): + """ + Record a new trade. + + Args: + trade_record: TradeHistoryRecord to record + """ + self._trades.append(trade_record) + logger.info( + f"Recorded {trade_record.action} {trade_record.trade_type} on " + f"{trade_record.symbol} at ${trade_record.price:.2f}" + ) + + def get_all_trades(self) -> List[TradeHistoryRecord]: + """Get all recorded trades""" + return self._trades.copy() + + def get_recent_trades(self, limit: int = 10) -> List[TradeHistoryRecord]: + """Get most recent N trades""" + return self._trades[-limit:] if self._trades else [] + + def get_trades_by_symbol(self, symbol: str) -> List[TradeHistoryRecord]: + """Get all trades for a specific symbol""" + return [t for t in self._trades if t.symbol == symbol] + + def get_trades_by_action(self, action: str) -> List[TradeHistoryRecord]: + """Get all trades of a specific action (opened/closed)""" + return [t for t in self._trades if t.action == action] + + def get_trades_in_period( + self, start_time: datetime, end_time: datetime + ) -> List[TradeHistoryRecord]: + """Get trades executed in a time period""" + return [t for t in self._trades if start_time <= t.timestamp <= end_time] + + # ============ Trade Statistics Section ============ + + def get_trade_statistics(self) -> Dict: + """ + Get comprehensive trade statistics. + + Returns: + Dictionary with various statistics + """ + if not self._trades: + return { + "total_trades": 0, + "win_trades": 0, + "loss_trades": 0, + "win_rate": 0, + "total_pnl": 0, + "average_win": 0, + "average_loss": 0, + "largest_win": 0, + "largest_loss": 0, + "profit_factor": 0, + } + + # Calculate closed trades (those with P&L) + closed_trades = [t for t in self._trades if t.pnl is not None] + + if not closed_trades: + return { + "total_trades": len(self._trades), + "win_trades": 0, + "loss_trades": 0, + "win_rate": 0, + "total_pnl": 0, + "average_win": 0, + "average_loss": 0, + "largest_win": 0, + "largest_loss": 0, + "profit_factor": 0, + } + + winning_trades = [t for t in closed_trades if t.pnl > 0] + losing_trades = [t for t in closed_trades if t.pnl < 0] + + total_pnl = sum(t.pnl for t in closed_trades) + total_wins = sum(t.pnl for t in winning_trades) if winning_trades else 0 + total_losses = sum(t.pnl for t in losing_trades) if losing_trades else 0 + + return { + "total_trades": len(closed_trades), + "win_trades": len(winning_trades), + "loss_trades": len(losing_trades), + "win_rate": (len(winning_trades) / len(closed_trades) * 100) + if closed_trades + else 0, + "total_pnl": total_pnl, + "average_win": (total_wins / len(winning_trades)) if winning_trades else 0, + "average_loss": (total_losses / len(losing_trades)) if losing_trades else 0, + "largest_win": max(t.pnl for t in winning_trades) if winning_trades else 0, + "largest_loss": min(t.pnl for t in losing_trades) if losing_trades else 0, + "profit_factor": (total_wins / abs(total_losses)) + if total_losses != 0 + else (1.0 if total_wins > 0 else 0), + } + + def get_symbol_statistics(self, symbol: str) -> Dict: + """ + Get trading statistics for a specific symbol. + + Args: + symbol: Trading symbol + + Returns: + Statistics dictionary for that symbol + """ + symbol_trades = self.get_trades_by_symbol(symbol) + if not symbol_trades: + return {"symbol": symbol, "trades": 0} + + closed_trades = [t for t in symbol_trades if t.pnl is not None] + if not closed_trades: + return {"symbol": symbol, "trades": len(symbol_trades), "closed": 0} + + winning_trades = [t for t in closed_trades if t.pnl > 0] + losing_trades = [t for t in closed_trades if t.pnl < 0] + + total_pnl = sum(t.pnl for t in closed_trades) + + return { + "symbol": symbol, + "total_trades": len(closed_trades), + "win_trades": len(winning_trades), + "loss_trades": len(losing_trades), + "win_rate": (len(winning_trades) / len(closed_trades) * 100), + "total_pnl": total_pnl, + "average_pnl_per_trade": total_pnl / len(closed_trades), + "largest_win": max(t.pnl for t in winning_trades) if winning_trades else 0, + "largest_loss": min(t.pnl for t in losing_trades) if losing_trades else 0, + } + + def get_daily_statistics(self) -> Dict[str, Dict]: + """ + Get daily P&L breakdown. + + Returns: + Dictionary mapping dates to daily statistics + """ + daily_stats = {} + + for trade in self._trades: + date_key = trade.timestamp.strftime("%Y-%m-%d") + if date_key not in daily_stats: + daily_stats[date_key] = { + "trades": 0, + "pnl": 0, + "wins": 0, + "losses": 0, + } + + daily_stats[date_key]["trades"] += 1 + if trade.pnl is not None: + daily_stats[date_key]["pnl"] += trade.pnl + if trade.pnl > 0: + daily_stats[date_key]["wins"] += 1 + else: + daily_stats[date_key]["losses"] += 1 + + return daily_stats + + def get_holding_time_statistics(self) -> Dict: + """ + Get statistics about holding times. + + Returns: + Statistics about position holding duration + """ + # Match opens and closes for each symbol + holding_times = [] + + for symbol in set(t.symbol for t in self._trades): + symbol_trades = sorted( + self.get_trades_by_symbol(symbol), key=lambda t: t.timestamp + ) + + for i in range(0, len(symbol_trades) - 1, 2): + if ( + i + 1 < len(symbol_trades) + and symbol_trades[i].action == "opened" + and symbol_trades[i + 1].action == "closed" + ): + holding_time = ( + symbol_trades[i + 1].timestamp - symbol_trades[i].timestamp + ) + holding_times.append(holding_time) + + if not holding_times: + return { + "avg_holding_time": timedelta(0), + "min_holding_time": timedelta(0), + "max_holding_time": timedelta(0), + } + + total_holding = sum(holding_times, timedelta()) + + return { + "total_positions": len(holding_times), + "avg_holding_time": total_holding / len(holding_times), + "min_holding_time": min(holding_times), + "max_holding_time": max(holding_times), + } + + # ============ Trade Analysis Section ============ + + def get_best_trades(self, limit: int = 5) -> List[TradeHistoryRecord]: + """Get the most profitable trades""" + closed_trades = [t for t in self._trades if t.pnl is not None] + closed_trades.sort(key=lambda t: t.pnl, reverse=True) + return closed_trades[:limit] + + def get_worst_trades(self, limit: int = 5) -> List[TradeHistoryRecord]: + """Get the least profitable trades""" + closed_trades = [t for t in self._trades if t.pnl is not None] + closed_trades.sort(key=lambda t: t.pnl) + return closed_trades[:limit] + + def get_trade_breakdown_by_type(self) -> Dict[str, Dict]: + """ + Get performance breakdown by trade type (LONG vs SHORT). + + Returns: + Statistics for each trade type + """ + closed_trades = [t for t in self._trades if t.pnl is not None] + + breakdown = {"LONG": {}, "SHORT": {}} + + for trade_type in ["LONG", "SHORT"]: + type_trades = [ + t for t in closed_trades if t.trade_type.upper() == trade_type + ] + + if not type_trades: + breakdown[trade_type] = { + "trades": 0, + "wins": 0, + "losses": 0, + "total_pnl": 0, + } + else: + winning = [t for t in type_trades if t.pnl > 0] + losing = [t for t in type_trades if t.pnl < 0] + + breakdown[trade_type] = { + "trades": len(type_trades), + "wins": len(winning), + "losses": len(losing), + "win_rate": (len(winning) / len(type_trades) * 100), + "total_pnl": sum(t.pnl for t in type_trades), + "average_pnl": sum(t.pnl for t in type_trades) / len(type_trades), + } + + return breakdown + + def reset(self): + """Clear all trade history""" + self._trades.clear() diff --git a/python/valuecell/agents/auto_trading_agent/trading_executor.py b/python/valuecell/agents/auto_trading_agent/trading_executor.py new file mode 100644 index 000000000..c7c07fb38 --- /dev/null +++ b/python/valuecell/agents/auto_trading_agent/trading_executor.py @@ -0,0 +1,273 @@ +"""Trading execution and position management (refactored)""" + +import logging +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from .models import ( + AutoTradingConfig, + PortfolioValueSnapshot, + Position, + PositionHistorySnapshot, + TechnicalIndicators, + TradeAction, + TradeHistoryRecord, + TradeType, +) +from .position_manager import PositionManager +from .trade_recorder import TradeRecorder + +logger = logging.getLogger(__name__) + + +class TradingExecutor: + """ + Orchestrates trade execution using specialized modules. + + This is the main facade that coordinates: + - Position management (via PositionManager) + - Trade recording (via TradeRecorder) + - Cash management (via PositionManager) + """ + + def __init__(self, config: AutoTradingConfig): + """ + Initialize trading executor. + + Args: + config: Auto trading configuration + """ + self.config = config + self.initial_capital = config.initial_capital + + # Use specialized modules + self._position_manager = PositionManager(config.initial_capital) + self._trade_recorder = TradeRecorder() + + def execute_trade( + self, + symbol: str, + action: TradeAction, + trade_type: TradeType, + indicators: TechnicalIndicators, + ) -> Optional[Dict[str, Any]]: + """ + Execute a trade (open or close position). + + Args: + symbol: Trading symbol + action: Trade action (buy/sell) + trade_type: Trade type (long/short) + indicators: Current technical indicators + + Returns: + Trade execution details or None if execution failed + """ + try: + current_price = indicators.close_price + timestamp = datetime.now(timezone.utc) + + if action == TradeAction.BUY: + return self._execute_buy(symbol, trade_type, current_price, timestamp) + elif action == TradeAction.SELL: + return self._execute_sell(symbol, trade_type, current_price, timestamp) + + return None + + except Exception as e: + logger.error(f"Failed to execute trade for {symbol}: {e}") + return None + + def _execute_buy( + self, + symbol: str, + trade_type: TradeType, + current_price: float, + timestamp: datetime, + ) -> Optional[Dict[str, Any]]: + """Open a new position""" + # Check if we already have a position + if self._position_manager.get_position(symbol) is not None: + logger.info(f"Position already exists for {symbol}, skipping") + return None + + # Check max positions limit + if self._position_manager.get_positions_count() >= self.config.max_positions: + logger.info(f"Max positions reached ({self.config.max_positions})") + return None + + # Calculate position size + available_cash = self._position_manager.get_available_cash() + risk_amount = available_cash * self.config.risk_per_trade + quantity = risk_amount / current_price + notional = quantity * current_price + + # Check if we have enough cash + if notional > available_cash: + logger.warning( + f"Insufficient cash: need ${notional:.2f}, have ${available_cash:.2f}" + ) + return None + + # Create and open position + position = Position( + symbol=symbol, + entry_price=current_price, + quantity=quantity if trade_type == TradeType.LONG else -quantity, + entry_time=timestamp, + trade_type=trade_type, + notional=notional, + ) + + if not self._position_manager.open_position(symbol, position): + return None + + # Record trade + portfolio_value = self.get_portfolio_value() + trade_record = TradeHistoryRecord( + timestamp=timestamp, + symbol=symbol, + action="opened", + trade_type=trade_type.value, + price=current_price, + quantity=abs(position.quantity), + notional=notional, + pnl=None, + portfolio_value_after=portfolio_value, + cash_after=self._position_manager.get_available_cash(), + ) + self._trade_recorder.record_trade(trade_record) + + return { + "action": "opened", + "trade_type": trade_type.value, + "symbol": symbol, + "entry_price": current_price, + "quantity": position.quantity, + "notional": notional, + "timestamp": timestamp, + } + + def _execute_sell( + self, + symbol: str, + trade_type: TradeType, + current_price: float, + timestamp: datetime, + ) -> Optional[Dict[str, Any]]: + """Close an existing position""" + # Get position + position = self._position_manager.get_position(symbol) + if position is None: + return None + + # Check if trade type matches + if position.trade_type != trade_type: + return None + + # Calculate P&L + pnl = self._position_manager.calculate_position_pnl(position, current_price) + exit_notional = abs(position.quantity) * current_price + + # Close position + self._position_manager.close_position(symbol) + self._position_manager.release_cash(position.notional, pnl) + + # Record trade + holding_time = timestamp - position.entry_time + portfolio_value = self.get_portfolio_value() + trade_record = TradeHistoryRecord( + timestamp=timestamp, + symbol=symbol, + action="closed", + trade_type=trade_type.value, + price=current_price, + quantity=abs(position.quantity), + notional=exit_notional, + pnl=pnl, + portfolio_value_after=portfolio_value, + cash_after=self._position_manager.get_available_cash(), + ) + self._trade_recorder.record_trade(trade_record) + + return { + "action": "closed", + "trade_type": trade_type.value, + "symbol": symbol, + "entry_price": position.entry_price, + "exit_price": current_price, + "quantity": position.quantity, + "entry_notional": position.notional, + "exit_notional": exit_notional, + "pnl": pnl, + "holding_time": holding_time, + "timestamp": timestamp, + } + + # ============ Portfolio Queries ============ + + def get_portfolio_value(self) -> float: + """Get total portfolio value""" + total_value, _, _ = self._position_manager.calculate_portfolio_value() + return total_value + + def get_portfolio_summary(self) -> Dict: + """Get complete portfolio summary""" + return self._position_manager.get_portfolio_summary() + + def get_current_capital(self) -> float: + """Get available cash""" + return self._position_manager.get_available_cash() + + @property + def current_capital(self) -> float: + """Property for backward compatibility""" + return self._position_manager.get_available_cash() + + @property + def positions(self) -> Dict[str, Position]: + """Property for backward compatibility""" + return self._position_manager.get_all_positions() + + # ============ History Management ============ + + def snapshot_positions(self, timestamp: datetime): + """Take a snapshot of all positions""" + self._position_manager.snapshot_positions(timestamp) + + def snapshot_portfolio(self, timestamp: datetime): + """Take a snapshot of portfolio value""" + self._position_manager.snapshot_portfolio(timestamp) + + def get_trade_history(self) -> List[TradeHistoryRecord]: + """Get all trade history""" + return self._trade_recorder.get_all_trades() + + def get_position_history(self) -> List[PositionHistorySnapshot]: + """Get all position snapshots""" + return self._position_manager.get_position_history() + + def get_portfolio_history(self) -> List[PortfolioValueSnapshot]: + """Get all portfolio snapshots""" + return self._position_manager.get_portfolio_history() + + # ============ Statistics ============ + + def get_trade_statistics(self) -> Dict: + """Get trading statistics""" + return self._trade_recorder.get_trade_statistics() + + def get_symbol_statistics(self, symbol: str) -> Dict: + """Get statistics for a symbol""" + return self._trade_recorder.get_symbol_statistics(symbol) + + def get_daily_statistics(self) -> Dict[str, Dict]: + """Get daily P&L breakdown""" + return self._trade_recorder.get_daily_statistics() + + # ============ Management ============ + + def reset(self, initial_capital: float): + """Reset executor state""" + self._position_manager.reset(initial_capital) + self._trade_recorder.reset() diff --git a/python/valuecell/agents/tests/__init__.py b/python/valuecell/agents/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/valuecell/core/types.py b/python/valuecell/core/types.py index 9f748be8c..22bb716c8 100644 --- a/python/valuecell/core/types.py +++ b/python/valuecell/core/types.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import AsyncGenerator, Callable, Dict, Literal, Optional, Union +from typing import AsyncGenerator, Callable, Dict, List, Literal, Optional, Union from a2a.types import Task, TaskArtifactUpdateEvent, TaskStatusUpdateEvent from pydantic import BaseModel, Field @@ -147,6 +147,8 @@ class ComponentType(str, Enum): REPORT = "report" PROFILE = "profile" + FILTERED_LINE_CHART = "filtered_line_chart" + FILTERED_CARD_PUSH_NOTIFICATION = "filtered_card_push_notification" class ReportComponentData(BaseModel): @@ -162,6 +164,47 @@ class ReportComponentData(BaseModel): ) +class FilteredLineChartComponentData(BaseModel): + """Filtered line chart component data payload. + Data format: + [ + ['x_axis_name', 'value1', 'value2', 'value3', 'value4'], + ['timestamp1', value1, value2, value3, value4], + ['timestamp2', value1, value2, value3, value4], + ['timestamp3', value1, value2, value3, value4], + ] + """ + + title: str = Field( + ..., + description="The line chart title, used by UI to display the line chart title", + ) + data: str = Field( + ..., + description="The line chart data, format: [['x_axis_value', 'value1', 'value2', 'value3', 'value4'], ['x_axis_value', value1, value2, value3, value4], ...]", + ) + create_time: str = Field( + ..., + description="The line chart create time, UTC time, YYYY-MM-DD HH:MM:SS format", + ) + + +class FilteredCardPushNotificationComponentData(BaseModel): + """Filtered card push notification component data payload.""" + + title: str = Field( + ..., + description="The card push notification title, used by UI to display the card push notification title", + ) + data: str = Field(..., description="The card push notification data") + filters: List[str] = Field(..., description="The card push notification filters") + table_title: str = Field(..., description="The card push notification table title") + create_time: str = Field( + ..., + description="The card push notification create time, UTC time, YYYY-MM-DD HH:MM:SS format", + ) + + ResponsePayload = Union[ BaseResponseDataPayload, ComponentGeneratorResponseDataPayload,