diff --git a/.env.example b/.env.example index a1d8253a..de7651ed 100644 --- a/.env.example +++ b/.env.example @@ -1,30 +1,69 @@ -# Binance API Configuration +# ============================================================================= +# ELVIS Trading Bot — Environment Variables +# ============================================================================= +# Copy this file to .env and fill in real values. +# NEVER commit .env to version control. +# ============================================================================= + +# --- Binance API --- BINANCE_API_KEY=your_binance_api_key_here BINANCE_API_SECRET=your_binance_api_secret_here +BINANCE_FUTURES_TESTNET_API_KEY=your_futures_testnet_api_key_here +BINANCE_FUTURES_TESTNET_API_SECRET=your_futures_testnet_api_secret_here + +# --- HashiCorp Vault (ISSUE #10) --- +# REQUIRED: The bot will refuse to start if VAULT_TOKEN is not set. +VAULT_ADDR=http://127.0.0.1:8200 +VAULT_TOKEN=your_vault_token_here + +# --- PostgreSQL Database (ISSUE #11) --- +# REQUIRED: The bot will refuse to start if DB_PASSWORD is not set. +DB_HOST=localhost +DB_PORT=5432 +DB_USER=elvis_user +DB_PASSWORD=your_secure_db_password_here +DB_NAME=elvis_trading + +# --- Flask API Authentication (ISSUE #13) --- +# REQUIRED: All API requests must include header: X-API-Key: +# The /health endpoint is exempt from authentication. +API_KEY=your_random_api_key_here -# Telegram Bot Configuration (optional) +# --- Telegram Bot (optional) --- TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here TELEGRAM_CHAT_ID=your_telegram_chat_id_here -# Redis Configuration (for caching) +# --- Redis (optional) --- REDIS_HOST=localhost REDIS_PORT=6379 REDIS_DB=0 REDIS_PASSWORD= -# Database Configuration (if using) -POSTGRES_HOST=localhost -POSTGRES_PORT=5432 -POSTGRES_USER=elvis_user -POSTGRES_PASSWORD=elvis_password -POSTGRES_DBNAME=elvis_trading - -# Trading Configuration +# --- Trading Configuration --- TRADING_MODE=paper # paper or live MAX_POSITION_SIZE=0.1 MAX_DAILY_TRADES=5 RISK_PER_TRADE=0.02 -# Monitoring +# --- Leverage Safety (ISSUE #14) --- +# Default is 3x. Set higher only with full understanding of liquidation risk. +DEFAULT_LEVERAGE=3 +# Uncomment the line below ONLY if you explicitly need leverage > 10x. +# OVERRIDE_HIGH_LEVERAGE=true + +# --- Binance Rate Limiting (ISSUE #12) --- +# Tune these only if you know what you're doing. +BINANCE_MAX_RETRIES=5 +BINANCE_RETRY_MIN_WAIT=1 +BINANCE_RETRY_MAX_WAIT=60 +BINANCE_RATE_LIMIT_WARN_FRACTION=0.80 +BINANCE_WEIGHT_LIMIT_PER_MIN=1200 +BINANCE_ORDER_LIMIT_PER_SEC=10 +BINANCE_RATE_LIMIT_PAUSE_SECONDS=5.0 + +# --- Kill-Switch / Redis Persistence (ISSUE #15) --- +# REDIS_HOST / REDIS_PORT already defined above; used for kill-switch persistence too. + +# --- Monitoring --- PROMETHEUS_PUSHGATEWAY_URL=http://localhost:9091 GRAFANA_API_KEY=your_grafana_api_key_here diff --git a/config/config.py b/config/config.py index 4e4e4c0c..16a9f94b 100644 --- a/config/config.py +++ b/config/config.py @@ -38,7 +38,9 @@ def BINANCE_FUTURES_TESTNET_API_SECRET(self): 'TAKE_PROFIT_PCT': 0.02, # Added to fix the current error; adjust as needed 'LEVERAGE_MAX': 125, # Maximum leverage for futures 'LEVERAGE_MIN': 1, # Minimum leverage for futures - 'DEFAULT_LEVERAGE': 100, # Default leverage for maximum trading power + # Issue #14: Default leverage reduced from 100x to 3x to prevent catastrophic + # losses on startup. Override via DEFAULT_LEVERAGE env var (integer). + 'DEFAULT_LEVERAGE': int(os.getenv('DEFAULT_LEVERAGE', '3')), 'MAX_TRADES_PER_DAY': 10, # Added to fix MAX_TRADES_PER_DAY error 'DAILY_PROFIT_TARGET_USD': 100, # Added to fix DAILY_PROFIT_TARGET_USD error 'DAILY_LOSS_LIMIT_USD': 100, # Added to fix DAILY_LOSS_LIMIT_USD error @@ -59,14 +61,23 @@ def BINANCE_FUTURES_TESTNET_API_SECRET(self): 'LOG_TO_FILE': True, } +# ISSUE #11 FIX: Removed hardcoded Postgres password 'elvis_password'. +# Database credentials must never be hardcoded in source code. +# Set DB_PASSWORD (and optionally DB_HOST, DB_PORT, DB_USER, DB_NAME) as environment variables. POSTGRES_CONFIG = { - 'HOST': 'localhost', - 'PORT': 5432, - 'USER': 'elvis_user', - 'PASSWORD': 'elvis_password', - 'DBNAME': 'elvis_trading' + 'HOST': os.getenv('DB_HOST', 'localhost'), + 'PORT': int(os.getenv('DB_PORT', '5432')), + 'USER': os.getenv('DB_USER', 'elvis_user'), + 'PASSWORD': os.getenv('DB_PASSWORD'), # Required — no default; must be set explicitly + 'DBNAME': os.getenv('DB_NAME', 'elvis_trading'), } +if not POSTGRES_CONFIG['PASSWORD']: + raise EnvironmentError( + "DB_PASSWORD environment variable is not set. " + "Export it before starting: export DB_PASSWORD=" + ) + @@ -92,3 +103,64 @@ def BINANCE_FUTURES_TESTNET_API_SECRET(self): 'MAX_CONCURRENT_PAIRS': 3, # Maximum pairs to trade simultaneously } + + +# --------------------------------------------------------------------------- +# Issue #14: Leverage safety validation +# Call validate_leverage_config() before starting the trading engine. +# --------------------------------------------------------------------------- +import logging as _logging + +_leverage_logger = _logging.getLogger(__name__) + +def validate_leverage_config(leverage: int = None) -> int: + """ + Validate the configured leverage and refuse to start if it is dangerously + high without an explicit operator override. + + Rules: + - leverage > 10x requires OVERRIDE_HIGH_LEVERAGE=true in the environment. + - leverage > 5x emits a WARNING log on every startup. + - leverage <= 0 raises ValueError immediately. + + Args: + leverage: The leverage value to validate. Defaults to + TRADING_CONFIG['DEFAULT_LEVERAGE']. + + Returns: + The validated leverage value (int). + + Raises: + ValueError: If leverage <= 0 or an invalid value is supplied. + EnvironmentError: If leverage > 10x and OVERRIDE_HIGH_LEVERAGE != 'true'. + """ + if leverage is None: + leverage = TRADING_CONFIG['DEFAULT_LEVERAGE'] + + leverage = int(leverage) + + if leverage <= 0: + raise ValueError(f"Leverage must be a positive integer, got {leverage}.") + + if leverage > 5: + _leverage_logger.warning( + "⚠️ Leverage is set to %dx which is above the recommended maximum of 5x. " + "High leverage significantly increases liquidation risk.", + leverage, + ) + + if leverage > 10: + override = os.getenv("OVERRIDE_HIGH_LEVERAGE", "false").strip().lower() + if override != "true": + raise EnvironmentError( + f"Leverage {leverage}x exceeds the 10x safety limit. " + "Set OVERRIDE_HIGH_LEVERAGE=true in your environment to acknowledge " + "the risk and allow startup." + ) + _leverage_logger.warning( + "🚨 OVERRIDE_HIGH_LEVERAGE=true detected — starting with %dx leverage. " + "Ensure you understand the liquidation risks.", + leverage, + ) + + return leverage diff --git a/main.py b/main.py index 90bf9770..b28ecdbe 100644 --- a/main.py +++ b/main.py @@ -15,7 +15,13 @@ if not os.getenv('VAULT_ADDR'): os.environ['VAULT_ADDR'] = 'http://127.0.0.1:8200' if not os.getenv('VAULT_TOKEN'): - os.environ['VAULT_TOKEN'] = 'trading-bot-token' + # ISSUE #10 FIX: Removed hardcoded Vault token 'trading-bot-token'. + # Hardcoded secrets in source code expose credentials to anyone with repo access. + # VAULT_TOKEN must be set as an environment variable before starting the bot. + raise EnvironmentError( + "VAULT_TOKEN environment variable is not set. " + "Export it before starting: export VAULT_TOKEN=" + ) from core.bootstrap import bootstrap_application from core.di import container diff --git a/requirements.txt b/requirements.txt index 0991e406..291047fa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ +# Retry / back-off (Issue #12 — Binance rate limiting) +tenacity + # Core dependencies numpy pandas diff --git a/trading/execution/binance_executor.py b/trading/execution/binance_executor.py index 040b5f43..f2c1aef2 100644 --- a/trading/execution/binance_executor.py +++ b/trading/execution/binance_executor.py @@ -18,17 +18,28 @@ from trading.fees.binance_fee_calculator import BinanceFeeCalculator from datetime import datetime +# Issue #12: Import rate-limit utilities (retry decorator + header checker). +from utils.binance_rate_limiter import binance_retry, check_rate_limit_headers + class BinanceExecutor(BaseExecutor): - def __init__(self, logger: logging.Logger = None, api_key: str = None, api_secret: str = None, is_testnet: bool = False, use_futures: bool = False, default_leverage: int = 100, **kwargs): + # Issue #14: Default leverage reduced from 100x to 3x. Callers may pass an + # explicit value, which is validated by validate_leverage_config() below. + def __init__(self, logger: logging.Logger = None, api_key: str = None, api_secret: str = None, is_testnet: bool = False, use_futures: bool = False, default_leverage: int = None, **kwargs): super().__init__(logger, **kwargs) self.client = None self.api_key = api_key self.api_secret = api_secret self.is_testnet = is_testnet self.use_futures = use_futures - self.default_leverage = default_leverage self.fee_calculator = BinanceFeeCalculator(logger) self.db_available = False + + # Issue #14: Validate leverage before storing it. Imports here to + # avoid circular-import issues at module load time. + from config.config import validate_leverage_config, TRADING_CONFIG + resolved_leverage = default_leverage if default_leverage is not None else TRADING_CONFIG['DEFAULT_LEVERAGE'] + self.default_leverage = validate_leverage_config(resolved_leverage) + if is_testnet: self._init_paper_trading_db() @@ -70,19 +81,30 @@ def initialize(self) -> bool: self.logger.error(f"Failed to initialize BinanceExecutor: {e}") return False # Failed due to other error + # Issue #12: Wrap each live Binance API call with @binance_retry so that + # transient failures (network blips, 429 rate-limit responses) are retried + # with exponential back-off instead of failing immediately. + def get_balance(self) -> Dict[str, float]: if self.client is None or (self.is_testnet and not self.use_futures): return self._calculate_paper_balance() try: if FUTURES_AVAILABLE and isinstance(self.client, UMFutures): - account = self.client.balance() + @binance_retry + def _fetch(): + account = self.client.balance() + account_info = self.client.account() + return account, account_info + account, account_info = _fetch() balances = {item['asset']: float(item['balance']) for item in account if float(item['balance']) > 0} - account_info = self.client.account() wallet_balance = float(account_info['totalWalletBalance']) self.logger.info(f"Futures account - Wallet Balance: ${wallet_balance:.2f}") return {'USDT': wallet_balance, **balances} else: - account = self.client.get_account() + @binance_retry + def _fetch(): + return self.client.get_account() + account = _fetch() return {item['asset']: float(item['free']) for item in account['balances']} except (ClientError if FUTURES_AVAILABLE else BinanceAPIException) as e: self.logger.error(f"Error getting balance: {e}") @@ -92,7 +114,10 @@ def get_position(self, symbol: str) -> Dict[str, Any]: if self.client is None or not self.use_futures: return {} try: - positions = self.client.get_position_risk(symbol=symbol) + @binance_retry + def _fetch(): + return self.client.get_position_risk(symbol=symbol) + positions = _fetch() return positions[0] if positions else {} except (ClientError if FUTURES_AVAILABLE else BinanceAPIException) as e: self.logger.error(f"Error getting position for {symbol}: {e}") @@ -103,9 +128,15 @@ def get_current_price(self, symbol: str) -> float: return self._get_mock_price(symbol) try: if self.use_futures: - return float(self.client.ticker_price(symbol=symbol)['price']) + @binance_retry + def _fetch(): + return self.client.ticker_price(symbol=symbol) + return float(_fetch()['price']) else: - return float(self.client.get_symbol_ticker(symbol=symbol)['price']) + @binance_retry + def _fetch(): + return self.client.get_symbol_ticker(symbol=symbol) + return float(_fetch()['price']) except (ClientError if FUTURES_AVAILABLE else BinanceAPIException) as e: self.logger.error(f"Error getting current price for {symbol}: {e}") return 0.0 @@ -115,7 +146,10 @@ def set_leverage(self, symbol: str, leverage: int) -> None: self.logger.info(f"Paper trading: Leverage set to {leverage}x for {symbol}") return try: - self.client.change_leverage(symbol=symbol, leverage=leverage) + @binance_retry + def _set(): + return self.client.change_leverage(symbol=symbol, leverage=leverage) + _set() self.logger.info(f"Leverage for {symbol} set to {leverage}x.") except BinanceAPIException as e: self.logger.error(f"Error setting leverage for {symbol}: {e}") diff --git a/trading/utils/trade_history_api.py b/trading/utils/trade_history_api.py index 85b90c6b..f59a6f49 100644 --- a/trading/utils/trade_history_api.py +++ b/trading/utils/trade_history_api.py @@ -23,6 +23,32 @@ app = Flask(__name__) CORS(app) +# --------------------------------------------------------------------------- +# ISSUE #13 FIX: API Key Authentication +# The Flask API was publicly accessible on 0.0.0.0:5050 with no authentication. +# Any process on the network could read trade data or trigger actions. +# Now every request (except /health) must supply the correct API key via header: +# X-API-Key: +# Set the API_KEY environment variable before starting the bot. +# --------------------------------------------------------------------------- +_API_KEY = os.getenv('API_KEY') + +@app.before_request +def require_api_key(): + """Reject requests that do not present the correct API key header.""" + # Health check is exempt so load balancers / Docker health checks still work + from flask import request, jsonify + if request.path == '/health': + return # exempt from auth + + if not _API_KEY: + # Fail closed: if no API_KEY is configured, block all requests + return jsonify({"error": "API authentication not configured on server"}), 503 + + provided = request.headers.get('X-API-Key', '') + if provided != _API_KEY: + return jsonify({"error": "Unauthorized — provide a valid X-API-Key header"}), 401 + # Initialize Prometheus if available if HAS_PROMETHEUS: metrics = PrometheusMetrics(app) @@ -471,13 +497,270 @@ def dashboard(): except Exception as e: return jsonify({"error": f"Dashboard not found: {str(e)}"}), 404 +# --------------------------------------------------------------------------- +# ISSUE #15 FIX (updated): Kill-switch / Emergency Stop — Redis persistence +# +# The kill-switch state is now stored in Redis under the key +# 'ELVIS_KILL_SWITCH' so that it survives process restarts. +# +# Activate : redis.set('ELVIS_KILL_SWITCH', '1') +# Deactivate: redis.delete('ELVIS_KILL_SWITCH') +# Query : redis.get('ELVIS_KILL_SWITCH') == '1' +# +# The in-memory flag (KILL_SWITCH_ACTIVE) is kept as a local cache so that +# high-frequency callers (the trading loop) don't hammer Redis on every tick. +# The flag is synchronised with Redis on every REST call that changes state, +# and also on every read of /emergency_stop/status. +# +# If Redis is unavailable the module falls back gracefully to in-memory only +# behaviour (exactly as Sprint 1) and logs a warning. +# --------------------------------------------------------------------------- +import datetime as _datetime +import os as _os + +# Redis key used to persist the kill-switch state. +_REDIS_KILL_SWITCH_KEY = "ELVIS_KILL_SWITCH" + +def _get_redis_client(): + """ + Return a Redis client connected to the configured server, or None if + Redis is unavailable. Connection parameters come from env vars so that + no credentials are hardcoded. + """ + try: + import redis as _redis + client = _redis.Redis( + host=_os.getenv("REDIS_HOST", "localhost"), + port=int(_os.getenv("REDIS_PORT", "6379")), + db=int(_os.getenv("REDIS_DB", "0")), + password=_os.getenv("REDIS_PASSWORD") or None, + decode_responses=True, + socket_timeout=2, + socket_connect_timeout=2, + ) + client.ping() # Verify connection. + return client + except Exception as _e: + logger.warning( + "Kill-switch: Redis unavailable (%s). Falling back to in-memory state.", _e + ) + return None + + +def _redis_set_kill_switch(active: bool) -> bool: + """ + Persist kill-switch state to Redis. + + Returns True if Redis write succeeded, False if Redis was unreachable + (in which case only the in-memory flag is updated). + """ + client = _get_redis_client() + if client is None: + return False + try: + if active: + client.set(_REDIS_KILL_SWITCH_KEY, "1") + else: + client.delete(_REDIS_KILL_SWITCH_KEY) + return True + except Exception as _e: + logger.error("Failed to update kill-switch in Redis: %s", _e) + return False + + +def _redis_get_kill_switch() -> bool: + """ + Read the kill-switch state from Redis. + + Returns the Redis value if available, otherwise falls back to the + in-memory flag so the bot stays safe during a Redis outage. + """ + client = _get_redis_client() + if client is None: + return KILL_SWITCH_ACTIVE # Fallback: trust in-memory state. + try: + return client.get(_REDIS_KILL_SWITCH_KEY) == "1" + except Exception as _e: + logger.error("Failed to read kill-switch from Redis: %s", _e) + return KILL_SWITCH_ACTIVE + + +# In-memory cache of the kill-switch state (avoids a Redis round-trip on +# every call to is_trading_halted() in the hot trading loop). +KILL_SWITCH_ACTIVE = False # In-memory flag — synced with Redis on state changes +_kill_switch_activated_at = None # Timestamp when the stop was triggered +_kill_switch_activated_by = None # Remote IP that triggered it (for audit) + +# On startup: check Redis and restore any previously persisted kill-switch. +_startup_state = _redis_get_kill_switch() +if _startup_state: + KILL_SWITCH_ACTIVE = True + logger.critical( + "🚨 KILL-SWITCH was ACTIVE in Redis from a previous session. " + "Trading is HALTED. POST DELETE /emergency_stop to clear." + ) + + +def is_trading_halted() -> bool: + """ + Public helper: return True if the emergency kill-switch is active. + + Uses the in-memory flag for speed; the flag is kept in sync with Redis + whenever the state is changed via the REST endpoints. + """ + return KILL_SWITCH_ACTIVE + +@app.route('/emergency_stop', methods=['POST']) +def emergency_stop(): + """ + Activate the kill-switch to halt all trading immediately. + + Persists state to Redis (key 'ELVIS_KILL_SWITCH' = '1') so it survives + process restarts. Falls back to in-memory only if Redis is unavailable. + + Expected request body (JSON, optional): + { "reason": "string describing why the stop was triggered" } + + Returns: + 200 — kill-switch activated + 409 — kill-switch was already active + """ + global KILL_SWITCH_ACTIVE, _kill_switch_activated_at, _kill_switch_activated_by + + from flask import request, jsonify + + reason = "No reason provided" + try: + body = request.get_json(silent=True) or {} + reason = body.get("reason", reason) + except Exception: + pass + + if KILL_SWITCH_ACTIVE: + return jsonify({ + "status": "already_stopped", + "kill_switch_active": True, + "activated_at": _kill_switch_activated_at, + "activated_by": _kill_switch_activated_by, + }), 409 + + KILL_SWITCH_ACTIVE = True + _kill_switch_activated_at = _datetime.datetime.utcnow().isoformat() + "Z" + _kill_switch_activated_by = request.remote_addr + + # Persist to Redis so the kill-switch survives a process restart. + redis_ok = _redis_set_kill_switch(True) + + logger.critical( + "🚨 KILL-SWITCH ACTIVATED by %s at %s. Reason: %s (Redis persisted: %s)", + _kill_switch_activated_by, _kill_switch_activated_at, reason, redis_ok, + ) + + return jsonify({ + "status": "stopped", + "kill_switch_active": True, + "activated_at": _kill_switch_activated_at, + "activated_by": _kill_switch_activated_by, + "reason": reason, + "redis_persisted": redis_ok, + "message": "Emergency stop activated — all trading halted.", + }), 200 + + +@app.route('/emergency_stop', methods=['DELETE']) +def reset_kill_switch(): + """ + Deactivate the kill-switch (allow trading to resume after human review). + + Removes the 'ELVIS_KILL_SWITCH' key from Redis so the cleared state + also persists across restarts. + + Returns: + 200 — kill-switch cleared + 409 — kill-switch was not active + """ + global KILL_SWITCH_ACTIVE, _kill_switch_activated_at, _kill_switch_activated_by + + from flask import request, jsonify + + if not KILL_SWITCH_ACTIVE: + return jsonify({ + "status": "not_active", + "kill_switch_active": False, + }), 409 + + KILL_SWITCH_ACTIVE = False + # Remove from Redis so the cleared state persists across restarts. + redis_ok = _redis_set_kill_switch(False) + + logger.warning( + "✅ Kill-switch CLEARED by %s at %s (Redis updated: %s)", + request.remote_addr, + _datetime.datetime.utcnow().isoformat() + "Z", + redis_ok, + ) + _kill_switch_activated_at = None + _kill_switch_activated_by = None + + return jsonify({ + "status": "cleared", + "kill_switch_active": False, + "redis_updated": redis_ok, + "message": "Kill-switch cleared — trading may resume.", + }), 200 + + +@app.route('/emergency_stop/status', methods=['GET']) +def kill_switch_status(): + """ + Return the current kill-switch state sourced directly from Redis. + + Syncs the in-memory flag with the Redis value on every call so that + external tools (e.g. monitoring) always see an accurate picture even if + Redis was updated by another process. + """ + global KILL_SWITCH_ACTIVE + from flask import jsonify + + # Re-read from Redis on every status check (low-frequency endpoint). + redis_state = _redis_get_kill_switch() + KILL_SWITCH_ACTIVE = redis_state # Keep in-memory flag in sync. + + return jsonify({ + "kill_switch_active": KILL_SWITCH_ACTIVE, + "activated_at": _kill_switch_activated_at, + "activated_by": _kill_switch_activated_by, + "source": "redis", + }), 200 + + +# RESTful hyphen aliases — /emergency-stop mirrors /emergency_stop +@app.route('/emergency-stop', methods=['POST']) +def emergency_stop_hyphen(): + """Alias for POST /emergency_stop (hyphen variant per Issue #15 spec).""" + return emergency_stop() + + +@app.route('/emergency-stop', methods=['DELETE']) +def reset_kill_switch_hyphen(): + """Alias for DELETE /emergency_stop (hyphen variant).""" + return reset_kill_switch() + + +@app.route('/emergency-stop/status', methods=['GET']) +def kill_switch_status_hyphen(): + """Alias for GET /emergency_stop/status (hyphen variant).""" + return kill_switch_status() + + @app.route('/health', methods=['GET']) def health(): """Health check endpoint for Docker container""" return jsonify({ "status": "healthy", "service": "trade_history_api", - "version": "1.0.0" + "version": "1.0.0", + "kill_switch_active": KILL_SWITCH_ACTIVE, }) # NEW: Create a function that can be called externally diff --git a/utils/binance_rate_limiter.py b/utils/binance_rate_limiter.py new file mode 100644 index 00000000..65881214 --- /dev/null +++ b/utils/binance_rate_limiter.py @@ -0,0 +1,246 @@ +""" +utils/binance_rate_limiter.py — Issue #12: Binance API Rate Limiting + +Provides a retry decorator (using tenacity) with exponential backoff for all +Binance API calls, respects Binance rate limits, logs rate-limit response +headers, and auto-pauses the caller when the limit window is nearly exhausted. + +Binance documented limits (USDⓈ-M Futures): + - 1200 request-weight per minute (HTTP header: X-MBX-USED-WEIGHT-1M) + - 10 orders per second (HTTP header: X-MBX-ORDER-COUNT-1S) + +Usage: + from utils.binance_rate_limiter import binance_retry, check_rate_limit_headers + + @binance_retry + def my_binance_call(): + return client.some_endpoint() +""" + +import logging +import time +import functools +from typing import Callable, Any, Optional + +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, + retry_if_exception_type, + before_sleep_log, + RetryCallState, +) + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Rate-limit thresholds (env-overridable via os.getenv, but sane defaults) +# --------------------------------------------------------------------------- +import os + +# Warn / pause when used-weight exceeds this fraction of the per-minute limit. +_WEIGHT_WARN_FRACTION = float(os.getenv("BINANCE_RATE_LIMIT_WARN_FRACTION", "0.80")) +_WEIGHT_LIMIT_PER_MIN = int(os.getenv("BINANCE_WEIGHT_LIMIT_PER_MIN", "1200")) +_ORDER_LIMIT_PER_SEC = int(os.getenv("BINANCE_ORDER_LIMIT_PER_SEC", "10")) + +# Seconds to sleep when we detect the window is nearly full (>80 % weight used). +_RATE_LIMIT_PAUSE_SECONDS = float(os.getenv("BINANCE_RATE_LIMIT_PAUSE_SECONDS", "5.0")) + + +# --------------------------------------------------------------------------- +# Exceptions that should trigger a retry +# --------------------------------------------------------------------------- +try: + from binance.error import ClientError as BinanceFuturesClientError +except ImportError: + BinanceFuturesClientError = None # futures connector not installed + +try: + from binance.exceptions import BinanceAPIException, BinanceRequestException +except ImportError: + BinanceAPIException = Exception + BinanceRequestException = Exception + +# Build the tuple of retriable exception types dynamically. +_RETRIABLE_EXCEPTIONS = [BinanceAPIException, BinanceRequestException, ConnectionError, TimeoutError] +if BinanceFuturesClientError is not None: + _RETRIABLE_EXCEPTIONS.append(BinanceFuturesClientError) + +_RETRIABLE_EXCEPTIONS = tuple(set(_RETRIABLE_EXCEPTIONS)) + + +def _is_retriable(exc: BaseException) -> bool: + """ + Decide whether an exception should trigger a retry. + + We skip retry for permanent client errors (4xx) that are not rate-limit + responses (HTTP 429 / 418) because retrying them is pointless. + """ + # Always retry network-level errors. + if isinstance(exc, (ConnectionError, TimeoutError)): + return True + + # Binance futures ClientError carries an HTTP status code. + if BinanceFuturesClientError is not None and isinstance(exc, BinanceFuturesClientError): + status = getattr(exc, "status_code", None) + if status in (429, 418): + return True # Rate limited — definitely retry. + if status and 400 <= status < 500: + return False # Other 4xx (bad request etc.) — don't retry. + return True # 5xx or unknown — retry. + + # python-binance BinanceAPIException. + if isinstance(exc, BinanceAPIException): + code = getattr(exc, "status_code", None) + if code in (429, 418): + return True + if code and 400 <= code < 500: + return False + return True + + # Any other exception in the retriable set: retry. + return isinstance(exc, _RETRIABLE_EXCEPTIONS) + + +def _log_retry(retry_state: RetryCallState) -> None: + """Called by tenacity before each sleep between retries.""" + exc = retry_state.outcome.exception() if retry_state.outcome else None + attempt = retry_state.attempt_number + sleep_time = getattr(retry_state.next_action, "sleep", "?") + logger.warning( + "Binance API call failed (attempt %d). Retrying in %.1fs. Error: %s", + attempt, + sleep_time, + exc, + ) + + +# --------------------------------------------------------------------------- +# The core retry decorator +# --------------------------------------------------------------------------- +binance_retry = retry( + reraise=True, + retry=retry_if_exception_type(_RETRIABLE_EXCEPTIONS), + stop=stop_after_attempt(int(os.getenv("BINANCE_MAX_RETRIES", "5"))), + wait=wait_exponential( + multiplier=float(os.getenv("BINANCE_RETRY_MULTIPLIER", "1")), + min=float(os.getenv("BINANCE_RETRY_MIN_WAIT", "1")), + max=float(os.getenv("BINANCE_RETRY_MAX_WAIT", "60")), + ), + before_sleep=_log_retry, +) +""" +Decorator: wrap any Binance API call with automatic exponential-backoff retry. + + Attempts : 5 (BINANCE_MAX_RETRIES env var) + Back-off : 1 s → 2 s → 4 s → … up to 60 s (BINANCE_RETRY_MIN/MAX_WAIT) + Multiplier: 1 (BINANCE_RETRY_MULTIPLIER) + +Example:: + + @binance_retry + def fetch_klines(client, symbol): + return client.klines(symbol=symbol, interval="1m") +""" + + +# --------------------------------------------------------------------------- +# Response-header inspection helpers +# --------------------------------------------------------------------------- + +def check_rate_limit_headers(response_headers: dict) -> None: + """ + Inspect Binance HTTP response headers and log rate-limit usage. + + Pauses execution (_RATE_LIMIT_PAUSE_SECONDS) when used weight exceeds + _WEIGHT_WARN_FRACTION of _WEIGHT_LIMIT_PER_MIN, giving the window time + to roll over. + + Args: + response_headers: The raw headers dict from a requests.Response or + equivalent mapping (keys are case-insensitive). + """ + # Normalise header keys to lower-case for consistent lookup. + headers = {k.lower(): v for k, v in (response_headers or {}).items()} + + used_weight = _parse_int(headers.get("x-mbx-used-weight-1m")) + order_count_1s = _parse_int(headers.get("x-mbx-order-count-1s")) + order_count_1d = _parse_int(headers.get("x-mbx-order-count-1d")) + retry_after = _parse_int(headers.get("retry-after")) + + # Always log current utilisation at DEBUG level. + if used_weight is not None: + utilisation_pct = (used_weight / _WEIGHT_LIMIT_PER_MIN) * 100 + logger.debug( + "Binance rate-limit — weight used: %d / %d (%.1f%%)", + used_weight, _WEIGHT_LIMIT_PER_MIN, utilisation_pct, + ) + + if order_count_1s is not None: + logger.debug("Binance order count (1 s): %d / %d", order_count_1s, _ORDER_LIMIT_PER_SEC) + + if order_count_1d is not None: + logger.debug("Binance order count (1 day): %d", order_count_1d) + + # Warn and pause when approaching the weight limit. + warn_threshold = _WEIGHT_WARN_FRACTION * _WEIGHT_LIMIT_PER_MIN + if used_weight is not None and used_weight >= warn_threshold: + logger.warning( + "⚠️ Binance rate-limit approaching: %d / %d weight used (%.0f%%). " + "Pausing %.1f s to avoid HTTP 429.", + used_weight, _WEIGHT_LIMIT_PER_MIN, + (used_weight / _WEIGHT_LIMIT_PER_MIN) * 100, + _RATE_LIMIT_PAUSE_SECONDS, + ) + time.sleep(_RATE_LIMIT_PAUSE_SECONDS) + + # If Binance told us explicitly to back off, honour it. + if retry_after is not None and retry_after > 0: + logger.warning( + "🛑 Binance returned Retry-After: %d s. Pausing to comply.", retry_after + ) + time.sleep(retry_after) + + +def rate_limited_call(api_func: Callable, *args, response_attr: str = "headers", **kwargs) -> Any: + """ + Call *api_func* with retry logic and automatic rate-limit header checking. + + This is a convenience wrapper for callers that have access to a raw + requests.Response object. If the library returns something other than a + Response (e.g. a dict), header checking is skipped gracefully. + + Args: + api_func : The Binance SDK method to call. + *args : Positional arguments forwarded to api_func. + response_attr: Attribute name on the result that holds headers + (default "headers"; only used if it exists). + **kwargs : Keyword arguments forwarded to api_func. + + Returns: + The return value of api_func. + """ + @binance_retry + def _call(): + result = api_func(*args, **kwargs) + # Check headers if accessible (raw requests.Response objects). + raw_headers = getattr(result, response_attr, None) + if isinstance(raw_headers, dict): + check_rate_limit_headers(raw_headers) + return result + + return _call() + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _parse_int(value: Optional[str]) -> Optional[int]: + """Safely parse a string header value to int.""" + if value is None: + return None + try: + return int(value) + except (ValueError, TypeError): + return None