diff --git a/tools/src/aden_tools/credentials/__init__.py b/tools/src/aden_tools/credentials/__init__.py index 6538256fb8..93d00f7914 100644 --- a/tools/src/aden_tools/credentials/__init__.py +++ b/tools/src/aden_tools/credentials/__init__.py @@ -39,6 +39,7 @@ - github.py: GitHub API credentials - hubspot.py: HubSpot CRM credentials - slack.py: Slack workspace credentials +- x.py: X (Twitter) API credentials Note: Tools that don't need credentials simply omit the 'credentials' parameter from their register_tools() function. This convention is enforced by CI tests. @@ -65,6 +66,7 @@ ) from .slack import SLACK_CREDENTIALS from .store_adapter import CredentialStoreAdapter +from .x import X_CREDENTIALS # Merged registry of all credentials CREDENTIAL_SPECS = { @@ -74,6 +76,7 @@ **GITHUB_CREDENTIALS, **HUBSPOT_CREDENTIALS, **SLACK_CREDENTIALS, + **X_CREDENTIALS, } __all__ = [ @@ -104,4 +107,5 @@ "GITHUB_CREDENTIALS", "HUBSPOT_CREDENTIALS", "SLACK_CREDENTIALS", + "X_CREDENTIALS", ] diff --git a/tools/src/aden_tools/credentials/x.py b/tools/src/aden_tools/credentials/x.py new file mode 100644 index 0000000000..78743fa3a5 --- /dev/null +++ b/tools/src/aden_tools/credentials/x.py @@ -0,0 +1,106 @@ +""" +X (Twitter) tool credentials. + +Contains credentials for X API v2 integration. +Bearer token for read-only operations, OAuth 1.0a keys for write operations. +""" + +from .base import CredentialSpec + +_X_TOOLS = [ + "x_post_tweet", + "x_reply_tweet", + "x_delete_tweet", + "x_search_tweets", + "x_get_mentions", + "x_send_dm", +] + +X_CREDENTIALS = { + "x_bearer_token": CredentialSpec( + env_var="X_BEARER_TOKEN", + tools=_X_TOOLS, + required=True, + startup_required=False, + help_url="https://developer.x.com/en/portal/dashboard", + description="X (Twitter) API v2 Bearer Token for read-only operations", + direct_api_key_supported=True, + api_key_instructions="""To get an X API Bearer Token: +1. Go to https://developer.x.com/en/portal/dashboard +2. Create a Project & App (or select existing) +3. Go to Keys & Tokens tab +4. Copy the Bearer Token +5. Set it as X_BEARER_TOKEN environment variable""", + health_check_endpoint="https://api.x.com/2/users/me", + health_check_method="GET", + credential_id="x_bearer_token", + credential_key="api_key", + credential_group="x", + ), + "x_api_key": CredentialSpec( + env_var="X_API_KEY", + tools=_X_TOOLS, + required=False, + startup_required=False, + help_url="https://developer.x.com/en/portal/dashboard", + description="X (Twitter) API Consumer Key for OAuth 1.0a write operations", + direct_api_key_supported=True, + api_key_instructions="""To get your X API Consumer Key: +1. Go to https://developer.x.com/en/portal/dashboard +2. Select your app > Keys and Tokens +3. Under Consumer Keys, copy the API Key""", + credential_id="x_api_key", + credential_key="api_key", + credential_group="x", + ), + "x_api_secret": CredentialSpec( + env_var="X_API_SECRET", + tools=_X_TOOLS, + required=False, + startup_required=False, + help_url="https://developer.x.com/en/portal/dashboard", + description="X (Twitter) API Consumer Secret for OAuth 1.0a write operations", + direct_api_key_supported=True, + api_key_instructions="""To get your X API Consumer Secret: +1. Go to https://developer.x.com/en/portal/dashboard +2. Select your app > Keys and Tokens +3. Under Consumer Keys, copy the API Secret""", + credential_id="x_api_secret", + credential_key="api_key", + credential_group="x", + ), + "x_access_token": CredentialSpec( + env_var="X_ACCESS_TOKEN", + tools=_X_TOOLS, + required=False, + startup_required=False, + help_url="https://developer.x.com/en/portal/dashboard", + description="X (Twitter) User Access Token for OAuth 1.0a write operations", + direct_api_key_supported=True, + api_key_instructions="""To get your X Access Token: +1. Go to https://developer.x.com/en/portal/dashboard +2. Select your app > Keys and Tokens +3. Under Authentication Tokens, generate Access Token and Secret +4. Copy the Access Token""", + credential_id="x_access_token", + credential_key="api_key", + credential_group="x", + ), + "x_access_token_secret": CredentialSpec( + env_var="X_ACCESS_TOKEN_SECRET", + tools=_X_TOOLS, + required=False, + startup_required=False, + help_url="https://developer.x.com/en/portal/dashboard", + description="X (Twitter) User Access Token Secret for OAuth 1.0a write operations", + direct_api_key_supported=True, + api_key_instructions="""To get your X Access Token Secret: +1. Go to https://developer.x.com/en/portal/dashboard +2. Select your app > Keys and Tokens +3. Under Authentication Tokens, generate Access Token and Secret +4. Copy the Access Token Secret""", + credential_id="x_access_token_secret", + credential_key="api_key", + credential_group="x", + ), +} diff --git a/tools/src/aden_tools/tools/__init__.py b/tools/src/aden_tools/tools/__init__.py index 54d9e58906..87f3bf3a9b 100644 --- a/tools/src/aden_tools/tools/__init__.py +++ b/tools/src/aden_tools/tools/__init__.py @@ -45,6 +45,7 @@ from .slack_tool import register_tools as register_slack from .web_scrape_tool import register_tools as register_web_scrape from .web_search_tool import register_tools as register_web_search +from .x_tool import register_tools as register_x def register_all_tools( @@ -75,6 +76,7 @@ def register_all_tools( register_email(mcp, credentials=credentials) register_hubspot(mcp, credentials=credentials) register_slack(mcp, credentials=credentials) + register_x(mcp, credentials=credentials) # Register file system toolkits register_view_file(mcp) @@ -198,6 +200,13 @@ def register_all_tools( "slack_kick_user_from_channel", "slack_delete_file", "slack_get_team_stats", + # X (Twitter) tools + "x_search_tweets", + "x_get_mentions", + "x_post_tweet", + "x_reply_tweet", + "x_delete_tweet", + "x_send_dm", ] diff --git a/tools/src/aden_tools/tools/x_tool/README.md b/tools/src/aden_tools/tools/x_tool/README.md new file mode 100644 index 0000000000..c8b34bd01b --- /dev/null +++ b/tools/src/aden_tools/tools/x_tool/README.md @@ -0,0 +1,84 @@ +# X (Twitter) Tool + +Hive integration for the X (Twitter) API v2. + +Enables agents to post tweets, reply to users, search recent tweets, and monitor mentions — allowing social automation workflows directly inside Hive. + +## Features + +- Post tweets +- Reply to tweets +- Delete tweets +- Search recent tweets +- Fetch user mentions + +## Tools + +| Tool | Description | +|--------|-------------| +| x_post_tweet | Post a new tweet | +| x_reply_tweet | Reply to an existing tweet | +| x_delete_tweet | Delete a tweet | +| x_search_tweets | Search recent tweets by query | +| x_get_mentions | Fetch mentions for a user | + + +## Authentication + +This integration uses an **X API v2 Bearer Token**. + +### Option 1 — Environment variable + +export X_BEARER_TOKEN=your_token_here + +### Option 2 — Hive credential store (recommended) + +Configure credential id: + +x + +Hive will automatically inject credentials into all `x_*` tools. + +## How to get a Bearer Token + +1. Go to https://developer.x.com/ +2. Create a Project & App +3. Enable API v2 access +4. Open **Keys & Tokens** +5. Copy the **Bearer Token** + +## Example Usage + +### Post a tweet + +x_post_tweet("Hello from Hive 🚀") + +### Reply to a tweet + +x_reply_tweet(tweet_id="123456789", text="Thanks for the mention!") + +### Search tweets + +x_search_tweets(query="AI agents", max_results=5) + +### Get mentions + +x_get_mentions(user_id="2244994945") + +## Notes + +- Uses lightweight httpx client (no external SDK) +- Follows HubSpot tool architecture for consistency +- Compatible with Hive CredentialStoreAdapter +- Handles rate limits and common HTTP errors gracefully +- Max results capped at 100 per request (API limit) + +## Development + +Run tests: + +pytest + +Start MCP server: + +python mcp_server.py diff --git a/tools/src/aden_tools/tools/x_tool/__init__.py b/tools/src/aden_tools/tools/x_tool/__init__.py new file mode 100644 index 0000000000..bdd6a26c1d --- /dev/null +++ b/tools/src/aden_tools/tools/x_tool/__init__.py @@ -0,0 +1,11 @@ +""" +X (Twitter) Tool - Post tweets, reply, search, and read mentions via X API v2. + +Supports: +- Bearer tokens (X_BEARER_TOKEN) +- OAuth2 tokens via credential store +""" + +from .x_tool import register_tools + +__all__ = ["register_tools"] diff --git a/tools/src/aden_tools/tools/x_tool/test/test_x_credentials.py b/tools/src/aden_tools/tools/x_tool/test/test_x_credentials.py new file mode 100644 index 0000000000..86d241c357 --- /dev/null +++ b/tools/src/aden_tools/tools/x_tool/test/test_x_credentials.py @@ -0,0 +1,9 @@ +from aden_tools.credentials.integrations import INTEGRATION_CREDENTIALS + + +def test_x_credential_spec_exists(): + assert "x" in INTEGRATION_CREDENTIALS + spec = INTEGRATION_CREDENTIALS["x"] + + assert spec.env_var == "X_BEARER_TOKEN" + assert "x_post_tweet" in spec.tools diff --git a/tools/src/aden_tools/tools/x_tool/test/test_x_tool.py b/tools/src/aden_tools/tools/x_tool/test/test_x_tool.py new file mode 100644 index 0000000000..e6289fa78b --- /dev/null +++ b/tools/src/aden_tools/tools/x_tool/test/test_x_tool.py @@ -0,0 +1,23 @@ +import unittest +from unittest.mock import patch + +import httpx + +from aden_tools.tools.x_tool.x_tool import ( + _XClient, +) + + +class TestXClient(unittest.TestCase): + @patch("httpx.request") + def test_post(self, mock_req): + mock_req.return_value = httpx.Response(200, json={"data": {"id": "1", "text": "hi"}}) + + client = _XClient("fake") + res = client.request("POST", "/tweets", json={"text": "hi"}) + + self.assertIn("data", res) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/src/aden_tools/tools/x_tool/x_tool.py b/tools/src/aden_tools/tools/x_tool/x_tool.py new file mode 100644 index 0000000000..39491d06b3 --- /dev/null +++ b/tools/src/aden_tools/tools/x_tool/x_tool.py @@ -0,0 +1,425 @@ +""" +X (Twitter) Tool - Post tweets, reply, search, read mentions, and send DMs via X API v2. + +Authentication: +- Bearer token (X_BEARER_TOKEN): read-only operations (search, mentions). +- OAuth 1.0a User Context: write operations (post, reply, delete, DM). + Requires X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET. + +API Reference: https://developer.x.com/en/docs/twitter-api +""" + +from __future__ import annotations + +import hashlib +import hmac +import os +import time +import urllib.parse +import uuid +from typing import TYPE_CHECKING, Any + +import httpx +from fastmcp import FastMCP + +if TYPE_CHECKING: + from aden_tools.credentials import CredentialStoreAdapter + + +X_API_BASE = "https://api.x.com/2" + + +class _XClient: + """Internal client wrapping X API v2 calls with Bearer token auth.""" + + def __init__(self, bearer_token: str): + self._bearer_token = bearer_token + + @property + def _headers(self) -> dict[str, str]: + return { + "Authorization": f"Bearer {self._bearer_token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + + def _handle_response(self, response: httpx.Response) -> dict[str, Any]: + if response.status_code == 401: + return {"error": "Invalid or expired X access token"} + if response.status_code == 403: + return { + "error": "Insufficient permissions — this operation may require OAuth 1.0a " + "user context authentication (not just a Bearer token).", + "help": "Set X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, and " + "X_ACCESS_TOKEN_SECRET for write operations.", + } + if response.status_code == 404: + return {"error": "Resource not found"} + if response.status_code == 429: + return {"error": "Rate limit exceeded. Try again later."} + if response.status_code >= 400: + try: + detail = response.json() + except Exception: + detail = response.text + return {"error": f"X API error (HTTP {response.status_code}): {detail}"} + return response.json() + + def get(self, endpoint: str, params: dict | None = None) -> dict[str, Any]: + """Make a GET request with Bearer auth.""" + response = httpx.get( + f"{X_API_BASE}{endpoint}", + headers=self._headers, + params=params, + timeout=30.0, + ) + return self._handle_response(response) + + +class _XOAuthClient: + """Internal client wrapping X API v2 calls with OAuth 1.0a user context auth.""" + + def __init__( + self, + api_key: str, + api_secret: str, + access_token: str, + access_token_secret: str, + ): + self._api_key = api_key + self._api_secret = api_secret + self._access_token = access_token + self._access_token_secret = access_token_secret + + def _generate_oauth_signature( + self, + method: str, + url: str, + oauth_params: dict[str, str], + body_params: dict[str, str] | None = None, + ) -> str: + """Generate OAuth 1.0a HMAC-SHA1 signature.""" + all_params = {**oauth_params} + if body_params: + all_params.update(body_params) + + # Sort and encode params + sorted_params = sorted(all_params.items()) + param_string = "&".join( + f"{urllib.parse.quote(k, safe='')}={urllib.parse.quote(str(v), safe='')}" + for k, v in sorted_params + ) + + # Create signature base string + base_string = ( + f"{method.upper()}&" + f"{urllib.parse.quote(url, safe='')}&" + f"{urllib.parse.quote(param_string, safe='')}" + ) + + # Create signing key + signing_key = ( + f"{urllib.parse.quote(self._api_secret, safe='')}&" + f"{urllib.parse.quote(self._access_token_secret, safe='')}" + ) + + # HMAC-SHA1 + import base64 + + signature = base64.b64encode( + hmac.new( + signing_key.encode("utf-8"), + base_string.encode("utf-8"), + hashlib.sha1, + ).digest() + ).decode("utf-8") + + return signature + + def _build_auth_header(self, method: str, url: str) -> str: + """Build the OAuth 1.0a Authorization header.""" + oauth_params = { + "oauth_consumer_key": self._api_key, + "oauth_nonce": uuid.uuid4().hex, + "oauth_signature_method": "HMAC-SHA1", + "oauth_timestamp": str(int(time.time())), + "oauth_token": self._access_token, + "oauth_version": "1.0", + } + + signature = self._generate_oauth_signature(method, url, oauth_params) + oauth_params["oauth_signature"] = signature + + header_parts = [ + f'{urllib.parse.quote(k, safe="")}="{urllib.parse.quote(v, safe="")}"' + for k, v in sorted(oauth_params.items()) + ] + return "OAuth " + ", ".join(header_parts) + + def _handle_response(self, response: httpx.Response) -> dict[str, Any]: + if response.status_code == 401: + return { + "error": "OAuth 1.0a authentication failed — check your API keys and tokens", + "help": "Verify X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, " + "and X_ACCESS_TOKEN_SECRET are correct.", + } + if response.status_code == 403: + try: + detail = response.json() + except Exception: + detail = response.text + return { + "error": f"Insufficient permissions (HTTP 403): {detail}", + "help": "Ensure your X app has Read+Write+Direct Message permissions " + "and tokens were regenerated AFTER enabling them.", + } + if response.status_code == 404: + return {"error": "Resource not found"} + if response.status_code == 429: + return {"error": "Rate limit exceeded. Try again later."} + if response.status_code >= 400: + try: + detail = response.json() + except Exception: + detail = response.text + return {"error": f"X API error (HTTP {response.status_code}): {detail}"} + return response.json() + + def post(self, endpoint: str, json_body: dict | None = None) -> dict[str, Any]: + """Make a POST request with OAuth 1.0a auth.""" + url = f"{X_API_BASE}{endpoint}" + auth_header = self._build_auth_header("POST", url) + headers = { + "Authorization": auth_header, + "Content-Type": "application/json", + "Accept": "application/json", + } + response = httpx.post(url, headers=headers, json=json_body, timeout=30.0) + return self._handle_response(response) + + def delete(self, endpoint: str) -> dict[str, Any]: + """Make a DELETE request with OAuth 1.0a auth.""" + url = f"{X_API_BASE}{endpoint}" + auth_header = self._build_auth_header("DELETE", url) + headers = { + "Authorization": auth_header, + "Accept": "application/json", + } + response = httpx.delete(url, headers=headers, timeout=30.0) + return self._handle_response(response) + + +def register_tools( + mcp: FastMCP, + credentials: CredentialStoreAdapter | None = None, +) -> None: + """Register X (Twitter) tools with the MCP server.""" + + def _get_credential(env_var: str, cred_name: str) -> str | None: + """Get a credential from the credential manager or environment.""" + if credentials is not None: + val = credentials.get(cred_name) + if val is not None and not isinstance(val, str): + raise TypeError( + f"Expected string for credential '{cred_name}', got {type(val).__name__}" + ) + return val + return os.getenv(env_var) + + def _get_bearer_client() -> _XClient | dict[str, str]: + """Get a Bearer-token client for read-only operations.""" + token = _get_credential("X_BEARER_TOKEN", "x_bearer_token") + if not token: + return { + "error": "X Bearer token not configured", + "help": "Set X_BEARER_TOKEN environment variable. " + "Get it from https://developer.x.com/ > Keys & Tokens.", + } + return _XClient(token) + + def _get_oauth_client() -> _XOAuthClient | dict[str, str]: + """Get an OAuth 1.0a client for write operations.""" + api_key = _get_credential("X_API_KEY", "x_api_key") + api_secret = _get_credential("X_API_SECRET", "x_api_secret") + access_token = _get_credential("X_ACCESS_TOKEN", "x_access_token") + access_secret = _get_credential("X_ACCESS_TOKEN_SECRET", "x_access_token_secret") + + if not all([api_key, api_secret, access_token, access_secret]): + missing = [] + if not api_key: + missing.append("X_API_KEY") + if not api_secret: + missing.append("X_API_SECRET") + if not access_token: + missing.append("X_ACCESS_TOKEN") + if not access_secret: + missing.append("X_ACCESS_TOKEN_SECRET") + return { + "error": f"X OAuth credentials not configured: {', '.join(missing)}", + "help": "Write operations (post, reply, delete, DM) require OAuth 1.0a. " + "Set all 4 env vars from https://developer.x.com/ > Keys & Tokens.", + } + return _XOAuthClient(api_key, api_secret, access_token, access_secret) + + # ── Read-only tools (Bearer token) ────────────────────────────── + + @mcp.tool() + def x_search_tweets(query: str, max_results: int = 10) -> dict: + """Search recent tweets by keyword or query. + + Uses Bearer token authentication (read-only). + + Args: + query: Search query string (supports X search operators). + max_results: Number of results to return (1-100, default 10). + + Returns: + Dict with matching tweets or error details. + """ + client = _get_bearer_client() + if isinstance(client, dict): + return client + params = {"query": query, "max_results": min(max(max_results, 1), 100)} + try: + return client.get("/tweets/search/recent", params=params) + except httpx.TimeoutException: + return {"error": "X API request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + @mcp.tool() + def x_get_mentions(user_id: str, max_results: int = 10) -> dict: + """Fetch recent mentions for a user. + + Uses Bearer token authentication (read-only). + + Args: + user_id: The X user ID to fetch mentions for. + max_results: Number of results to return (1-100, default 10). + + Returns: + Dict with mention tweets or error details. + """ + client = _get_bearer_client() + if isinstance(client, dict): + return client + params = {"max_results": min(max(max_results, 1), 100)} + try: + return client.get(f"/users/{user_id}/mentions", params=params) + except httpx.TimeoutException: + return {"error": "X API request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + # ── Write tools (OAuth 1.0a required) ─────────────────────────── + + @mcp.tool() + def x_post_tweet(text: str) -> dict: + """Post a new tweet. + + Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET, + X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET). + + Args: + text: Tweet text content (max 280 characters). + + Returns: + Dict with created tweet data or error details. + """ + if not text or not text.strip(): + return {"error": "Tweet text cannot be empty"} + if len(text) > 280: + return {"error": f"Tweet text exceeds 280 characters ({len(text)} chars)"} + + client = _get_oauth_client() + if isinstance(client, dict): + return client + try: + return client.post("/tweets", json_body={"text": text}) + except httpx.TimeoutException: + return {"error": "X API request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + @mcp.tool() + def x_reply_tweet(tweet_id: str, text: str) -> dict: + """Reply to an existing tweet. + + Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET, + X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET). + + Args: + tweet_id: The ID of the tweet to reply to. + text: Reply text content (max 280 characters). + + Returns: + Dict with created reply data or error details. + """ + if not text or not text.strip(): + return {"error": "Reply text cannot be empty"} + if len(text) > 280: + return {"error": f"Reply text exceeds 280 characters ({len(text)} chars)"} + + client = _get_oauth_client() + if isinstance(client, dict): + return client + body = {"text": text, "reply": {"in_reply_to_tweet_id": tweet_id}} + try: + return client.post("/tweets", json_body=body) + except httpx.TimeoutException: + return {"error": "X API request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + @mcp.tool() + def x_delete_tweet(tweet_id: str) -> dict: + """Delete a tweet. + + Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET, + X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET). + + Args: + tweet_id: The ID of the tweet to delete. + + Returns: + Dict with deletion confirmation or error details. + """ + client = _get_oauth_client() + if isinstance(client, dict): + return client + try: + return client.delete(f"/tweets/{tweet_id}") + except httpx.TimeoutException: + return {"error": "X API request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} + + @mcp.tool() + def x_send_dm(participant_id: str, text: str) -> dict: + """Send a direct message to a user on X. + + Requires OAuth 1.0a authentication (X_API_KEY, X_API_SECRET, + X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET). Your X app must have + Direct Message permissions enabled. + + Args: + participant_id: The X user ID of the DM recipient. + text: Message text content. + + Returns: + Dict with DM event data or error details. + """ + if not text or not text.strip(): + return {"error": "DM text cannot be empty"} + + client = _get_oauth_client() + if isinstance(client, dict): + return client + + body = {"text": text} + try: + return client.post(f"/dm_conversations/with/{participant_id}/messages", json_body=body) + except httpx.TimeoutException: + return {"error": "X API request timed out"} + except httpx.RequestError as e: + return {"error": f"Network error: {e}"} diff --git a/tools/tests/tools/test_x_tool.py b/tools/tests/tools/test_x_tool.py new file mode 100644 index 0000000000..914aa3a11d --- /dev/null +++ b/tools/tests/tools/test_x_tool.py @@ -0,0 +1,617 @@ +"""Tests for the X (Twitter) tool.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import httpx +import pytest +from fastmcp import FastMCP + +from aden_tools.tools.x_tool.x_tool import ( + X_API_BASE, + _XClient, + _XOAuthClient, + register_tools, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mcp_with_x(monkeypatch): + """MCP server with all X credentials set.""" + monkeypatch.setenv("X_BEARER_TOKEN", "test-bearer-token") + monkeypatch.setenv("X_API_KEY", "test-api-key") + monkeypatch.setenv("X_API_SECRET", "test-api-secret") + monkeypatch.setenv("X_ACCESS_TOKEN", "test-access-token") + monkeypatch.setenv("X_ACCESS_TOKEN_SECRET", "test-access-secret") + + mcp = FastMCP("test-x") + register_tools(mcp) + return mcp + + +@pytest.fixture +def mcp_bearer_only(monkeypatch): + """MCP server with only Bearer token set (no OAuth).""" + monkeypatch.setenv("X_BEARER_TOKEN", "test-bearer-token") + monkeypatch.delenv("X_API_KEY", raising=False) + monkeypatch.delenv("X_API_SECRET", raising=False) + monkeypatch.delenv("X_ACCESS_TOKEN", raising=False) + monkeypatch.delenv("X_ACCESS_TOKEN_SECRET", raising=False) + + mcp = FastMCP("test-x-bearer-only") + register_tools(mcp) + return mcp + + +@pytest.fixture +def mcp_no_creds(monkeypatch): + """MCP server with no X credentials set.""" + monkeypatch.delenv("X_BEARER_TOKEN", raising=False) + monkeypatch.delenv("X_API_KEY", raising=False) + monkeypatch.delenv("X_API_SECRET", raising=False) + monkeypatch.delenv("X_ACCESS_TOKEN", raising=False) + monkeypatch.delenv("X_ACCESS_TOKEN_SECRET", raising=False) + + mcp = FastMCP("test-x-no-creds") + register_tools(mcp) + return mcp + + +def _get_tool_fn(mcp, tool_name): + """Get a tool function from the MCP server.""" + return mcp._tool_manager._tools[tool_name].fn + + +def _make_response(status_code=200, json_data=None): + """Create a mock httpx response.""" + mock_resp = MagicMock() + mock_resp.status_code = status_code + mock_resp.json.return_value = json_data or {} + mock_resp.text = "{}" + return mock_resp + + +# --------------------------------------------------------------------------- +# TestXClient (Bearer token client) +# --------------------------------------------------------------------------- + + +class TestXClient: + """Test the Bearer token client.""" + + def setup_method(self): + self.client = _XClient("test-bearer") + + def test_headers(self): + headers = self.client._headers + assert headers["Authorization"] == "Bearer test-bearer" + assert headers["Content-Type"] == "application/json" + + @pytest.mark.parametrize( + "status_code,expected_substring", + [ + (401, "invalid or expired"), + (403, "insufficient permissions"), + (404, "not found"), + (429, "rate limit"), + ], + ) + def test_handle_response_errors(self, status_code, expected_substring): + response = _make_response(status_code) + result = self.client._handle_response(response) + assert "error" in result + assert expected_substring in result["error"].lower() + + def test_handle_response_success(self): + response = _make_response(200, {"data": [{"id": "1"}]}) + result = self.client._handle_response(response) + assert result == {"data": [{"id": "1"}]} + + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_get_request(self, mock_get): + mock_get.return_value = _make_response(200, {"data": []}) + result = self.client.get("/tweets/search/recent", params={"query": "test"}) + + mock_get.assert_called_once_with( + f"{X_API_BASE}/tweets/search/recent", + headers=self.client._headers, + params={"query": "test"}, + timeout=30.0, + ) + assert result == {"data": []} + + def test_handle_generic_4xx_error(self): + response = _make_response(400, {"detail": "Bad request"}) + result = self.client._handle_response(response) + assert "error" in result + assert "400" in result["error"] + + +# --------------------------------------------------------------------------- +# TestXOAuthClient +# --------------------------------------------------------------------------- + + +class TestXOAuthClient: + """Test the OAuth 1.0a client.""" + + def setup_method(self): + self.client = _XOAuthClient( + api_key="test-key", + api_secret="test-secret", + access_token="test-access", + access_token_secret="test-access-secret", + ) + + def test_oauth_signature_generation(self): + oauth_params = { + "oauth_consumer_key": "test-key", + "oauth_nonce": "testnonce", + "oauth_signature_method": "HMAC-SHA1", + "oauth_timestamp": "1234567890", + "oauth_token": "test-access", + "oauth_version": "1.0", + } + sig = self.client._generate_oauth_signature( + "POST", "https://api.x.com/2/tweets", oauth_params + ) + # Should be a non-empty base64 string + assert sig + assert len(sig) > 10 + + def test_build_auth_header(self): + header = self.client._build_auth_header("POST", "https://api.x.com/2/tweets") + assert header.startswith("OAuth ") + assert "oauth_consumer_key" in header + assert "oauth_signature" in header + assert "oauth_token" in header + + @patch("aden_tools.tools.x_tool.x_tool.httpx.post") + def test_post_request(self, mock_post): + mock_post.return_value = _make_response(200, {"data": {"id": "123", "text": "hi"}}) + result = self.client.post("/tweets", json_body={"text": "hi"}) + + assert mock_post.called + assert result == {"data": {"id": "123", "text": "hi"}} + + @patch("aden_tools.tools.x_tool.x_tool.httpx.delete") + def test_delete_request(self, mock_delete): + mock_delete.return_value = _make_response(200, {"data": {"deleted": True}}) + result = self.client.delete("/tweets/123") + + assert mock_delete.called + assert result == {"data": {"deleted": True}} + + @pytest.mark.parametrize( + "status_code,expected_substring", + [ + (401, "oauth 1.0a authentication failed"), + (403, "insufficient permissions"), + (404, "not found"), + (429, "rate limit"), + ], + ) + def test_handle_response_errors(self, status_code, expected_substring): + response = _make_response(status_code) + result = self.client._handle_response(response) + assert "error" in result + assert expected_substring in result["error"].lower() + + +# --------------------------------------------------------------------------- +# TestCredentials +# --------------------------------------------------------------------------- + + +class TestCredentials: + """Test credential validation for all tools.""" + + @pytest.mark.parametrize( + "tool_name", + ["x_search_tweets", "x_get_mentions"], + ) + def test_bearer_tools_missing_creds(self, mcp_no_creds, tool_name): + fn = _get_tool_fn(mcp_no_creds, tool_name) + if tool_name == "x_search_tweets": + result = fn(query="test") + else: + result = fn(user_id="123") + assert "error" in result + assert "bearer" in result["error"].lower() + assert "help" in result + + @pytest.mark.parametrize( + "tool_name", + ["x_post_tweet", "x_reply_tweet", "x_delete_tweet", "x_send_dm"], + ) + def test_oauth_tools_missing_creds(self, mcp_bearer_only, tool_name): + """Write tools should fail when OAuth creds are missing (even with bearer).""" + fn = _get_tool_fn(mcp_bearer_only, tool_name) + if tool_name == "x_post_tweet": + result = fn(text="test") + elif tool_name == "x_reply_tweet": + result = fn(tweet_id="1", text="test") + elif tool_name == "x_delete_tweet": + result = fn(tweet_id="1") + elif tool_name == "x_send_dm": + result = fn(participant_id="1", text="test") + assert "error" in result + assert "oauth" in result["error"].lower() + assert "help" in result + + +# --------------------------------------------------------------------------- +# TestSearchTweets +# --------------------------------------------------------------------------- + + +class TestSearchTweets: + """Test x_search_tweets tool.""" + + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_search_success(self, mock_get, mcp_with_x): + mock_get.return_value = _make_response( + 200, + { + "data": [{"id": "1", "text": "AI is cool"}], + "meta": {"result_count": 1}, + }, + ) + + fn = _get_tool_fn(mcp_with_x, "x_search_tweets") + result = fn(query="AI agents", max_results=5) + + assert "data" in result + call_params = mock_get.call_args.kwargs["params"] + assert call_params["query"] == "AI agents" + assert call_params["max_results"] == 5 + + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_search_max_results_capped(self, mock_get, mcp_with_x): + mock_get.return_value = _make_response(200, {"data": []}) + + fn = _get_tool_fn(mcp_with_x, "x_search_tweets") + fn(query="test", max_results=999) + + call_params = mock_get.call_args.kwargs["params"] + assert call_params["max_results"] == 100 + + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_search_timeout(self, mock_get, mcp_with_x): + mock_get.side_effect = httpx.TimeoutException("timeout") + + fn = _get_tool_fn(mcp_with_x, "x_search_tweets") + result = fn(query="test") + + assert "error" in result + assert "timed out" in result["error"].lower() + + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_search_network_error(self, mock_get, mcp_with_x): + mock_get.side_effect = httpx.ConnectError("Connection refused") + + fn = _get_tool_fn(mcp_with_x, "x_search_tweets") + result = fn(query="test") + + assert "error" in result + assert "network error" in result["error"].lower() + + +# --------------------------------------------------------------------------- +# TestGetMentions +# --------------------------------------------------------------------------- + + +class TestGetMentions: + """Test x_get_mentions tool.""" + + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_mentions_success(self, mock_get, mcp_with_x): + mock_get.return_value = _make_response( + 200, + { + "data": [{"id": "1", "text": "@user hello"}], + }, + ) + + fn = _get_tool_fn(mcp_with_x, "x_get_mentions") + result = fn(user_id="12345", max_results=5) + + assert "data" in result + assert mock_get.call_args.kwargs["params"]["max_results"] == 5 + + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_mentions_min_clamped(self, mock_get, mcp_with_x): + mock_get.return_value = _make_response(200, {"data": []}) + + fn = _get_tool_fn(mcp_with_x, "x_get_mentions") + fn(user_id="12345", max_results=-5) + + assert mock_get.call_args.kwargs["params"]["max_results"] == 1 + + +# --------------------------------------------------------------------------- +# TestPostTweet +# --------------------------------------------------------------------------- + + +class TestPostTweet: + """Test x_post_tweet tool.""" + + @patch("aden_tools.tools.x_tool.x_tool.httpx.post") + def test_post_success(self, mock_post, mcp_with_x): + mock_post.return_value = _make_response( + 200, + { + "data": {"id": "111", "text": "Hello world"}, + }, + ) + + fn = _get_tool_fn(mcp_with_x, "x_post_tweet") + result = fn(text="Hello world") + + assert "data" in result + assert result["data"]["id"] == "111" + + def test_post_empty_text(self, mcp_with_x): + fn = _get_tool_fn(mcp_with_x, "x_post_tweet") + result = fn(text="") + assert "error" in result + assert "empty" in result["error"].lower() + + def test_post_too_long(self, mcp_with_x): + fn = _get_tool_fn(mcp_with_x, "x_post_tweet") + result = fn(text="a" * 281) + assert "error" in result + assert "280" in result["error"] + + @patch("aden_tools.tools.x_tool.x_tool.httpx.post") + def test_post_timeout(self, mock_post, mcp_with_x): + mock_post.side_effect = httpx.TimeoutException("timeout") + + fn = _get_tool_fn(mcp_with_x, "x_post_tweet") + result = fn(text="Hello") + + assert "error" in result + assert "timed out" in result["error"].lower() + + +# --------------------------------------------------------------------------- +# TestReplyTweet +# --------------------------------------------------------------------------- + + +class TestReplyTweet: + """Test x_reply_tweet tool.""" + + @patch("aden_tools.tools.x_tool.x_tool.httpx.post") + def test_reply_success(self, mock_post, mcp_with_x): + mock_post.return_value = _make_response( + 200, + { + "data": {"id": "222", "text": "Great point!"}, + }, + ) + + fn = _get_tool_fn(mcp_with_x, "x_reply_tweet") + result = fn(tweet_id="111", text="Great point!") + + assert "data" in result + + def test_reply_empty_text(self, mcp_with_x): + fn = _get_tool_fn(mcp_with_x, "x_reply_tweet") + result = fn(tweet_id="111", text="") + assert "error" in result + + def test_reply_too_long(self, mcp_with_x): + fn = _get_tool_fn(mcp_with_x, "x_reply_tweet") + result = fn(tweet_id="111", text="b" * 281) + assert "error" in result + assert "280" in result["error"] + + +# --------------------------------------------------------------------------- +# TestDeleteTweet +# --------------------------------------------------------------------------- + + +class TestDeleteTweet: + """Test x_delete_tweet tool.""" + + @patch("aden_tools.tools.x_tool.x_tool.httpx.delete") + def test_delete_success(self, mock_delete, mcp_with_x): + mock_delete.return_value = _make_response( + 200, + { + "data": {"deleted": True}, + }, + ) + + fn = _get_tool_fn(mcp_with_x, "x_delete_tweet") + result = fn(tweet_id="111") + + assert result["data"]["deleted"] is True + + @patch("aden_tools.tools.x_tool.x_tool.httpx.delete") + def test_delete_not_found(self, mock_delete, mcp_with_x): + mock_delete.return_value = _make_response(404) + + fn = _get_tool_fn(mcp_with_x, "x_delete_tweet") + result = fn(tweet_id="999") + + assert "error" in result + assert "not found" in result["error"].lower() + + +# --------------------------------------------------------------------------- +# TestSendDM +# --------------------------------------------------------------------------- + + +class TestSendDM: + """Test x_send_dm tool.""" + + @patch("aden_tools.tools.x_tool.x_tool.httpx.post") + def test_dm_success(self, mock_post, mcp_with_x): + mock_post.return_value = _make_response( + 200, + { + "data": {"dm_event_id": "999", "text": "Hey there!"}, + }, + ) + + fn = _get_tool_fn(mcp_with_x, "x_send_dm") + result = fn(participant_id="12345", text="Hey there!") + + assert "data" in result + assert result["data"]["dm_event_id"] == "999" + + # Verify correct v2 1:1 endpoint usage + call_args = mock_post.call_args + assert call_args[0][0] == f"{X_API_BASE}/dm_conversations/with/12345/messages" + assert call_args[1]["json"] == {"text": "Hey there!"} + + def test_dm_empty_text(self, mcp_with_x): + fn = _get_tool_fn(mcp_with_x, "x_send_dm") + result = fn(participant_id="12345", text="") + assert "error" in result + assert "empty" in result["error"].lower() + + @patch("aden_tools.tools.x_tool.x_tool.httpx.post") + def test_dm_network_error(self, mock_post, mcp_with_x): + mock_post.side_effect = httpx.ConnectError("Connection refused") + + fn = _get_tool_fn(mcp_with_x, "x_send_dm") + result = fn(participant_id="12345", text="Hello") + + assert "error" in result + assert "network error" in result["error"].lower() + + def test_dm_missing_oauth_creds(self, mcp_bearer_only): + fn = _get_tool_fn(mcp_bearer_only, "x_send_dm") + result = fn(participant_id="12345", text="Hello") + assert "error" in result + assert "oauth" in result["error"].lower() + + +# --------------------------------------------------------------------------- +# TestAPIErrorHandling +# --------------------------------------------------------------------------- + + +class TestAPIErrorHandling: + """Test HTTP error code handling across tools.""" + + @pytest.mark.parametrize( + "status_code,expected_substring", + [ + (401, "invalid or expired"), + (403, "insufficient permissions"), + (404, "not found"), + (429, "rate limit"), + ], + ) + @patch("aden_tools.tools.x_tool.x_tool.httpx.get") + def test_bearer_error_codes(self, mock_get, status_code, expected_substring, mcp_with_x): + mock_get.return_value = _make_response(status_code) + + fn = _get_tool_fn(mcp_with_x, "x_search_tweets") + result = fn(query="test") + + assert "error" in result + assert expected_substring in result["error"].lower() + + @pytest.mark.parametrize( + "status_code,expected_substring", + [ + (401, "oauth 1.0a authentication failed"), + (403, "insufficient permissions"), + (404, "not found"), + (429, "rate limit"), + ], + ) + @patch("aden_tools.tools.x_tool.x_tool.httpx.post") + def test_oauth_error_codes(self, mock_post, status_code, expected_substring, mcp_with_x): + mock_post.return_value = _make_response(status_code) + + fn = _get_tool_fn(mcp_with_x, "x_post_tweet") + result = fn(text="test") + + assert "error" in result + assert expected_substring in result["error"].lower() + + +# --------------------------------------------------------------------------- +# TestToolRegistration +# --------------------------------------------------------------------------- + + +class TestToolRegistration: + """Test that all tools are properly registered.""" + + def test_all_tools_registered(self, mcp_with_x): + tools = mcp_with_x._tool_manager._tools + expected = [ + "x_search_tweets", + "x_get_mentions", + "x_post_tweet", + "x_reply_tweet", + "x_delete_tweet", + "x_send_dm", + ] + for name in expected: + assert name in tools, f"Tool '{name}' not registered" + + def test_tool_count(self, mcp_with_x): + tools = mcp_with_x._tool_manager._tools + x_tools = [name for name in tools if name.startswith("x_")] + assert len(x_tools) == 6 + + +# --------------------------------------------------------------------------- +# TestCredentialSpecs +# --------------------------------------------------------------------------- + + +class TestCredentialSpecs: + """Test credential spec definitions.""" + + def test_x_credential_specs_exist(self): + from aden_tools.credentials.x import X_CREDENTIALS + + expected_keys = [ + "x_bearer_token", + "x_api_key", + "x_api_secret", + "x_access_token", + "x_access_token_secret", + ] + for key in expected_keys: + assert key in X_CREDENTIALS, f"Missing credential spec: {key}" + + def test_bearer_token_spec(self): + from aden_tools.credentials.x import X_CREDENTIALS + + spec = X_CREDENTIALS["x_bearer_token"] + assert spec.env_var == "X_BEARER_TOKEN" + assert "x_search_tweets" in spec.tools + assert "x_post_tweet" in spec.tools + assert "x_send_dm" in spec.tools + assert spec.credential_group == "x" + + def test_oauth_specs_are_optional(self): + from aden_tools.credentials.x import X_CREDENTIALS + + for key in ["x_api_key", "x_api_secret", "x_access_token", "x_access_token_secret"]: + assert X_CREDENTIALS[key].required is False + + def test_specs_in_merged_registry(self): + from aden_tools.credentials import CREDENTIAL_SPECS + + assert "x_bearer_token" in CREDENTIAL_SPECS + assert "x_api_key" in CREDENTIAL_SPECS