From 6547acdf4bfd82e15df943b6db4235074cbd8f36 Mon Sep 17 00:00:00 2001 From: sarmadnawaz Date: Tue, 23 Dec 2025 19:49:30 +0500 Subject: [PATCH] feat: add UCLTool integration Add UCLTool wrapper that enables CrewAI agents to access tools and integrations through the ucl.dev platform. --- .../src/crewai_tools/tools/__init__.py | 2 + .../src/crewai_tools/tools/ucl_tool/README.md | 203 ++++++++++++++ .../crewai_tools/tools/ucl_tool/__init__.py | 3 + .../crewai_tools/tools/ucl_tool/ucl_tool.py | 256 ++++++++++++++++++ lib/crewai-tools/tests/tools/ucl_tool_test.py | 222 +++++++++++++++ 5 files changed, 686 insertions(+) create mode 100644 lib/crewai-tools/src/crewai_tools/tools/ucl_tool/README.md create mode 100644 lib/crewai-tools/src/crewai_tools/tools/ucl_tool/__init__.py create mode 100644 lib/crewai-tools/src/crewai_tools/tools/ucl_tool/ucl_tool.py create mode 100644 lib/crewai-tools/tests/tools/ucl_tool_test.py diff --git a/lib/crewai-tools/src/crewai_tools/tools/__init__.py b/lib/crewai-tools/src/crewai_tools/tools/__init__.py index 51d32ddc25..adaa98e85b 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/__init__.py +++ b/lib/crewai-tools/src/crewai_tools/tools/__init__.py @@ -168,6 +168,7 @@ ) from crewai_tools.tools.tavily_search_tool.tavily_search_tool import TavilySearchTool from crewai_tools.tools.txt_search_tool.txt_search_tool import TXTSearchTool +from crewai_tools.tools.ucl_tool.ucl_tool import UCLTool from crewai_tools.tools.vision_tool.vision_tool import VisionTool from crewai_tools.tools.weaviate_tool.vector_search import WeaviateVectorSearchTool from crewai_tools.tools.website_search.website_search_tool import WebsiteSearchTool @@ -264,6 +265,7 @@ "TXTSearchTool", "TavilyExtractorTool", "TavilySearchTool", + "UCLTool", "VisionTool", "WeaviateVectorSearchTool", "WebsiteSearchTool", diff --git a/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/README.md b/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/README.md new file mode 100644 index 0000000000..5c0877a11d --- /dev/null +++ b/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/README.md @@ -0,0 +1,203 @@ +# UCLTool Documentation + +## Description + +UCLTool is a wrapper around the UCL (Unified Context Layer) API that gives your CrewAI agents access to a wide variety of tools and integrations through the Fastn.ai platform. UCL provides a unified interface to connect with multiple services and APIs. + +## Installation + +To incorporate this tool into your project, follow the installation instructions below: + +```shell +pip install requests +pip install 'crewai[tools]' +``` + +## Configuration + +Before using UCLTool, you need to obtain the following credentials from your ucl.dev workspace: + +- **UCL_WORKSPACE_ID**: Your UCL workspace identifier +- **UCL_API_KEY**: Your workspace API key +- **UCL_MCP_GATEWAY_ID**: Your workspace MCP Gateway identifier + +You can either set these as environment variables or pass them directly to the configuration. + +### Environment Variables + +```shell +export UCL_WORKSPACE_ID="your-workspace-id" +export UCL_API_KEY="your-api-key" +export UCL_MCP_GATEWAY_ID="your-mcp-gateway-id" +``` + +## Example + +The following example demonstrates how to initialize the tool and use UCL actions with CrewAI: + +### 1. Create UCL Configuration + +```python +from crewai_tools import UCLTool + +# Create configuration +config = UCLTool.from_config( + workspace_id="your-workspace-id", + api_key="your-api-key", + mcp_gateway_id="your-mcp-gateway-id", +) +``` + +### 2. Get Available Tools + +You can fetch all available tools or filter them by a prompt: + +```python +# Get all available tools (up to 500) +tools = UCLTool.get_tools(config) + +# Get tools related to specific keywords +tools = UCLTool.get_tools(config, prompt="slack messaging", limit=10) +``` + +### 3. Create a Specific Tool + +If you know the specific action you want to use: + +```python +tool = UCLTool.from_action( + config=config, + action_id="your-action-id", + tool_name="send_slack_message", + description="Send a message to a Slack channel", + input_schema={ + "type": "object", + "properties": { + "channel": {"type": "string", "description": "Slack channel name"}, + "message": {"type": "string", "description": "Message content"}, + }, + "required": ["channel", "message"], + }, +) +``` + +### 4. Define Agent with UCL Tools + +```python +from crewai import Agent, Task + +# Get tools for a specific use case +tools = UCLTool.get_tools(config, prompt="email and calendar", limit=20) + +# Create an agent with UCL tools +agent = Agent( + role="Communication Assistant", + goal="Help users manage their communications and scheduling", + backstory=( + "You are an AI assistant that helps users send emails, " + "manage calendar events, and handle communications across " + "various platforms using UCL integrations." + ), + verbose=True, + tools=tools, +) +``` + +### 5. Execute Task + +```python +task = Task( + description="Send a welcome email to new-user@example.com with subject 'Welcome to our platform'", + agent=agent, + expected_output="Confirmation that the email was sent successfully", +) + +result = task.execute() +print(result) +``` + +## Complete Example + +```python +from crewai import Agent, Crew, Task +from crewai_tools import UCLTool + +# Step 1: Configure UCL +config = UCLTool.from_config( + workspace_id="your-workspace-id", + api_key="your-api-key", + mcp_gateway_id="your-mcp-gateway-id", +) + +# Step 2: Get relevant tools +tools = UCLTool.get_tools(config, prompt="slack notifications", limit=10) + +# Step 3: Create agent +notification_agent = Agent( + role="Notification Manager", + goal="Send notifications to team members via Slack", + backstory="You are responsible for keeping the team informed about important updates.", + verbose=True, + tools=tools, +) + +# Step 4: Define task +task = Task( + description="Send a notification to the #general channel about the deployment completion", + agent=notification_agent, + expected_output="Confirmation of notification sent", +) + +# Step 5: Create and run crew +crew = Crew( + agents=[notification_agent], + tasks=[task], + verbose=True, +) + +result = crew.kickoff() +print(result) +``` + +## API Reference + +### UCLTool.from_config() + +Creates a UCL configuration object. + +**Parameters:** +- `workspace_id` (str): UCL Workspace ID +- `api_key` (str): Workspace API Key +- `mcp_gateway_id` (str): Workspace MCP Gateway ID +- `base_url` (str, optional): Base URL for UCL API. Default: `https://live.fastn.ai` +- `stage` (str, optional): API stage. Default: `LIVE` + +**Returns:** `UCLToolConfig` + +### UCLTool.get_tools() + +Fetches and creates tools from the UCL API. + +**Parameters:** +- `config` (UCLToolConfig): UCL configuration object +- `prompt` (str, optional): Filter tools by keywords +- `limit` (int, optional): Maximum number of tools to fetch. Default: 500 + +**Returns:** `list[UCLTool]` + +### UCLTool.from_action() + +Creates a UCLTool from a specific action. + +**Parameters:** +- `config` (UCLToolConfig): UCL configuration object +- `action_id` (str): UCL Action ID +- `tool_name` (str): Name of the tool +- `description` (str): Tool description +- `input_schema` (dict): JSON schema for tool input parameters + +**Returns:** `UCLTool` + +## More Information + +For more detailed information about UCL and available integrations, visit [ucl.dev](https://docs.ucl.dev/getting-started/about-unified-context-layer). diff --git a/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/__init__.py b/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/__init__.py new file mode 100644 index 0000000000..241866ad08 --- /dev/null +++ b/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/__init__.py @@ -0,0 +1,3 @@ +from crewai_tools.tools.ucl_tool.ucl_tool import UCLTool + +__all__ = ["UCLTool"] diff --git a/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/ucl_tool.py b/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/ucl_tool.py new file mode 100644 index 0000000000..35c9876f46 --- /dev/null +++ b/lib/crewai-tools/src/crewai_tools/tools/ucl_tool/ucl_tool.py @@ -0,0 +1,256 @@ +"""UCL (Unified Context Layer) tools wrapper for crewAI.""" + +import typing as t + +import requests +from crewai.tools import BaseTool, EnvVar +from pydantic import BaseModel, Field, create_model +import typing_extensions as te + + +class UCLToolConfig(BaseModel): + """Configuration for UCL Tool.""" + + workspace_id: str = Field(..., description="UCL Workspace ID") + api_key: str = Field(..., description="Workspace API Key") + mcp_gateway_id: str = Field(..., description="Workspace MCP Gateway ID") + base_url: str = Field( + default="https://live.fastn.ai", + description="Base URL for UCL API", + ) + stage: str = Field(default="LIVE", description="API stage") + + +class UCLTool(BaseTool): + """Wrapper for UCL (Unified Context Layer) tools.""" + + ucl_action: t.Callable = Field(default=None, exclude=True) + action_id: str = Field(default="", description="UCL Action ID") + tool_name: str = Field(default="", description="UCL Tool Name") + config: UCLToolConfig = Field(default=None, exclude=True) + env_vars: list[EnvVar] = Field( + default_factory=lambda: [ + EnvVar( + name="UCL_WORKSPACE_ID", + description="UCL Workspace ID", + required=True, + ), + EnvVar( + name="UCL_API_KEY", + description="UCL Workspace API Key", + required=True, + ), + EnvVar( + name="UCL_MCP_GATEWAY_ID", + description="UCL MCP Gateway ID", + required=True, + ), + ] + ) + + def _run(self, *args: t.Any, **kwargs: t.Any) -> t.Any: + """Run the UCL action with given arguments.""" + if self.ucl_action is not None: + return self.ucl_action(**kwargs) + return {"error": "UCL action not configured"} + + @staticmethod + def _get_headers(config: UCLToolConfig) -> dict[str, str]: + """Get headers for UCL API requests.""" + return { + "stage": config.stage, + "x-fastn-space-id": config.workspace_id, + "x-fastn-api-key": config.api_key, + "x-fastn-space-agent-id": config.mcp_gateway_id, + "Content-Type": "application/json", + } + + @staticmethod + def _fetch_tools( + config: UCLToolConfig, + prompt: str | None = None, + limit: int = 10, + ) -> list[dict[str, t.Any]]: + """Fetch available tools from UCL API.""" + url = f"{config.base_url}/api/ucl/getTools" + headers = UCLTool._get_headers(config) + + payload: dict[str, t.Any] = {"input": {"limit": limit}} + if prompt: + payload["input"]["prompt"] = prompt + + response = requests.post(url, headers=headers, json=payload, timeout=30) + response.raise_for_status() + + return response.json() + + @staticmethod + def _execute_tool( + config: UCLToolConfig, + action_id: str, + tool_name: str, + parameters: dict[str, t.Any], + ) -> dict[str, t.Any]: + """Execute a UCL tool.""" + url = f"{config.base_url}/api/ucl/executeTool" + headers = UCLTool._get_headers(config) + + payload = { + "input": { + "actionId": action_id, + "parameters": parameters, + "toolName": tool_name, + } + } + + response = requests.post(url, headers=headers, json=payload, timeout=60) + response.raise_for_status() + + result = response.json() + + if "body" in result: + return result["body"] + elif "rawBody" in result: + return {"raw": result["rawBody"]} + return result + + @staticmethod + def _json_schema_to_pydantic_model( + schema: dict[str, t.Any], + model_name: str = "DynamicModel", + ) -> type[BaseModel]: + """Convert JSON schema to Pydantic model.""" + properties = schema.get("properties", {}) + required = set(schema.get("required", [])) + + field_definitions: dict[str, t.Any] = {} + + type_mapping = { + "string": str, + "integer": int, + "number": float, + "boolean": bool, + "array": list, + "object": dict, + } + + for field_name, field_schema in properties.items(): + field_type = type_mapping.get(field_schema.get("type", "string"), str) + field_description = field_schema.get("description", "") + + if field_name in required: + field_definitions[field_name] = ( + field_type, + Field(..., description=field_description), + ) + else: + field_definitions[field_name] = ( + t.Optional[field_type], + Field(default=None, description=field_description), + ) + + return create_model(model_name, **field_definitions) + + @classmethod + def from_config( + cls, + workspace_id: str, + api_key: str, + mcp_gateway_id: str, + base_url: str = "https://live.fastn.ai", + stage: str = "LIVE", + ) -> UCLToolConfig: + """Create a UCL configuration object.""" + return UCLToolConfig( + workspace_id=workspace_id, + api_key=api_key, + mcp_gateway_id=mcp_gateway_id, + base_url=base_url, + stage=stage, + ) + + @classmethod + def from_action( + cls, + config: UCLToolConfig, + action_id: str, + tool_name: str, + description: str, + input_schema: dict[str, t.Any], + **kwargs: t.Any, + ) -> te.Self: + """Create a UCLTool from a specific action.""" + + def execute_action(**params: t.Any) -> dict[str, t.Any]: + """Execute the UCL action.""" + return cls._execute_tool( + config=config, + action_id=action_id, + tool_name=tool_name, + parameters=params, + ) + + execute_action.__name__ = tool_name + execute_action.__doc__ = description + + args_schema = cls._json_schema_to_pydantic_model( + input_schema, + model_name=f"{tool_name.replace('-', '_').replace(' ', '_')}Schema", + ) + + return cls( + name=tool_name, + description=description, + args_schema=args_schema, + ucl_action=execute_action, + action_id=action_id, + tool_name=tool_name, + config=config, + **kwargs, + ) + + @classmethod + def get_tools( + cls, + config: UCLToolConfig, + prompt: str | None = None, + limit: int = 10, + **kwargs: t.Any, + ) -> list[te.Self]: + """ + Fetch and create tools from UCL API. + + Args: + config: UCL configuration object + prompt: Optional prompt to filter tools by keywords + limit: Maximum number of tools to fetch (default: 10) + **kwargs: Additional arguments to pass to tool creation + + Returns: + List of UCLTool instances + """ + tools_data = cls._fetch_tools(config, prompt=prompt, limit=limit) + tools: list[te.Self] = [] + + for tool_data in tools_data: + action_id = tool_data.get("actionId", "") + function_data = tool_data.get("function", {}) + + tool_name = function_data.get("name", "") + description = function_data.get("description", "No description available") + input_schema = function_data.get("inputSchema", {"properties": {}}) + + if not tool_name or not action_id: + continue + + tool = cls.from_action( + config=config, + action_id=action_id, + tool_name=tool_name, + description=description, + input_schema=input_schema, + **kwargs, + ) + tools.append(tool) + + return tools diff --git a/lib/crewai-tools/tests/tools/ucl_tool_test.py b/lib/crewai-tools/tests/tools/ucl_tool_test.py new file mode 100644 index 0000000000..6da0bcb132 --- /dev/null +++ b/lib/crewai-tools/tests/tools/ucl_tool_test.py @@ -0,0 +1,222 @@ +from unittest.mock import patch, MagicMock + +import pytest + +from crewai_tools.tools.ucl_tool.ucl_tool import UCLTool, UCLToolConfig + + +@pytest.fixture +def ucl_config(): + return UCLToolConfig( + workspace_id="test-workspace-id", + api_key="test-api-key", + mcp_gateway_id="test-mcp-gateway-id", + ) + + +def test_ucl_config_creation(): + config = UCLTool.from_config( + workspace_id="test-workspace", + api_key="test-key", + mcp_gateway_id="test-gateway", + ) + assert config.workspace_id == "test-workspace" + assert config.api_key == "test-key" + assert config.mcp_gateway_id == "test-gateway" + assert config.base_url == "https://live.fastn.ai" + assert config.stage == "LIVE" + + +def test_ucl_config_custom_base_url(): + config = UCLTool.from_config( + workspace_id="test-workspace", + api_key="test-key", + mcp_gateway_id="test-gateway", + base_url="https://custom.api.com", + stage="DEV", + ) + assert config.base_url == "https://custom.api.com" + assert config.stage == "DEV" + + +def test_get_headers(ucl_config): + headers = UCLTool._get_headers(ucl_config) + assert headers["stage"] == "LIVE" + assert headers["x-fastn-space-id"] == "test-workspace-id" + assert headers["x-fastn-api-key"] == "test-api-key" + assert headers["x-fastn-space-agent-id"] == "test-mcp-gateway-id" + assert headers["Content-Type"] == "application/json" + + +@patch("requests.post") +def test_fetch_tools(mock_post, ucl_config): + mock_response = MagicMock() + mock_response.json.return_value = [ + { + "actionId": "action-123", + "type": "function", + "function": { + "name": "send_email", + "description": "Send an email", + "inputSchema": { + "type": "object", + "properties": { + "to": {"type": "string", "description": "Recipient email"}, + "subject": {"type": "string", "description": "Email subject"}, + }, + "required": ["to", "subject"], + }, + }, + } + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + tools_data = UCLTool._fetch_tools(ucl_config, prompt="email", limit=10) + + assert len(tools_data) == 1 + assert tools_data[0]["actionId"] == "action-123" + assert tools_data[0]["function"]["name"] == "send_email" + + mock_post.assert_called_once() + call_args = mock_post.call_args + assert call_args[1]["json"] == {"input": {"limit": 10, "prompt": "email"}} + + +@patch("requests.post") +def test_execute_tool(mock_post, ucl_config): + mock_response = MagicMock() + mock_response.json.return_value = { + "body": {"success": True, "message": "Email sent"}, + "statusCode": 200, + } + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + result = UCLTool._execute_tool( + config=ucl_config, + action_id="action-123", + tool_name="send_email", + parameters={"to": "test@example.com", "subject": "Test"}, + ) + + assert result == {"success": True, "message": "Email sent"} + mock_post.assert_called_once() + + +@patch("requests.post") +def test_get_tools(mock_post, ucl_config): + mock_response = MagicMock() + mock_response.json.return_value = [ + { + "actionId": "action-123", + "type": "function", + "function": { + "name": "send_slack_message", + "description": "Send a message to Slack", + "inputSchema": { + "type": "object", + "properties": { + "channel": {"type": "string", "description": "Channel name"}, + "message": {"type": "string", "description": "Message content"}, + }, + "required": ["channel", "message"], + }, + }, + }, + { + "actionId": "action-456", + "type": "function", + "function": { + "name": "get_slack_channels", + "description": "List Slack channels", + "inputSchema": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + }, + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + tools = UCLTool.get_tools(ucl_config, prompt="slack", limit=10) + + assert len(tools) == 2 + assert tools[0].name == "send_slack_message" + assert tools[0].description == "Send a message to Slack" + assert tools[1].name == "get_slack_channels" + + +def test_from_action(ucl_config): + tool = UCLTool.from_action( + config=ucl_config, + action_id="test-action", + tool_name="test_tool", + description="A test tool", + input_schema={ + "type": "object", + "properties": { + "param1": {"type": "string", "description": "First parameter"}, + }, + "required": ["param1"], + }, + ) + + assert tool.name == "test_tool" + assert tool.description == "A test tool" + assert tool.action_id == "test-action" + + +def test_json_schema_to_pydantic_model(): + schema = { + "type": "object", + "properties": { + "name": {"type": "string", "description": "User name"}, + "age": {"type": "integer", "description": "User age"}, + "active": {"type": "boolean", "description": "Is active"}, + }, + "required": ["name"], + } + + model = UCLTool._json_schema_to_pydantic_model(schema, "TestModel") + + assert model.__name__ == "TestModel" + fields = model.model_fields + assert "name" in fields + assert "age" in fields + assert "active" in fields + + +@patch("requests.post") +def test_tool_run(mock_post, ucl_config): + # Mock the execute response + mock_response = MagicMock() + mock_response.json.return_value = { + "body": {"result": "success"}, + "statusCode": 200, + } + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + tool = UCLTool.from_action( + config=ucl_config, + action_id="test-action", + tool_name="test_tool", + description="A test tool", + input_schema={ + "type": "object", + "properties": { + "param1": {"type": "string", "description": "First parameter"}, + }, + "required": ["param1"], + }, + ) + + result = tool._run(param1="test_value") + assert result == {"result": "success"} + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])