From ec2784f2121ee225fb827c562edf1c726e97fbc4 Mon Sep 17 00:00:00 2001 From: maddefientist Date: Thu, 29 Jan 2026 13:40:54 -0800 Subject: [PATCH 1/2] fix: escape JSON properly in Discord release notification *slaps roof of jq* This bad boy can fit so many escaped quotes in it. The changelog content was breaking JSON - now using jq for proper escaping. --- .github/workflows/release.yml | 45 ++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5ae2a3c..7b8251d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -82,21 +82,34 @@ jobs: DISCORD_WEBHOOK: ${{ secrets.DISCORD_RELEASE_WEBHOOK }} run: | VERSION="${{ steps.version.outputs.VERSION }}" - CHANGELOG=$(echo '${{ steps.changelog.outputs.CHANGELOG }}' | head -c 1000) - - curl -H "Content-Type: application/json" \ - -d "{ - \"embeds\": [{ - \"title\": \"๐Ÿš€ SlopeSniper v${VERSION} Released!\", - \"description\": \"A new version of SlopeSniper is now available.\", - \"url\": \"${{ github.server_url }}/${{ github.repository }}/releases/tag/v${VERSION}\", - \"color\": 5793266, - \"fields\": [ - {\"name\": \"What's New\", \"value\": \"${CHANGELOG:-See release notes}\", \"inline\": false}, - {\"name\": \"Install\", \"value\": \"\`\`\`bash\ncurl -fsSL https://raw.githubusercontent.com/BAGWATCHER/SlopeSniper/main/skills/install.sh | bash\n\`\`\`\", \"inline\": false} + + # Safely escape changelog for JSON using jq + CHANGELOG_RAW=$(cat <<'CHLOG' + ${{ steps.changelog.outputs.CHANGELOG }} + CHLOG + ) + CHANGELOG=$(echo "$CHANGELOG_RAW" | head -c 800 | jq -Rs '.') + + # Build JSON payload with jq to handle escaping + PAYLOAD=$(jq -n \ + --arg title "๐Ÿš€ SlopeSniper v${VERSION} Released!" \ + --arg desc "A new version of SlopeSniper is now available." \ + --arg url "${{ github.server_url }}/${{ github.repository }}/releases/tag/v${VERSION}" \ + --argjson changelog "$CHANGELOG" \ + --arg timestamp "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \ + '{ + embeds: [{ + title: $title, + description: $desc, + url: $url, + color: 5793266, + fields: [ + {name: "What'\''s New", value: ($changelog // "See release notes"), inline: false}, + {name: "Install", value: "```bash\ncurl -fsSL https://raw.githubusercontent.com/BAGWATCHER/SlopeSniper/main/skills/install.sh | bash\n```", inline: false} ], - \"footer\": {\"text\": \"BAGWATCHER/SlopeSniper\"}, - \"timestamp\": \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\" + footer: {text: "BAGWATCHER/SlopeSniper"}, + timestamp: $timestamp }] - }" \ - $DISCORD_WEBHOOK + }') + + curl -H "Content-Type: application/json" -d "$PAYLOAD" $DISCORD_WEBHOOK From 3f0031bdb290806dad97e9afe7c040941a68b7be Mon Sep 17 00:00:00 2001 From: maddefientist Date: Thu, 29 Jan 2026 13:54:40 -0800 Subject: [PATCH 2/2] chore(v0.3.41): version sync + logging cleanup *tips fedora* M'version drift has been vanquished. - Sync all version refs to 0.3.41 - Remove stale src/ directory (consolidated to mcp-extension/) - Default log level now WARNING (use -v for verbose) - Add --verbose/-v flag for debugging - Add SLOPESNIPER_LOG_LEVEL env var support - CHANGELOG entries for 0.3.3, 0.3.4, 0.3.41 --- CHANGELOG.md | 43 +- README.md | 2 +- docs/MOLTBOT_COMPATIBILITY.md | 2 +- githubproducworkflow.md | 39 ++ mcp-extension/manifest.json | 2 +- mcp-extension/pyproject.toml | 2 +- mcp-extension/src/slopesniper_api/__init__.py | 2 +- mcp-extension/src/slopesniper_api/server.py | 4 +- mcp-extension/src/slopesniper_mcp/__init__.py | 2 +- .../src/slopesniper_skill/__init__.py | 2 +- mcp-extension/src/slopesniper_skill/cli.py | 14 +- .../src/slopesniper_skill/sdk/__init__.py | 2 +- .../src/slopesniper_skill/sdk/utils.py | 10 +- src/slopesniper_skill/__init__.py | 56 --- src/slopesniper_skill/sdk/__init__.py | 23 - .../sdk/jupiter_data_client.py | 274 ----------- .../sdk/jupiter_ultra_client.py | 321 ------------- src/slopesniper_skill/sdk/rugcheck_client.py | 130 ------ src/slopesniper_skill/sdk/utils.py | 128 ------ src/slopesniper_skill/tools/__init__.py | 78 ---- src/slopesniper_skill/tools/config.py | 170 ------- src/slopesniper_skill/tools/intents.py | 272 ----------- src/slopesniper_skill/tools/policy.py | 175 ------- src/slopesniper_skill/tools/solana_tools.py | 433 ------------------ 24 files changed, 110 insertions(+), 2076 deletions(-) create mode 100644 githubproducworkflow.md delete mode 100644 src/slopesniper_skill/__init__.py delete mode 100644 src/slopesniper_skill/sdk/__init__.py delete mode 100644 src/slopesniper_skill/sdk/jupiter_data_client.py delete mode 100644 src/slopesniper_skill/sdk/jupiter_ultra_client.py delete mode 100644 src/slopesniper_skill/sdk/rugcheck_client.py delete mode 100644 src/slopesniper_skill/sdk/utils.py delete mode 100644 src/slopesniper_skill/tools/__init__.py delete mode 100644 src/slopesniper_skill/tools/config.py delete mode 100644 src/slopesniper_skill/tools/intents.py delete mode 100644 src/slopesniper_skill/tools/policy.py delete mode 100644 src/slopesniper_skill/tools/solana_tools.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 12be72d..63f90de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.3.41] - 2026-01-29 + +### Changed +- **Default log level changed to WARNING** - CLI output is now clean by default + - Use `--verbose` or `-v` for INFO-level debugging output + - Use `--quiet` or `-q` to suppress all logging + - Set `SLOPESNIPER_LOG_LEVEL=INFO` env var for persistent verbose mode + +### Removed +- **Stale `src/` directory** - Old v0.1.0 duplicate code removed (consolidated to `mcp-extension/`) + +### Fixed +- Version drift across all files synced to 0.3.41 + +## [0.3.4] - 2026-01-29 + +### Added +- **Discord webhook notifications for CI/CD** (#39, #40) + - Push notifications for dev/production branches + - PR merge notifications with diff colors (orange=dev, green=prod) + - Release notifications with changelog summary + - Proper JSON escaping for webhook payloads + +### Fixed +- Discord release notification JSON parsing errors (using jq for safe escaping) + +### Changed +- Updated BAGWATCHER PAT with proper scopes (secrets, workflow) + +## [0.3.3] - 2026-01-29 + +### Added +- Pre-merge testing requirement documented in CONTRIBUTING.md +- Explicit testing steps before PR merge + +### Fixed +- Version sync documentation improvements + ## [0.3.2] - 2026-01-29 ### Changed @@ -284,7 +322,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 --- -[Unreleased]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.2...HEAD +[Unreleased]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.41...HEAD +[0.3.41]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.4...v0.3.41 +[0.3.4]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.3...v0.3.4 +[0.3.3]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.2...v0.3.3 [0.3.2]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.1...v0.3.2 [0.3.1]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.03...v0.3.1 [0.3.03]: https://github.com/BAGWATCHER/SlopeSniper/compare/v0.3.02...v0.3.03 diff --git a/README.md b/README.md index 341b01b..f07da31 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/) -[![Version](https://img.shields.io/badge/version-0.3.2-green.svg)](https://github.com/BAGWATCHER/SlopeSniper/releases) +[![Version](https://img.shields.io/badge/version-0.3.41-green.svg)](https://github.com/BAGWATCHER/SlopeSniper/releases) [Quick Start](#-quick-start) ยท [Features](#-features) ยท [Documentation](#-documentation) ยท [Contributing](#-contributing) diff --git a/docs/MOLTBOT_COMPATIBILITY.md b/docs/MOLTBOT_COMPATIBILITY.md index c9f0a70..7ab23c4 100644 --- a/docs/MOLTBOT_COMPATIBILITY.md +++ b/docs/MOLTBOT_COMPATIBILITY.md @@ -1,6 +1,6 @@ # SlopeSniper + MoltBot Compatibility Assessment -*Created: 2026-01-28 | Updated: 2026-01-29 | Version: 0.3.2* +*Created: 2026-01-28 | Updated: 2026-01-29 | Version: 0.3.41* ## Executive Summary diff --git a/githubproducworkflow.md b/githubproducworkflow.md new file mode 100644 index 0000000..b023bd5 --- /dev/null +++ b/githubproducworkflow.md @@ -0,0 +1,39 @@ + Post-PR Merge Workflow + + After your PR is merged to BAGWATCHER/SlopeSniper:main: + + # 1. Switch to main and pull latest from production + git checkout main + git pull origin main + + # 2. Sync fork's main branch + git push fork main + + # 3. Reset dev branch to new main (for next round of changes) + git checkout bagwatcher-release + git reset --hard origin/main + git push fork bagwatcher-release --force + + --- + Full Cycle Reference + + # === DEVELOP === + git checkout bagwatcher-release + # make changes... + git add -A && git commit -m "description" + git push fork bagwatcher-release + + # === CREATE PR === + GH_TOKEN="your_pat" gh pr create \ + --repo BAGWATCHER/SlopeSniper \ + --base main \ + --head maddefientist:bagwatcher-release \ + --title "title" --body "description" + + # === AFTER MERGE === + git checkout main + git pull origin main + git push fork main + git branch -D bagwatcher-release # or reset it for reuse + + diff --git a/mcp-extension/manifest.json b/mcp-extension/manifest.json index f1b8b50..fa05725 100644 --- a/mcp-extension/manifest.json +++ b/mcp-extension/manifest.json @@ -1,7 +1,7 @@ { "manifest_version": "0.4", "name": "slopesniper", - "version": "0.1.0", + "version": "0.3.41", "description": "Solana token trading assistant - USE THESE TOOLS when user wants to trade crypto. Call get_status first, then quick_trade to buy/sell tokens like BONK, WIF, SOL. Tools: get_status, quick_trade, scan_opportunities, set_strategy, get_price, check_token.", "author": { "name": "SlopeSniper", diff --git a/mcp-extension/pyproject.toml b/mcp-extension/pyproject.toml index 0670850..af43138 100644 --- a/mcp-extension/pyproject.toml +++ b/mcp-extension/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "slopesniper-mcp" -version = "0.3.2" +version = "0.3.41" description = "SlopeSniper MCP Server - Safe Solana Token Trading" requires-python = ">=3.10" dependencies = [ diff --git a/mcp-extension/src/slopesniper_api/__init__.py b/mcp-extension/src/slopesniper_api/__init__.py index 07eba47..8e90ad6 100644 --- a/mcp-extension/src/slopesniper_api/__init__.py +++ b/mcp-extension/src/slopesniper_api/__init__.py @@ -1,3 +1,3 @@ """SlopeSniper Web API.""" -__version__ = "0.3.03" +__version__ = "0.3.41" diff --git a/mcp-extension/src/slopesniper_api/server.py b/mcp-extension/src/slopesniper_api/server.py index 306c297..50037a6 100644 --- a/mcp-extension/src/slopesniper_api/server.py +++ b/mcp-extension/src/slopesniper_api/server.py @@ -53,7 +53,7 @@ async def lifespan(app: FastAPI): app = FastAPI( title="SlopeSniper API", description="Solana token trading via Jupiter DEX", - version="0.3.03", + version="0.3.41", lifespan=lifespan, ) @@ -126,7 +126,7 @@ async def root(): """Health check and API info.""" return { "service": "SlopeSniper API", - "version": "0.3.03", + "version": "0.3.41", "status": "running", "endpoints": [ "/status", diff --git a/mcp-extension/src/slopesniper_mcp/__init__.py b/mcp-extension/src/slopesniper_mcp/__init__.py index eec21d5..ae82685 100644 --- a/mcp-extension/src/slopesniper_mcp/__init__.py +++ b/mcp-extension/src/slopesniper_mcp/__init__.py @@ -1,3 +1,3 @@ """SlopeSniper MCP Server.""" -__version__ = "0.3.03" +__version__ = "0.3.41" diff --git a/mcp-extension/src/slopesniper_skill/__init__.py b/mcp-extension/src/slopesniper_skill/__init__.py index 8e6d978..c69f22c 100644 --- a/mcp-extension/src/slopesniper_skill/__init__.py +++ b/mcp-extension/src/slopesniper_skill/__init__.py @@ -25,7 +25,7 @@ # Version is the single source of truth - update here for releases # Follow semantic versioning: MAJOR.MINOR.PATCH # Beta versions use 0.x.x (0.MINOR.PATCH) -__version__ = "0.3.2" +__version__ = "0.3.41" from .tools import ( export_wallet, diff --git a/mcp-extension/src/slopesniper_skill/cli.py b/mcp-extension/src/slopesniper_skill/cli.py index feb9878..5381f2d 100644 --- a/mcp-extension/src/slopesniper_skill/cli.py +++ b/mcp-extension/src/slopesniper_skill/cli.py @@ -55,7 +55,8 @@ slopesniper daemon status Global flags: - --quiet, -q Suppress logging output (only JSON to stdout) + --quiet, -q Suppress all logging (only JSON to stdout) + --verbose, -v Enable verbose logging for debugging """ from __future__ import annotations @@ -1133,18 +1134,25 @@ def main() -> None: """CLI entry point.""" args = sys.argv[1:] - # Check for --quiet flag (suppresses logging for clean JSON output) + # Check for --quiet flag (suppresses ALL logging) quiet = "--quiet" in args or "-q" in args if quiet: args = [a for a in args if a not in ("--quiet", "-q")] import logging logging.disable(logging.CRITICAL) - # Also suppress warnings from libraries import warnings warnings.filterwarnings("ignore") + # Check for --verbose flag (enables INFO logging for debugging) + verbose = "--verbose" in args or "-v" in args + if verbose: + args = [a for a in args if a not in ("--verbose", "-v")] + import os + + os.environ["SLOPESNIPER_LOG_LEVEL"] = "INFO" + if not args or args[0] in ("-h", "--help", "help"): print_help() return diff --git a/mcp-extension/src/slopesniper_skill/sdk/__init__.py b/mcp-extension/src/slopesniper_skill/sdk/__init__.py index a46f4f5..b6506ec 100644 --- a/mcp-extension/src/slopesniper_skill/sdk/__init__.py +++ b/mcp-extension/src/slopesniper_skill/sdk/__init__.py @@ -9,7 +9,7 @@ - PumpFunClient: Pump.fun graduated/new tokens """ -__version__ = "0.3.03" +__version__ = "0.3.41" from .dexscreener_client import DexScreenerClient from .jupiter_data_client import JupiterDataClient diff --git a/mcp-extension/src/slopesniper_skill/sdk/utils.py b/mcp-extension/src/slopesniper_skill/sdk/utils.py index 9c2781f..8389fa9 100644 --- a/mcp-extension/src/slopesniper_skill/sdk/utils.py +++ b/mcp-extension/src/slopesniper_skill/sdk/utils.py @@ -20,7 +20,7 @@ class Utils: def setup_logger( name: str = "SlopeSniper", log_file: str | None = None, - level: int = logging.INFO, + level: int | None = None, ) -> logging.Logger: """ Set up a logger with console and optional file output. @@ -28,7 +28,7 @@ def setup_logger( Args: name: Logger name log_file: Optional log file path (if None, console only) - level: Logging level + level: Logging level (default: WARNING, or SLOPESNIPER_LOG_LEVEL env var) Returns: Configured logger instance @@ -37,6 +37,12 @@ def setup_logger( if not logger.hasHandlers(): logger.propagate = False + + # Determine log level: explicit > env var > default (WARNING for clean CLI output) + if level is None: + env_level = os.environ.get("SLOPESNIPER_LOG_LEVEL", "WARNING").upper() + level = getattr(logging, env_level, logging.WARNING) + logger.setLevel(level) formatter = logging.Formatter("%(asctime)s | %(name)s | %(levelname)s | %(message)s") diff --git a/src/slopesniper_skill/__init__.py b/src/slopesniper_skill/__init__.py deleted file mode 100644 index 82c4009..0000000 --- a/src/slopesniper_skill/__init__.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -SlopeSniper Skill - Safe Solana Token Trading for Claude Code - -A Claude Code skill that provides policy-enforced, two-step token swaps -on Solana via Jupiter aggregator. - -Features: -- Price lookup and token search -- Rugcheck safety analysis -- Wallet balance viewing -- Two-step swap flow (quote โ†’ confirm) -- Policy gates for safety limits - -Example: - >>> from slopesniper_skill import solana_get_price, solana_quote - >>> import asyncio - >>> - >>> async def main(): - ... price = await solana_get_price("SOL") - ... print(f"SOL: ${price['price_usd']}") - ... - >>> asyncio.run(main()) -""" - -__version__ = "0.1.0" - -from .tools import ( - solana_get_price, - solana_search_token, - solana_check_token, - solana_get_wallet, - solana_quote, - solana_swap_confirm, -) - -from .tools.config import PolicyConfig, get_policy_config -from .tools.policy import check_policy, PolicyResult, KNOWN_SAFE_MINTS - -__all__ = [ - # Version - "__version__", - # Tools - "solana_get_price", - "solana_search_token", - "solana_check_token", - "solana_get_wallet", - "solana_quote", - "solana_swap_confirm", - # Config - "PolicyConfig", - "get_policy_config", - # Policy - "check_policy", - "PolicyResult", - "KNOWN_SAFE_MINTS", -] diff --git a/src/slopesniper_skill/sdk/__init__.py b/src/slopesniper_skill/sdk/__init__.py deleted file mode 100644 index adb5718..0000000 --- a/src/slopesniper_skill/sdk/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -SlopeSniper SDK - Jupiter and Rugcheck API Clients - -Bundled SDK for Solana token operations: -- JupiterUltraClient: Swap quotes and execution -- JupiterDataClient: Price and token data -- RugCheckClient: Token safety analysis -""" - -__version__ = "0.1.0" - -from .jupiter_ultra_client import JupiterUltraClient -from .jupiter_data_client import JupiterDataClient -from .rugcheck_client import RugCheckClient -from .utils import Utils - -__all__ = [ - "__version__", - "JupiterUltraClient", - "JupiterDataClient", - "RugCheckClient", - "Utils", -] diff --git a/src/slopesniper_skill/sdk/jupiter_data_client.py b/src/slopesniper_skill/sdk/jupiter_data_client.py deleted file mode 100644 index 134df25..0000000 --- a/src/slopesniper_skill/sdk/jupiter_data_client.py +++ /dev/null @@ -1,274 +0,0 @@ -""" -Jupiter Data Client - Price and Token Search APIs. - -Provides access to Jupiter's Price API and Token Search API -for getting token prices and searching for tokens. -""" - -from __future__ import annotations - -import asyncio -from typing import Any, Optional - -import aiohttp - -from .utils import Utils - - -class JupiterDataClient: - """ - Client for Jupiter Price and Token Search APIs. - - Features: - - Get USD prices for tokens (up to 50 at once) - - Search tokens by symbol, name, or mint address - - Detailed token metadata including audit info and stats - """ - - # Use main API endpoints (not lite-api) for better rate limits with API key - BASE_URL_PRICE = "https://api.jup.ag/price/v3" - BASE_URL_TOKENS = "https://api.jup.ag/tokens/v2" - - def __init__(self, api_key: Optional[str] = None, max_retries: int = 3) -> None: - """ - Initialize Jupiter Data Client. - - Args: - api_key: Optional Jupiter API key for higher rate limits - max_retries: Maximum number of retry attempts for failed requests - """ - self.logger = Utils.setup_logger("JupiterDataClient") - self.max_retries = max_retries - self.api_key = api_key - - if api_key: - self.logger.info("[__init__] JupiterDataClient initialized with API key") - else: - self.logger.info("[__init__] JupiterDataClient initialized (no API key)") - - async def _make_request( - self, - url: str, - params: Optional[dict[str, Any]] = None, - method: str = "GET", - ) -> dict[str, Any]: - """Make HTTP request with exponential backoff retry logic.""" - self.logger.debug(f"[_make_request] {method} {url}, params={params}") - - for attempt in range(self.max_retries): - try: - async with aiohttp.ClientSession() as session: - # Build headers with API key if available - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["x-api-key"] = self.api_key - - if method == "GET": - timeout = aiohttp.ClientTimeout(total=10) - async with session.get( - url, params=params, timeout=timeout, headers=headers - ) as response: - response_text = await response.text() - - if response.status == 200: - data = await response.json() - self.logger.debug( - f"[_make_request] SUCCESS on attempt {attempt + 1}" - ) - return data - else: - self.logger.warning( - f"[_make_request] Attempt {attempt + 1}/{self.max_retries} " - f"failed: status={response.status}" - ) - - if response.status == 400: - raise ValueError( - f"Bad request (400): {response_text}" - ) - - except asyncio.TimeoutError: - self.logger.warning( - f"[_make_request] Attempt {attempt + 1}/{self.max_retries} timed out" - ) - except Exception as e: - self.logger.error( - f"[_make_request] Attempt {attempt + 1}/{self.max_retries} error: {e}" - ) - if attempt == self.max_retries - 1: - raise - - if attempt < self.max_retries - 1: - delay = 2**attempt - self.logger.info(f"[_make_request] Retrying in {delay}s...") - await asyncio.sleep(delay) - - raise RuntimeError(f"Failed after {self.max_retries} attempts") - - async def get_prices( - self, mint_addresses: list[str] - ) -> dict[str, dict[str, Any]]: - """ - Get USD prices for one or more tokens. - - Args: - mint_addresses: List of token mint addresses (max 50) - - Returns: - Dictionary mapping mint addresses to price data - """ - self.logger.info( - f"[get_prices] Fetching prices for {len(mint_addresses)} token(s)" - ) - - if len(mint_addresses) > 50: - self.logger.warning( - f"[get_prices] Truncating to 50 mints (received {len(mint_addresses)})" - ) - mint_addresses = mint_addresses[:50] - - ids_param = ",".join(mint_addresses) - - try: - data = await self._make_request( - url=self.BASE_URL_PRICE, params={"ids": ids_param} - ) - self.logger.info( - f"[get_prices] SUCCESS: Retrieved prices for {len(data)} token(s)" - ) - return data - - except Exception as e: - self.logger.error(f"[get_prices] FAILED: {e}", exc_info=True) - raise - - async def get_price(self, mint_address: str) -> Optional[dict[str, Any]]: - """ - Get USD price for a single token. - - Args: - mint_address: Token mint address - - Returns: - Price data dictionary or None if not found - """ - self.logger.info(f"[get_price] Fetching price for {mint_address}") - - try: - prices = await self.get_prices([mint_address]) - - if mint_address in prices: - price_data = prices[mint_address] - self.logger.info( - f"[get_price] SUCCESS: ${price_data.get('price', 0):.8f}" - ) - return price_data - else: - self.logger.warning(f"[get_price] Price not found for {mint_address}") - return None - - except Exception as e: - self.logger.error(f"[get_price] FAILED: {e}", exc_info=True) - raise - - async def search_token(self, query: str) -> list[dict[str, Any]]: - """ - Search for tokens by symbol, name, or mint address. - - Args: - query: Search query - - Returns: - List of token information dictionaries - """ - self.logger.info(f"[search_token] Searching for: {query}") - - try: - url = f"{self.BASE_URL_TOKENS}/search" - data = await self._make_request(url=url, params={"query": query}) - - if isinstance(data, list): - self.logger.info(f"[search_token] SUCCESS: Found {len(data)} token(s)") - return data - else: - self.logger.warning("[search_token] Unexpected response format") - return [] - - except Exception as e: - self.logger.error(f"[search_token] FAILED: {e}", exc_info=True) - raise - - async def get_token_info(self, mint_address: str) -> Optional[dict[str, Any]]: - """ - Get detailed information for a specific token. - - Args: - mint_address: Token mint address - - Returns: - Token information dictionary or None - """ - self.logger.info(f"[get_token_info] Fetching info for {mint_address}") - - try: - results = await self.search_token(mint_address) - - if results and len(results) > 0: - token_info = results[0] - self.logger.info( - f"[get_token_info] SUCCESS: {token_info.get('symbol', 'N/A')} - " - f"MCap: ${token_info.get('mcap', 0):,.0f}" - ) - return token_info - else: - self.logger.warning(f"[get_token_info] Token not found: {mint_address}") - return None - - except Exception as e: - self.logger.error(f"[get_token_info] FAILED: {e}", exc_info=True) - raise - - def is_token_suspicious(self, token_info: dict[str, Any]) -> tuple[bool, list[str]]: - """ - Check if a token has suspicious characteristics. - - Args: - token_info: Token information from get_token_info() - - Returns: - Tuple of (is_suspicious: bool, reasons: list[str]) - """ - reasons: list[str] = [] - audit = token_info.get("audit", {}) - - if audit: - if audit.get("isSus"): - reasons.append("Flagged as suspicious by Jupiter") - - if not audit.get("mintAuthorityDisabled"): - reasons.append("Mint authority not disabled") - - if not audit.get("freezeAuthorityDisabled"): - reasons.append("Freeze authority not disabled") - - top_holders_pct = audit.get("topHoldersPercentage", 0) - if top_holders_pct > 50: - reasons.append(f"High holder concentration: {top_holders_pct:.1f}%") - - dev_balance_pct = audit.get("devBalancePercentage", 0) - if dev_balance_pct > 10: - reasons.append(f"Dev holds {dev_balance_pct:.1f}% of supply") - - organic_label = token_info.get("organicScoreLabel", "") - if organic_label == "low": - reasons.append("Low organic trading activity") - - is_suspicious = len(reasons) > 0 - - if is_suspicious: - self.logger.warning( - f"[is_token_suspicious] Token {token_info.get('symbol', 'N/A')} " - f"flagged: {', '.join(reasons)}" - ) - - return is_suspicious, reasons diff --git a/src/slopesniper_skill/sdk/jupiter_ultra_client.py b/src/slopesniper_skill/sdk/jupiter_ultra_client.py deleted file mode 100644 index 2374bfb..0000000 --- a/src/slopesniper_skill/sdk/jupiter_ultra_client.py +++ /dev/null @@ -1,321 +0,0 @@ -""" -Jupiter Ultra Client - Swap API. - -Provides access to Jupiter's Ultra API for executing token swaps. -Ultra API combines quote generation and transaction execution. - -APIs: -- GET /order - Get quote and unsigned transaction -- POST /execute - Execute signed transaction -- GET /holdings/{address} - Get wallet token balances -""" - -from __future__ import annotations - -import asyncio -import base64 -import os -from typing import Any, Optional - -import aiohttp -from solders.keypair import Keypair -from solders.transaction import VersionedTransaction - -from .utils import Utils - - -class JupiterUltraClient: - """ - Client for Jupiter Ultra Swap API. - - Features: - - Get swap quotes with unsigned transactions - - Execute signed swaps - - Query wallet holdings - - Exponential backoff with configurable retry logic - """ - - BASE_URL = "https://api.jup.ag/ultra/v1" - - # Solana token addresses - SOL_MINT = "So11111111111111111111111111111111111111112" - USDC_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" - - def __init__( - self, api_key: Optional[str] = None, max_retries: int = 5 - ) -> None: - """ - Initialize Jupiter Ultra Client. - - Args: - api_key: Jupiter Ultra API key (or set JUPITER_API_KEY env var) - max_retries: Maximum number of retry attempts for failed requests - """ - self.logger = Utils.setup_logger("JupiterUltraClient") - self.max_retries = max_retries - - # Get API key from parameter, env var, or None (free tier) - self.api_key = api_key or os.environ.get("JUPITER_API_KEY") - - if self.api_key: - self.logger.info("[__init__] JupiterUltraClient initialized with API key") - else: - self.logger.info( - "[__init__] JupiterUltraClient initialized (free tier - rate limited)" - ) - - async def _make_request( - self, - url: str, - method: str = "GET", - params: Optional[dict[str, Any]] = None, - json_data: Optional[dict[str, Any]] = None, - timeout: int = 30, - ) -> dict[str, Any]: - """ - Make HTTP request with exponential backoff retry logic. - - Args: - url: Full URL to request - method: HTTP method (GET, POST) - params: Query parameters for GET requests - json_data: JSON body for POST requests - timeout: Request timeout in seconds - - Returns: - JSON response as dictionary - """ - self.logger.debug(f"[_make_request] {method} {url}, params={params}") - - for attempt in range(self.max_retries): - try: - async with aiohttp.ClientSession() as session: - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["x-api-key"] = self.api_key - - request_kwargs: dict[str, Any] = { - "timeout": aiohttp.ClientTimeout(total=timeout), - "headers": headers, - } - - if method == "GET": - request_kwargs["params"] = params - async with session.get(url, **request_kwargs) as response: - response_text = await response.text() - - if response.status == 200: - data = await response.json() - self.logger.debug( - f"[_make_request] SUCCESS on attempt {attempt + 1}" - ) - return data - else: - self.logger.warning( - f"[_make_request] Attempt {attempt + 1}/{self.max_retries} " - f"failed: status={response.status}, " - f"body={response_text[:500]}" - ) - - elif method == "POST": - request_kwargs["json"] = json_data - async with session.post(url, **request_kwargs) as response: - response_text = await response.text() - - if response.status == 200: - data = await response.json() - self.logger.debug( - f"[_make_request] SUCCESS on attempt {attempt + 1}" - ) - return data - else: - self.logger.warning( - f"[_make_request] Attempt {attempt + 1}/{self.max_retries} " - f"failed: status={response.status}, " - f"body={response_text[:500]}" - ) - - except asyncio.TimeoutError: - self.logger.warning( - f"[_make_request] Attempt {attempt + 1}/{self.max_retries} timed out" - ) - except Exception as e: - self.logger.error( - f"[_make_request] Attempt {attempt + 1}/{self.max_retries} error: {e}", - exc_info=(attempt == self.max_retries - 1), - ) - if attempt == self.max_retries - 1: - raise - - # Exponential backoff - if attempt < self.max_retries - 1: - delay = 2**attempt - self.logger.info(f"[_make_request] Retrying in {delay}s...") - await asyncio.sleep(delay) - - raise RuntimeError(f"Failed after {self.max_retries} attempts") - - async def get_order( - self, - input_mint: str, - output_mint: str, - amount: int, - taker: Optional[str] = None, - slippage_bps: int = 50, - exclude_dexes: Optional[str] = None, - ) -> dict[str, Any]: - """ - Get swap quote and unsigned transaction. - - Args: - input_mint: Token to sell (mint address) - output_mint: Token to buy (mint address) - amount: Amount in atomic units (smallest unit of token) - taker: Wallet address (required to get transaction) - slippage_bps: Slippage tolerance in basis points (50 = 0.5%) - exclude_dexes: Comma-separated DEX names to exclude - - Returns: - Order response with transaction and quote details - """ - self.logger.info( - f"[get_order] Requesting order: {input_mint[:8]}... -> {output_mint[:8]}..., " - f"amount={amount}, slippage={slippage_bps}bps" - ) - - params: dict[str, Any] = { - "inputMint": input_mint, - "outputMint": output_mint, - "amount": str(amount), - "slippageBps": slippage_bps, - } - - if taker: - params["taker"] = taker - - if exclude_dexes: - params["excludeDexes"] = exclude_dexes - - try: - url = f"{self.BASE_URL}/order" - data = await self._make_request(url=url, method="GET", params=params) - - self.logger.info( - f"[get_order] SUCCESS: " - f"inAmount={data.get('inAmount')}, " - f"outAmount={data.get('outAmount')}, " - f"priceImpact={data.get('priceImpact', 0):.4f}%" - ) - - if data.get("errorCode"): - self.logger.warning( - f"[get_order] Order has error: code={data.get('errorCode')}, " - f"message={data.get('errorMessage')}" - ) - - return data - - except Exception as e: - self.logger.error(f"[get_order] FAILED: {e}", exc_info=True) - raise - - async def execute_swap( - self, signed_transaction: str, request_id: str - ) -> dict[str, Any]: - """ - Execute a signed swap transaction. - - Args: - signed_transaction: Base64-encoded signed transaction - request_id: Request ID from get_order() response - - Returns: - Execution result with signature and amounts - """ - self.logger.info(f"[execute_swap] Submitting transaction, requestId={request_id}") - - payload = { - "signedTransaction": signed_transaction, - "requestId": request_id, - } - - try: - url = f"{self.BASE_URL}/execute" - data = await self._make_request( - url=url, method="POST", json_data=payload, timeout=60 - ) - - status = data.get("status") - signature = data.get("signature") - - if status == "Success": - self.logger.info( - f"[execute_swap] SUCCESS: signature={signature}, " - f"outputAmount={data.get('outputAmountResult')}" - ) - else: - self.logger.error( - f"[execute_swap] FAILED: status={status}, " - f"error={data.get('error')}" - ) - - return data - - except Exception as e: - self.logger.error(f"[execute_swap] FAILED: {e}", exc_info=True) - raise - - def sign_transaction(self, unsigned_tx_base64: str, keypair: Keypair) -> str: - """ - Sign an unsigned transaction. - - Args: - unsigned_tx_base64: Base64-encoded unsigned transaction - keypair: Solana keypair to sign with - - Returns: - Base64-encoded signed transaction - """ - self.logger.info("[sign_transaction] Signing transaction") - - try: - tx_bytes = base64.b64decode(unsigned_tx_base64) - tx = VersionedTransaction.from_bytes(tx_bytes) - signed_tx = VersionedTransaction(tx.message, [keypair]) - signed_tx_base64 = base64.b64encode(bytes(signed_tx)).decode("utf-8") - - self.logger.info("[sign_transaction] Transaction signed successfully") - return signed_tx_base64 - - except Exception as e: - self.logger.error(f"[sign_transaction] FAILED: {e}", exc_info=True) - raise - - async def get_holdings(self, address: str) -> dict[str, Any]: - """ - Get token holdings for a wallet address. - - Args: - address: Wallet address - - Returns: - Holdings data with SOL balance and token holdings - """ - self.logger.info(f"[get_holdings] Fetching holdings for {address}") - - try: - url = f"{self.BASE_URL}/holdings/{address}" - data = await self._make_request(url=url, method="GET") - - sol_balance = data.get("uiAmount", 0) - token_count = len(data.get("tokens", {})) - - self.logger.info( - f"[get_holdings] SUCCESS: SOL={sol_balance:.4f}, {token_count} token type(s)" - ) - - return data - - except Exception as e: - self.logger.error(f"[get_holdings] FAILED: {e}", exc_info=True) - raise diff --git a/src/slopesniper_skill/sdk/rugcheck_client.py b/src/slopesniper_skill/sdk/rugcheck_client.py deleted file mode 100644 index 55ffca3..0000000 --- a/src/slopesniper_skill/sdk/rugcheck_client.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -RugCheck API Client for token risk assessment. - -Fetches token risk scores and reports from rugcheck.xyz API. -""" - -from __future__ import annotations - -from typing import Any, Optional - -import aiohttp - -from .utils import Utils - - -class RugCheckClient: - """Client for the RugCheck API to assess token risk.""" - - def __init__(self, timeout: int = 10) -> None: - """ - Initialize RugCheck client. - - Args: - timeout: Request timeout in seconds - """ - self.logger = Utils.setup_logger("RugCheckClient") - self.base_url = "https://api.rugcheck.xyz/v1" - self.timeout = timeout - - async def get_report_summary( - self, contract_address: str - ) -> Optional[dict[str, Any]]: - """ - Fetch report summary from RugCheck API. - - Args: - contract_address: Token contract address - - Returns: - Summary dict with score and risk info, or None if error - """ - try: - summary_url = f"{self.base_url}/tokens/{contract_address}/report/summary" - timeout = aiohttp.ClientTimeout(total=self.timeout) - - async with aiohttp.ClientSession(timeout=timeout) as session: - async with session.get(summary_url) as response: - response.raise_for_status() - summary = await response.json() - - self.logger.info( - f"[get_report_summary] Score: {summary.get('score')}, " - f"Risks: {len(summary.get('risks', []))}" - ) - - return summary - - except aiohttp.ClientError as e: - self.logger.error(f"[get_report_summary] Failed: {e}") - return None - except Exception as e: - self.logger.exception(f"[get_report_summary] Unexpected error: {e}") - return None - - async def check_token( - self, contract_address: str, max_score: int = 2000 - ) -> dict[str, Any]: - """ - Check token and return risk assessment. - - Args: - contract_address: Token contract address - max_score: Maximum acceptable risk score (lower = safer) - - Returns: - Dict with keys: - - score: Risk score - - summary: Summary report dict - - risks: List of risk factors - - is_safe: Boolean recommendation - - reason: Human-readable reason - """ - summary = await self.get_report_summary(contract_address) - - if not summary: - return { - "score": None, - "summary": None, - "risks": [], - "is_safe": False, - "reason": "Failed to fetch rugcheck report", - } - - score = summary.get("score", 9999) - risks = summary.get("risks", []) - - # Analyze risks - critical_risks: list[str] = [] - for risk in risks: - risk_level = risk.get("level", "") - risk_name = risk.get("name", "") - risk_description = risk.get("description", "") - - if risk_level in ["danger", "critical"]: - critical_risks.append(f"{risk_name}: {risk_description}") - - # Determine if safe - is_safe = True - reason = "Token passed rugcheck" - - if score is None or score > max_score: - is_safe = False - reason = f"High risk score: {score} (max: {max_score})" - elif len(critical_risks) > 0: - is_safe = False - reason = f"Critical risks found: {critical_risks[0]}" - - self.logger.info( - f"[check_token] {contract_address[:8]}... - " - f"Score: {score}, Risks: {len(risks)}, Safe: {is_safe}" - ) - - return { - "score": score, - "summary": summary, - "risks": risks, - "critical_risks": critical_risks, - "is_safe": is_safe, - "reason": reason, - } diff --git a/src/slopesniper_skill/sdk/utils.py b/src/slopesniper_skill/sdk/utils.py deleted file mode 100644 index c6ca8cf..0000000 --- a/src/slopesniper_skill/sdk/utils.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -Utility functions for SlopeSniper SDK. - -Provides logging and validation helpers. -""" - -from __future__ import annotations - -import logging -import os -import re -from pathlib import Path -from typing import Optional -from urllib.parse import urlparse - - -class Utils: - """Utility class with static helper methods.""" - - @staticmethod - def setup_logger( - name: str = "SlopeSniper", - log_file: Optional[str] = None, - level: int = logging.INFO, - ) -> logging.Logger: - """ - Set up a logger with console and optional file output. - - Args: - name: Logger name - log_file: Optional log file path (if None, console only) - level: Logging level - - Returns: - Configured logger instance - """ - logger = logging.getLogger(name) - - if not logger.hasHandlers(): - logger.propagate = False - logger.setLevel(level) - - formatter = logging.Formatter( - "%(asctime)s | %(name)s | %(levelname)s | %(message)s" - ) - - # Console handler - console_handler = logging.StreamHandler() - console_handler.setFormatter(formatter) - logger.addHandler(console_handler) - - # File handler (optional) - if log_file: - Path(log_file).parent.mkdir(parents=True, exist_ok=True) - file_handler = logging.FileHandler(log_file) - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) - - return logger - - @staticmethod - def get_env_or_default(key: str, default: Optional[str] = None) -> Optional[str]: - """ - Get value from environment variable. - - Args: - key: Key name - default: Default value if not found - - Returns: - Value from env var or default - """ - env_key = key.upper().replace("-", "_").replace(".", "_") - return os.environ.get(env_key, default) - - @staticmethod - def is_valid_solana_address(address: str) -> bool: - """ - Check if a string is a valid Solana address. - - Validates base58 encoding and length (32-44 chars). - - Args: - address: Address to validate - - Returns: - True if valid Solana address - """ - if not address or not isinstance(address, str): - return False - pattern = r"^[A-HJ-NP-Za-km-z1-9]{32,44}$" - return bool(re.match(pattern, address)) - - @staticmethod - def parse_contract_address(text: str) -> Optional[str]: - """ - Extract Solana contract address from text. - - Handles both raw addresses and URLs containing addresses. - - Args: - text: Text to parse - - Returns: - Contract address or None if not found - """ - try: - # Solana address pattern (base58, 43-44 chars) - pattern = r"\b[A-HJ-NP-Za-km-z1-9]{43,44}\b" - match = re.search(pattern, text) - - if match: - return match.group(0) - - # Try extracting from URLs - url_pattern = r"(https?://\S+)" - urls = re.findall(url_pattern, text) - for url in urls: - parsed_url = urlparse(url) - path_parts = parsed_url.path.split("/") - for part in path_parts: - if re.match(pattern, part): - return part - - return None - - except Exception: - return None diff --git a/src/slopesniper_skill/tools/__init__.py b/src/slopesniper_skill/tools/__init__.py deleted file mode 100644 index f637245..0000000 --- a/src/slopesniper_skill/tools/__init__.py +++ /dev/null @@ -1,78 +0,0 @@ -""" -SlopeSniper Trading Tools. - -Safe two-step token swaps with policy enforcement. -""" - -from .solana_tools import ( - solana_get_price, - solana_search_token, - solana_check_token, - solana_get_wallet, - solana_quote, - solana_swap_confirm, - resolve_token, - SYMBOL_TO_MINT, -) - -from .config import ( - get_secret, - get_keypair, - get_wallet_address, - get_rpc_url, - get_jupiter_api_key, - get_policy_config, - PolicyConfig, -) - -from .policy import ( - check_policy, - is_known_safe_mint, - PolicyResult, - format_policy_result, - KNOWN_SAFE_MINTS, -) - -from .intents import ( - create_intent, - get_intent, - mark_executed, - list_pending_intents, - get_intent_time_remaining, - Intent, - INTENT_TTL_SECONDS, -) - -__all__ = [ - # Tools - "solana_get_price", - "solana_search_token", - "solana_check_token", - "solana_get_wallet", - "solana_quote", - "solana_swap_confirm", - "resolve_token", - "SYMBOL_TO_MINT", - # Config - "get_secret", - "get_keypair", - "get_wallet_address", - "get_rpc_url", - "get_jupiter_api_key", - "get_policy_config", - "PolicyConfig", - # Policy - "check_policy", - "is_known_safe_mint", - "PolicyResult", - "format_policy_result", - "KNOWN_SAFE_MINTS", - # Intents - "create_intent", - "get_intent", - "mark_executed", - "list_pending_intents", - "get_intent_time_remaining", - "Intent", - "INTENT_TTL_SECONDS", -] diff --git a/src/slopesniper_skill/tools/config.py b/src/slopesniper_skill/tools/config.py deleted file mode 100644 index de5f268..0000000 --- a/src/slopesniper_skill/tools/config.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -Configuration and Secret Management. - -Handles secure retrieval of secrets with gateway -> env fallback. -""" - -from __future__ import annotations - -import json -import os -from dataclasses import dataclass, field -from typing import Optional - -import base58 -from solders.keypair import Keypair - - -@dataclass -class PolicyConfig: - """Policy configuration with safe defaults.""" - - MAX_SLIPPAGE_BPS: int = 100 # 1% max slippage - MAX_TRADE_USD: float = 50.0 # $50 max per trade - MIN_RUGCHECK_SCORE: int = 2000 # Block if score > this - REQUIRE_MINT_DISABLED: bool = True # Block if mint authority active - REQUIRE_FREEZE_DISABLED: bool = True # Block if freeze authority active - DENY_MINTS: list[str] = field(default_factory=list) # Blocked token mints - ALLOW_MINTS: list[str] = field(default_factory=list) # If set, ONLY these allowed - - -def get_secret(name: str) -> Optional[str]: - """ - Get a secret value with gateway -> env fallback. - - Args: - name: Secret name (e.g., 'SOLANA_PRIVATE_KEY') - - Returns: - Secret value or None if not found - - Priority: - 1. Moltbot gateway secret API (if available) - 2. Environment variable - 3. None - """ - # Try moltbot gateway first (future integration point) - gateway_url = os.environ.get("MOLTBOT_GATEWAY_URL") - if gateway_url: - try: - # Future: implement gateway secret fetch - pass - except Exception: - pass - - # Fallback to environment variable - env_value = os.environ.get(name) - if env_value: - return env_value - - return None - - -def get_keypair() -> Optional[Keypair]: - """ - Load Solana keypair from SOLANA_PRIVATE_KEY secret. - - The private key can be in one of these formats: - - Base58 encoded string - - JSON array of bytes (as string) - - Returns: - Keypair or None if not configured - - Raises: - ValueError: If key format is invalid - """ - private_key = get_secret("SOLANA_PRIVATE_KEY") - if not private_key: - return None - - try: - # Try base58 format first - if private_key.startswith("["): - # JSON array format - key_bytes = bytes(json.loads(private_key)) - return Keypair.from_bytes(key_bytes) - else: - # Base58 format - key_bytes = base58.b58decode(private_key) - return Keypair.from_bytes(key_bytes) - except Exception as e: - raise ValueError(f"Invalid SOLANA_PRIVATE_KEY format: {e}") from e - - -def get_wallet_address() -> Optional[str]: - """ - Get the wallet address from the configured keypair. - - Returns: - Wallet address string or None if not configured - """ - keypair = get_keypair() - if keypair: - return str(keypair.pubkey()) - return None - - -def get_rpc_url() -> str: - """ - Get Solana RPC URL. - - Returns: - RPC URL (defaults to mainnet-beta) - """ - return get_secret("SOLANA_RPC_URL") or "https://api.mainnet-beta.solana.com" - - -def get_jupiter_api_key() -> Optional[str]: - """ - Get Jupiter API key for higher rate limits. - - Returns: - API key or None (free tier) - """ - return get_secret("JUPITER_API_KEY") - - -def get_policy_config() -> PolicyConfig: - """ - Load policy configuration from environment. - - Environment variables: - - POLICY_MAX_SLIPPAGE_BPS: Max slippage in basis points - - POLICY_MAX_TRADE_USD: Max trade size in USD - - POLICY_MIN_RUGCHECK_SCORE: Max acceptable rugcheck score - - POLICY_REQUIRE_MINT_DISABLED: Require mint authority disabled - - POLICY_REQUIRE_FREEZE_DISABLED: Require freeze authority disabled - - POLICY_DENY_MINTS: Comma-separated list of blocked mints - - POLICY_ALLOW_MINTS: Comma-separated list of allowed mints (whitelist) - - Returns: - PolicyConfig with values from env or defaults - """ - config = PolicyConfig() - - # Parse numeric values - if max_slippage := os.environ.get("POLICY_MAX_SLIPPAGE_BPS"): - config.MAX_SLIPPAGE_BPS = int(max_slippage) - - if max_trade := os.environ.get("POLICY_MAX_TRADE_USD"): - config.MAX_TRADE_USD = float(max_trade) - - if min_score := os.environ.get("POLICY_MIN_RUGCHECK_SCORE"): - config.MIN_RUGCHECK_SCORE = int(min_score) - - # Parse boolean values - if require_mint := os.environ.get("POLICY_REQUIRE_MINT_DISABLED"): - config.REQUIRE_MINT_DISABLED = require_mint.lower() in ("true", "1", "yes") - - if require_freeze := os.environ.get("POLICY_REQUIRE_FREEZE_DISABLED"): - config.REQUIRE_FREEZE_DISABLED = require_freeze.lower() in ("true", "1", "yes") - - # Parse list values - if deny_mints := os.environ.get("POLICY_DENY_MINTS"): - config.DENY_MINTS = [m.strip() for m in deny_mints.split(",") if m.strip()] - - if allow_mints := os.environ.get("POLICY_ALLOW_MINTS"): - config.ALLOW_MINTS = [m.strip() for m in allow_mints.split(",") if m.strip()] - - return config diff --git a/src/slopesniper_skill/tools/intents.py b/src/slopesniper_skill/tools/intents.py deleted file mode 100644 index 3b726d5..0000000 --- a/src/slopesniper_skill/tools/intents.py +++ /dev/null @@ -1,272 +0,0 @@ -""" -Intent Storage System. - -SQLite-based storage for swap intents with automatic expiry. -Intents store the quote details and unsigned transaction for later execution. -""" - -from __future__ import annotations - -import sqlite3 -import uuid -from dataclasses import dataclass -from datetime import datetime, timedelta, timezone -from pathlib import Path -from typing import Optional - -# Intent TTL in seconds (2 minutes - crypto prices move fast) -INTENT_TTL_SECONDS = 120 - -# Database path (in user's home directory to avoid package issues) -DB_PATH = Path.home() / ".slopesniper" / "intents.db" - - -@dataclass -class Intent: - """Swap intent with all details needed for execution.""" - - intent_id: str - from_mint: str - to_mint: str - amount: str - slippage_bps: int - out_amount_est: str - unsigned_tx: str - request_id: str - created_at: datetime - expires_at: datetime - executed: bool = False - - -def get_db_connection() -> sqlite3.Connection: - """Get database connection, creating tables if needed.""" - # Ensure parent directory exists - DB_PATH.parent.mkdir(parents=True, exist_ok=True) - - conn = sqlite3.connect(str(DB_PATH)) - conn.row_factory = sqlite3.Row - - # Create table if not exists - conn.execute(""" - CREATE TABLE IF NOT EXISTS intents ( - intent_id TEXT PRIMARY KEY, - from_mint TEXT NOT NULL, - to_mint TEXT NOT NULL, - amount TEXT NOT NULL, - slippage_bps INTEGER NOT NULL, - out_amount_est TEXT NOT NULL, - unsigned_tx TEXT NOT NULL, - request_id TEXT NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - expires_at TIMESTAMP NOT NULL, - executed INTEGER DEFAULT 0 - ) - """) - conn.commit() - - return conn - - -def cleanup_expired() -> int: - """ - Delete expired intents from the database. - - Returns: - Number of intents deleted - """ - conn = get_db_connection() - try: - now = datetime.now(timezone.utc).isoformat() - cursor = conn.execute("DELETE FROM intents WHERE expires_at < ?", (now,)) - conn.commit() - return cursor.rowcount - finally: - conn.close() - - -def create_intent( - from_mint: str, - to_mint: str, - amount: str, - slippage_bps: int, - out_amount_est: str, - unsigned_tx: str, - request_id: str, -) -> str: - """ - Create a new swap intent. - - Args: - from_mint: Token to sell - to_mint: Token to buy - amount: Amount to swap (string, in token units) - slippage_bps: Slippage tolerance - out_amount_est: Estimated output amount - unsigned_tx: Base64 encoded unsigned transaction - request_id: Jupiter request ID for execution - - Returns: - Intent ID (UUID string) - """ - # Clean up expired intents first - cleanup_expired() - - intent_id = str(uuid.uuid4()) - now = datetime.now(timezone.utc) - expires_at = now + timedelta(seconds=INTENT_TTL_SECONDS) - - conn = get_db_connection() - try: - conn.execute( - """ - INSERT INTO intents ( - intent_id, from_mint, to_mint, amount, slippage_bps, - out_amount_est, unsigned_tx, request_id, created_at, expires_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - intent_id, - from_mint, - to_mint, - amount, - slippage_bps, - out_amount_est, - unsigned_tx, - request_id, - now.isoformat(), - expires_at.isoformat(), - ), - ) - conn.commit() - return intent_id - finally: - conn.close() - - -def get_intent(intent_id: str) -> Optional[Intent]: - """ - Get an intent by ID if it exists and is not expired. - - Args: - intent_id: Intent UUID - - Returns: - Intent object or None if not found/expired - """ - # Clean up expired intents first - cleanup_expired() - - conn = get_db_connection() - try: - cursor = conn.execute( - """ - SELECT * FROM intents - WHERE intent_id = ? - AND expires_at > ? - """, - (intent_id, datetime.now(timezone.utc).isoformat()), - ) - row = cursor.fetchone() - - if row is None: - return None - - return Intent( - intent_id=row["intent_id"], - from_mint=row["from_mint"], - to_mint=row["to_mint"], - amount=row["amount"], - slippage_bps=row["slippage_bps"], - out_amount_est=row["out_amount_est"], - unsigned_tx=row["unsigned_tx"], - request_id=row["request_id"], - created_at=datetime.fromisoformat(row["created_at"]), - expires_at=datetime.fromisoformat(row["expires_at"]), - executed=bool(row["executed"]), - ) - finally: - conn.close() - - -def mark_executed(intent_id: str) -> bool: - """ - Mark an intent as executed (prevents replay). - - Args: - intent_id: Intent UUID - - Returns: - True if intent was found and marked, False otherwise - """ - conn = get_db_connection() - try: - cursor = conn.execute( - "UPDATE intents SET executed = 1 WHERE intent_id = ? AND executed = 0", - (intent_id,), - ) - conn.commit() - return cursor.rowcount > 0 - finally: - conn.close() - - -def get_intent_time_remaining(intent: Intent) -> int: - """ - Get seconds remaining before intent expires. - - Args: - intent: Intent object - - Returns: - Seconds remaining (0 if expired) - """ - now = datetime.now(timezone.utc) - # Ensure expires_at is timezone-aware - expires_at = intent.expires_at - if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=timezone.utc) - - remaining = (expires_at - now).total_seconds() - return max(0, int(remaining)) - - -def list_pending_intents() -> list[Intent]: - """ - List all non-expired, non-executed intents. - - Returns: - List of Intent objects - """ - cleanup_expired() - - conn = get_db_connection() - try: - cursor = conn.execute( - """ - SELECT * FROM intents - WHERE expires_at > ? - AND executed = 0 - ORDER BY created_at DESC - """, - (datetime.now(timezone.utc).isoformat(),), - ) - rows = cursor.fetchall() - - return [ - Intent( - intent_id=row["intent_id"], - from_mint=row["from_mint"], - to_mint=row["to_mint"], - amount=row["amount"], - slippage_bps=row["slippage_bps"], - out_amount_est=row["out_amount_est"], - unsigned_tx=row["unsigned_tx"], - request_id=row["request_id"], - created_at=datetime.fromisoformat(row["created_at"]), - expires_at=datetime.fromisoformat(row["expires_at"]), - executed=bool(row["executed"]), - ) - for row in rows - ] - finally: - conn.close() diff --git a/src/slopesniper_skill/tools/policy.py b/src/slopesniper_skill/tools/policy.py deleted file mode 100644 index eb8530b..0000000 --- a/src/slopesniper_skill/tools/policy.py +++ /dev/null @@ -1,175 +0,0 @@ -""" -Policy Gates for Safe Trading. - -Deterministic safety checks that run BEFORE any swap execution. -All checks must pass for a trade to be allowed. -""" - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any, Optional - -from .config import PolicyConfig, get_policy_config - - -@dataclass -class PolicyResult: - """Result of policy check.""" - - allowed: bool - reason: Optional[str] = None - checks_passed: list[str] = field(default_factory=list) - checks_failed: list[str] = field(default_factory=list) - - -# Well-known safe tokens that skip rugcheck -KNOWN_SAFE_MINTS: set[str] = { - "So11111111111111111111111111111111111111112", # SOL (wrapped) - "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", # USDC - "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB", # USDT - "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So", # mSOL - "7dHbWXmci3dT8UFYWYZweBLXgycu7Y3iL6trKn1Y7ARj", # stSOL - "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263", # BONK - "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN", # JUP -} - - -def check_policy( - from_mint: str, - to_mint: str, - amount_usd: float, - slippage_bps: int, - rugcheck_result: Optional[dict[str, Any]] = None, - config: Optional[PolicyConfig] = None, -) -> PolicyResult: - """ - Run all policy gates on a proposed trade. - - Args: - from_mint: Token to sell (mint address) - to_mint: Token to buy (mint address) - amount_usd: Trade amount in USD - slippage_bps: Slippage tolerance in basis points - rugcheck_result: Result from rugcheck API (for to_mint) - config: Policy configuration (defaults to env config) - - Returns: - PolicyResult with allowed status and reason if blocked - """ - if config is None: - config = get_policy_config() - - checks_passed: list[str] = [] - checks_failed: list[str] = [] - - # 1. Check slippage limit - if slippage_bps > config.MAX_SLIPPAGE_BPS: - checks_failed.append( - f"slippage ({slippage_bps}bps > max {config.MAX_SLIPPAGE_BPS}bps)" - ) - else: - checks_passed.append(f"slippage ({slippage_bps}bps)") - - # 2. Check trade size limit - if amount_usd > config.MAX_TRADE_USD: - checks_failed.append( - f"trade_size (${amount_usd:.2f} > max ${config.MAX_TRADE_USD:.2f})" - ) - else: - checks_passed.append(f"trade_size (${amount_usd:.2f})") - - # 3. Check deny list - if from_mint in config.DENY_MINTS: - checks_failed.append("from_mint in DENY_MINTS") - else: - checks_passed.append("from_mint not in DENY_MINTS") - - if to_mint in config.DENY_MINTS: - checks_failed.append("to_mint in DENY_MINTS") - else: - checks_passed.append("to_mint not in DENY_MINTS") - - # 4. Check allow list (if set, acts as whitelist) - if config.ALLOW_MINTS: - if from_mint not in config.ALLOW_MINTS and from_mint not in KNOWN_SAFE_MINTS: - checks_failed.append("from_mint not in ALLOW_MINTS") - else: - checks_passed.append("from_mint in ALLOW_MINTS") - - if to_mint not in config.ALLOW_MINTS and to_mint not in KNOWN_SAFE_MINTS: - checks_failed.append("to_mint not in ALLOW_MINTS") - else: - checks_passed.append("to_mint in ALLOW_MINTS") - - # 5. Check rugcheck results (skip for known safe tokens) - if to_mint not in KNOWN_SAFE_MINTS: - if rugcheck_result: - score = rugcheck_result.get("score") - summary = rugcheck_result.get("summary", {}) - - # Check score - if score is not None and score > config.MIN_RUGCHECK_SCORE: - checks_failed.append( - f"rugcheck_score ({score} > max {config.MIN_RUGCHECK_SCORE})" - ) - elif score is not None: - checks_passed.append(f"rugcheck_score ({score})") - - # Check mint authority - if config.REQUIRE_MINT_DISABLED: - mint_disabled = summary.get("mintAuthority") is None - if not mint_disabled: - checks_failed.append("mint_authority still active") - else: - checks_passed.append("mint_authority disabled") - - # Check freeze authority - if config.REQUIRE_FREEZE_DISABLED: - freeze_disabled = summary.get("freezeAuthority") is None - if not freeze_disabled: - checks_failed.append("freeze_authority still active") - else: - checks_passed.append("freeze_authority disabled") - else: - # No rugcheck result and not a known safe token - checks_failed.append("rugcheck required for unknown token") - - # Determine final result - if checks_failed: - return PolicyResult( - allowed=False, - reason=f"Policy blocked: {', '.join(checks_failed)}", - checks_passed=checks_passed, - checks_failed=checks_failed, - ) - - return PolicyResult( - allowed=True, - reason=None, - checks_passed=checks_passed, - checks_failed=[], - ) - - -def is_known_safe_mint(mint: str) -> bool: - """Check if a mint is in the known safe list.""" - return mint in KNOWN_SAFE_MINTS - - -def format_policy_result(result: PolicyResult) -> str: - """Format policy result for display.""" - lines = [] - - if result.allowed: - lines.append("Policy Check: PASSED") - else: - lines.append(f"Policy Check: BLOCKED - {result.reason}") - - if result.checks_passed: - lines.append(f" Passed: {', '.join(result.checks_passed)}") - - if result.checks_failed: - lines.append(f" Failed: {', '.join(result.checks_failed)}") - - return "\n".join(lines) diff --git a/src/slopesniper_skill/tools/solana_tools.py b/src/slopesniper_skill/tools/solana_tools.py deleted file mode 100644 index b96e12c..0000000 --- a/src/slopesniper_skill/tools/solana_tools.py +++ /dev/null @@ -1,433 +0,0 @@ -""" -Solana Trading Tools. - -Six tools for safe Solana token trading with policy enforcement. -""" - -from __future__ import annotations - -from typing import Any, Optional - -from ..sdk import JupiterDataClient, JupiterUltraClient, RugCheckClient, Utils -from .config import get_jupiter_api_key, get_keypair, get_wallet_address -from .intents import create_intent, get_intent, mark_executed -from .policy import KNOWN_SAFE_MINTS, check_policy, is_known_safe_mint - - -# Well-known token symbols to mint addresses -SYMBOL_TO_MINT: dict[str, str] = { - "SOL": "So11111111111111111111111111111111111111112", - "WSOL": "So11111111111111111111111111111111111111112", - "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", - "USDT": "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB", - "MSOL": "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So", - "STSOL": "7dHbWXmci3dT8UFYWYZweBLXgycu7Y3iL6trKn1Y7ARj", - "BONK": "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263", - "JUP": "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN", - "WIF": "EKpQGSJtjMFqKZ9KQanSqYXRcF8fBopzLHYxdM65zcjm", - "PYTH": "HZ1JovNiVvGrGNiiYvEozEVgZ58xaU3RKwX8eACQBCt3", - "RAY": "4k3Dyjzvzp8eMZWUXbBCjEvwSkkk59S5iCNLY3QrkX6R", -} - -# Token decimals for common tokens -TOKEN_DECIMALS: dict[str, int] = { - "So11111111111111111111111111111111111111112": 9, # SOL - "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v": 6, # USDC - "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB": 6, # USDT -} - -# Default decimals for unknown tokens -DEFAULT_DECIMALS = 9 - - -def resolve_token(token: str) -> Optional[str]: - """ - Resolve a token symbol or mint address to a mint address. - - Args: - token: Token symbol (e.g., "SOL") or mint address - - Returns: - Mint address or None if not found - """ - # Check if it's already a mint address (base58, 32-44 chars) - if len(token) >= 32 and len(token) <= 44: - return token - - # Check known symbols - upper = token.upper() - if upper in SYMBOL_TO_MINT: - return SYMBOL_TO_MINT[upper] - - return None - - -def get_token_decimals(mint: str) -> int: - """Get decimals for a token (default 9 if unknown).""" - return TOKEN_DECIMALS.get(mint, DEFAULT_DECIMALS) - - -async def solana_get_price(token: str) -> dict[str, Any]: - """ - Get current USD price for a token. - - Args: - token: Token mint address OR symbol (e.g., "SOL", "BONK") - - Returns: - Dict with mint, symbol, price_usd, and optional market_cap - """ - # Resolve symbol to mint if needed - mint = resolve_token(token) - - if not mint: - # Try searching for the token - data_client = JupiterDataClient(api_key=get_jupiter_api_key()) - results = await data_client.search_token(token) - if results: - mint = results[0].get("address") - else: - return {"error": f"Token not found: {token}"} - - # Get price - data_client = JupiterDataClient(api_key=get_jupiter_api_key()) - price_data = await data_client.get_price(mint) - - if not price_data: - return {"error": f"Price not available for {mint}"} - - # Get token info for symbol/market cap - token_info = await data_client.get_token_info(mint) - - result: dict[str, Any] = { - "mint": mint, - "symbol": token_info.get("symbol") if token_info else None, - "price_usd": float(price_data.get("price", 0)), - } - - if token_info and "mcap" in token_info: - result["market_cap"] = token_info["mcap"] - - return result - - -async def solana_search_token(query: str) -> list[dict[str, Any]]: - """ - Search for tokens by name or symbol. - - Args: - query: Search term (e.g., "bonk", "pepe") - - Returns: - List of matching tokens with symbol, name, mint, verified, liquidity - """ - data_client = JupiterDataClient(api_key=get_jupiter_api_key()) - results = await data_client.search_token(query) - - tokens = [] - for token in results[:10]: # Limit to top 10 - tokens.append({ - "symbol": token.get("symbol", ""), - "name": token.get("name", ""), - "mint": token.get("address", ""), - "verified": token.get("verified", False), - "liquidity": token.get("liquidity"), - }) - - return tokens - - -async def solana_check_token(mint_address: str) -> dict[str, Any]: - """ - Run rugcheck safety analysis on a token. - - Args: - mint_address: Token mint address (NOT symbol) - - Returns: - Dict with is_safe, score, risk_factors, and reason - """ - if not Utils.is_valid_solana_address(mint_address): - return {"error": "Invalid mint address. Must be a Solana address, not a symbol."} - - # Known safe tokens always pass - if is_known_safe_mint(mint_address): - return { - "is_safe": True, - "score": 0, - "risk_factors": [], - "reason": "Known safe token (SOL/USDC/USDT/etc)", - } - - rugcheck = RugCheckClient() - result = await rugcheck.check_token(mint_address) - - risk_factors = [] - for risk in result.get("risks", []): - level = risk.get("level", "") - name = risk.get("name", "") - if level in ["danger", "critical", "warning"]: - risk_factors.append(f"[{level.upper()}] {name}") - - return { - "is_safe": result.get("is_safe", False), - "score": result.get("score"), - "risk_factors": risk_factors, - "reason": result.get("reason", ""), - } - - -async def solana_get_wallet(address: Optional[str] = None) -> dict[str, Any]: - """ - Get wallet balances and holdings. - - Args: - address: Wallet address (optional - defaults to user's configured wallet) - - Returns: - Dict with address, sol_balance, sol_value_usd, and tokens list - """ - # Use provided address or default to user's wallet - if not address: - address = get_wallet_address() - if not address: - return {"error": "No wallet configured. Set SOLANA_PRIVATE_KEY."} - - if not Utils.is_valid_solana_address(address): - return {"error": "Invalid wallet address"} - - ultra_client = JupiterUltraClient(api_key=get_jupiter_api_key()) - holdings = await ultra_client.get_holdings(address) - - # Parse holdings response - sol_balance = holdings.get("uiAmount", 0) - tokens_data = holdings.get("tokens", {}) - - tokens = [] - for mint, token_info in tokens_data.items(): - tokens.append({ - "mint": mint, - "symbol": token_info.get("symbol", ""), - "amount": token_info.get("uiAmount", 0), - "value_usd": token_info.get("usdValue"), - }) - - return { - "address": address, - "sol_balance": sol_balance, - "sol_value_usd": holdings.get("usdValue"), - "tokens": tokens, - } - - -async def solana_quote( - from_mint: str, - to_mint: str, - amount: str, - slippage_bps: int = 50, -) -> dict[str, Any]: - """ - Get a swap quote and create an intent (does NOT execute). - - Policy checks run here - blocks if trade fails safety checks. - - Args: - from_mint: Token to sell (MUST be mint address) - to_mint: Token to buy (MUST be mint address) - amount: Amount to swap (string, in token units e.g., "1.5" for 1.5 SOL) - slippage_bps: Slippage tolerance (default 50 = 0.5%) - - Returns: - Dict with intent_id, amounts, price_impact, route, and expiry - """ - # Validate inputs are mint addresses, not symbols - if not Utils.is_valid_solana_address(from_mint): - return { - "error": f"from_mint must be a valid mint address, not a symbol. Got: {from_mint}" - } - if not Utils.is_valid_solana_address(to_mint): - return { - "error": f"to_mint must be a valid mint address, not a symbol. Got: {to_mint}" - } - - # Get user's wallet - keypair = get_keypair() - if not keypair: - return {"error": "No wallet configured. Set SOLANA_PRIVATE_KEY."} - - taker = str(keypair.pubkey()) - - # Convert amount to atomic units - decimals = get_token_decimals(from_mint) - try: - amount_float = float(amount) - amount_atomic = int(amount_float * (10**decimals)) - except ValueError: - return {"error": f"Invalid amount: {amount}"} - - # Get price of from_token to calculate USD value - data_client = JupiterDataClient(api_key=get_jupiter_api_key()) - price_data = await data_client.get_price(from_mint) - if price_data: - price_usd = float(price_data.get("price", 0)) - amount_usd = amount_float * price_usd - else: - # If we can't get price, assume $0 (will pass USD check) - amount_usd = 0 - - # Run rugcheck on destination token (if not known safe) - rugcheck_result = None - if not is_known_safe_mint(to_mint): - rugcheck = RugCheckClient() - rugcheck_result = await rugcheck.check_token(to_mint) - - # Run policy checks - policy_result = check_policy( - from_mint=from_mint, - to_mint=to_mint, - amount_usd=amount_usd, - slippage_bps=slippage_bps, - rugcheck_result=rugcheck_result, - ) - - if not policy_result.allowed: - return { - "error": "Policy blocked", - "reason": policy_result.reason, - "checks_passed": policy_result.checks_passed, - "checks_failed": policy_result.checks_failed, - } - - # Get quote from Jupiter - ultra_client = JupiterUltraClient(api_key=get_jupiter_api_key()) - order = await ultra_client.get_order( - input_mint=from_mint, - output_mint=to_mint, - amount=amount_atomic, - taker=taker, - slippage_bps=slippage_bps, - ) - - if order.get("errorCode"): - return { - "error": "Quote failed", - "reason": order.get("errorMessage", "Unknown error"), - } - - if not order.get("transaction"): - return {"error": "No transaction returned from quote"} - - # Calculate output amount in UI units - out_decimals = get_token_decimals(to_mint) - out_amount_atomic = int(order.get("outAmount", 0)) - out_amount_ui = out_amount_atomic / (10**out_decimals) - - # Create intent - intent_id = create_intent( - from_mint=from_mint, - to_mint=to_mint, - amount=amount, - slippage_bps=slippage_bps, - out_amount_est=str(out_amount_ui), - unsigned_tx=order["transaction"], - request_id=order["requestId"], - ) - - # Get token symbols for route summary - from_symbol = None - to_symbol = None - - # Check known symbols first - for symbol, mint in SYMBOL_TO_MINT.items(): - if mint == from_mint: - from_symbol = symbol - if mint == to_mint: - to_symbol = symbol - - # Fallback to API lookup if needed - if not from_symbol or not to_symbol: - if not from_symbol: - info = await data_client.get_token_info(from_mint) - from_symbol = info.get("symbol", from_mint[:8]) if info else from_mint[:8] - if not to_symbol: - info = await data_client.get_token_info(to_mint) - to_symbol = info.get("symbol", to_mint[:8]) if info else to_mint[:8] - - # Calculate expiry - intent = get_intent(intent_id) - expires_at = intent.expires_at.isoformat() if intent else None - - return { - "intent_id": intent_id, - "from_mint": from_mint, - "to_mint": to_mint, - "in_amount": amount, - "out_amount_est": f"{out_amount_ui:.6f}".rstrip("0").rstrip("."), - "price_impact_pct": order.get("priceImpact", 0), - "route_summary": f"{from_symbol} -> {to_symbol}", - "expires_at": expires_at, - "policy_checks_passed": policy_result.checks_passed, - } - - -async def solana_swap_confirm(intent_id: str) -> dict[str, Any]: - """ - Execute a previously quoted intent. - - Args: - intent_id: UUID from solana_quote - - Returns: - Dict with success, signature, amounts, and explorer_url - """ - # Get the intent - intent = get_intent(intent_id) - - if not intent: - return {"error": "Intent not found or expired. Please create a new quote."} - - if intent.executed: - return {"error": "Intent already executed. Each quote can only be used once."} - - # Get user's keypair for signing - keypair = get_keypair() - if not keypair: - return {"error": "No wallet configured. Set SOLANA_PRIVATE_KEY."} - - # Sign the transaction - ultra_client = JupiterUltraClient(api_key=get_jupiter_api_key()) - signed_tx = ultra_client.sign_transaction(intent.unsigned_tx, keypair) - - # Execute the swap - result = await ultra_client.execute_swap(signed_tx, intent.request_id) - - # Mark intent as executed (prevent replay) - mark_executed(intent_id) - - status = result.get("status") - signature = result.get("signature", "") - - if status == "Success": - # Calculate actual output amount - out_decimals = get_token_decimals(intent.to_mint) - out_amount_atomic = int(result.get("outputAmountResult", 0)) - out_amount_ui = out_amount_atomic / (10**out_decimals) - - return { - "success": True, - "signature": signature, - "from_mint": intent.from_mint, - "to_mint": intent.to_mint, - "in_amount": intent.amount, - "out_amount_actual": f"{out_amount_ui:.6f}".rstrip("0").rstrip("."), - "explorer_url": f"https://solscan.io/tx/{signature}", - } - else: - return { - "success": False, - "error": result.get("error", "Swap failed"), - "signature": signature, - "from_mint": intent.from_mint, - "to_mint": intent.to_mint, - "in_amount": intent.amount, - }