61 changes: 61 additions & 0 deletions examples/mcp_server/README.md
@@ -0,0 +1,61 @@
# LlamaDeploy MCP Server Example

This example demonstrates how to deploy a LlamaDeploy workflow as an MCP (Model Context Protocol) remote server that
can be integrated with Claude Desktop or other MCP-compatible clients.

## Overview

The example includes a **StatusWorkflow** that checks the operational status of popular web services including GitHub,
Reddit, Cloudflare, Vercel, OpenAI, Linear, and Discord. The workflow fetches status information from each service's
status page API and returns the current operational status.
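
Each status page endpoint returns a small JSON document, and the workflow reads the human-readable `status.description` field from it. A typical payload looks roughly like this (trimmed; shape based on the public Statuspage API):

```json
{
  "status": {
    "indicator": "none",
    "description": "All Systems Operational"
  }
}
```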

## Files

- `src/workflow.py` - Contains the StatusWorkflow implementation
- `deployment.yaml` - LlamaDeploy configuration file
- `README.md` - This documentation

## Quick Start

### 1. Launch the LlamaDeploy Server

```bash
llamactl serve deployment.yaml
```

This will start the server and make the workflow available as an MCP tool at `http://localhost:4501/mcp`.
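
To sanity-check the endpoint before wiring up Claude Desktop, you can proxy it from a terminal with the same `mcp-remote` package used in the next step (assumes Node.js is installed):

```bash
npx mcp-remote http://localhost:4501/mcp
```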

### 2. Configure Claude Desktop

Add the following configuration to your Claude Desktop settings to connect to the MCP server:

```json
{
  "mcpServers": {
    "LlamaDeploy": {
      "command": "npx",
      "args": ["mcp-remote", "http://localhost:4501/mcp"]
    }
  }
}
```

### 3. Use in Claude Desktop

Once configured, you can ask Claude to check service statuses:
- "What's the status of GitHub?"
- "Check if OpenAI services are operational"
- "Is Discord working?"

## How It Works

1. The `StatusWorkflow` class defines a workflow with a single step that checks service status
2. The workflow accepts a service name and queries the corresponding status page API (a direct-invocation sketch follows this list)
3. LlamaDeploy serves this workflow as an MCP tool that can be called by Claude Desktop
4. The MCP integration allows Claude to invoke the workflow and return results in natural conversation
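
For a quick local check outside of MCP, the workflow can also be invoked directly in Python. The sketch below assumes it is run from the `examples/mcp_server` directory so that `src/workflow.py` is importable:

```python
import asyncio

from src.workflow import InputService, statuspage_workflow


async def main() -> None:
    # Pass the service name via the workflow's custom StartEvent
    result = await statuspage_workflow.run(
        start_event=InputService(service_name="github")
    )
    print(result)


asyncio.run(main())
```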

## Requirements

- LlamaDeploy installed and configured
- Node.js (for the `mcp-remote` package)
- Internet connection to access status page APIs
12 changes: 12 additions & 0 deletions examples/mcp_server/deployment.yaml
@@ -0,0 +1,12 @@
name: StatuspageTool

services:
  status_workflow:
    name: Statuspage
    source:
      type: local
      name: src
    python-dependencies:
      - requests

    path: src/workflow:statuspage_workflow
53 changes: 53 additions & 0 deletions examples/mcp_server/src/workflow.py
@@ -0,0 +1,53 @@
import asyncio

import requests
from workflows import Workflow, step
from workflows.events import StartEvent, StopEvent

SERVICES = {
"github": "https://kctbh9vrtdwd.statuspage.io/api/v2/status.json",
"reddit": "https://reddit.statuspage.io/api/v2/status.json",
"cloudflare": "https://yh6f0r4529hb.statuspage.io/api/v2/status.json",
"vercel": "https://www.vercel-status.com/api/v2/status.json",
"openai": "https://status.openai.com/api/v2/status.json",
"linear": "https://linearstatus.com/api/v2/status.json",
"discord": "https://discordstatus.com/api/v2/status.json",
}


class InputService(StartEvent):
    service_name: str


class StatusWorkflow(Workflow):
"""Reports the operational status for notable public services.

This workflow can report status for major services like OpenAI, Github and Discord
"""

@step
async def check_status(self, ev: InputService) -> StopEvent:
try:
res = requests.get(SERVICES[ev.service_name.lower()], timeout=5)
res.raise_for_status()
data = res.json()
status = data["status"]["description"]
return StopEvent(status)
except Exception as e:
return StopEvent(f"{ev.service_name}: ERROR - {e}")


statuspage_workflow = StatusWorkflow()


async def main():
print("🔍 Checking service statuses...\n")
for name in SERVICES:
print(
name,
await statuspage_workflow.run(start_event=InputService(service_name=name)),
)


if __name__ == "__main__":
    asyncio.run(main())
56 changes: 52 additions & 4 deletions llama_deploy/apiserver/deployment.py
@@ -7,13 +7,19 @@
import subprocess
import sys
import tempfile
import warnings
from asyncio.subprocess import Process
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import Any, Tuple, Type

from dotenv import dotenv_values
from fastmcp import Context as MCPContext
from fastmcp import FastMCP
from fastmcp.server.http import StarletteWithLifespan
from pydantic import BaseModel
from workflows import Context, Workflow
from workflows.events import Event, StartEvent, StopEvent
from workflows.handler import WorkflowHandler

from llama_deploy.apiserver.source_managers.base import SyncPolicy
@@ -45,6 +51,7 @@ def __init__(
        config: DeploymentConfig,
        base_path: Path,
        deployment_path: Path,
        mcp: FastMCP,
        local: bool = False,
    ) -> None:
        """Creates a Deployment instance.
@@ -57,6 +64,7 @@
        self._local = local
        self._name = config.name
        self._base_path = base_path
        self._mcp = mcp
        # If not local, isolate the deployment in a folder with the same name to avoid conflicts
        self._deployment_path = (
            deployment_path if local else deployment_path / config.name
@@ -234,7 +242,9 @@ def _load_services(self, config: DeploymentConfig) -> dict[str, Workflow]:
            sys.path.append(str(pythonpath))

            module = importlib.import_module(module_name)
            workflow_services[service_id] = getattr(module, workflow_name)
            workflow = getattr(module, workflow_name)
            workflow_services[service_id] = workflow
            self._make_mcp_service(workflow, service_id)

            service_state.labels(self._name, service_id).state("ready")

@@ -367,6 +377,39 @@ def _install_dependencies(service_config: Service, source_root: Path) -> None:
msg = f"Unable to install service dependencies using command '{e.cmd}': {e.stderr}"
raise DeploymentError(msg) from None

    def _make_mcp_service(self, workflow: Workflow, service_name: str) -> None:
        # Dynamically get the start event class -- this is a bit of a hack
        StartEventT = workflow._start_event_class
        if StartEventT is StartEvent:
            msg = (
                f"Cannot expose service {service_name} as an MCP tool: "
                "workflow must declare a custom StartEvent class."
            )
            warnings.warn(msg)
            return

        @self._mcp.tool(
            name=service_name, description=workflow.__doc__
        )  # pragma: no cover
        async def _workflow_tool(run_args: StartEventT, context: MCPContext) -> Any:  # type:ignore
            # Handle edge cases where the start event is an Event or a BaseModel
            # If the workflow does not have a custom StartEvent class, then we need to handle the event differently
            if isinstance(run_args, Event) and StartEventT != StartEvent:
                handler = workflow.run(start_event=run_args)
            elif isinstance(run_args, BaseModel):
                handler = workflow.run(**run_args.model_dump())  # type: ignore
            elif isinstance(run_args, dict):
                start_event = StartEventT.model_validate(run_args)
                handler = workflow.run(start_event=start_event)
            else:
                raise ValueError(f"Invalid start event type: {type(run_args)}")

            async for event in handler.stream_events():
                if not isinstance(event, StopEvent):
                    await context.log(level="info", message=event.model_dump_json())

            return await handler


class Manager:
"""The Manager orchestrates deployments and their runtime.
@@ -395,6 +438,8 @@ def __init__(self, max_deployments: int = 10) -> None:
        self._last_control_plane_port = 8002
        self._simple_message_queue_server: asyncio.Task | None = None
        self._serving = False
        self._mcp = FastMCP("LlamaDeploy")
        self._mcp_app = self._mcp.http_app(path="/")

    @property
    def deployment_names(self) -> list[str]:
@@ -407,6 +452,10 @@ def deployments_path(self) -> Path:
            raise ValueError("Deployments path not set")
        return self._deployments_path

    @property
    def mcp_app(self) -> StarletteWithLifespan:
        return self._mcp_app

    def set_deployments_path(self, path: Path | None) -> None:
        self._deployments_path = (
            path or Path(tempfile.gettempdir()) / "llama_deploy" / "deployments"
@@ -427,9 +476,7 @@ async def serve(self) -> None:
            # Waits indefinitely since `event` will never be set
            await event.wait()
        except asyncio.CancelledError:
            if self._simple_message_queue_server is not None:
                self._simple_message_queue_server.cancel()
                await self._simple_message_queue_server
            return

    async def deploy(
        self,
@@ -468,6 +515,7 @@ async def deploy(
            base_path=Path(base_path),
            deployment_path=self.deployments_path,
            local=local,
            mcp=self._mcp,
        )
        self._deployments[config.name] = deployment
        await deployment.start()
5 changes: 4 additions & 1 deletion llama_deploy/apiserver/server.py
@@ -19,6 +19,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
    apiserver_state.state("starting")

    manager.set_deployments_path(settings.deployments_path)
    app.mount("/mcp", manager.mcp_app)

    t = asyncio.create_task(manager.serve())
    await asyncio.sleep(0)

@@ -39,7 +41,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
            logger.error(f"Failed to deploy {yaml_file}: {str(e)}")

    apiserver_state.state("running")
    yield
    async with manager.mcp_app.lifespan(app):
        yield

    t.cancel()

12 changes: 1 addition & 11 deletions tests/apiserver/conftest.py
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Any, Iterator
from typing import Iterator
from unittest import mock

import pytest
@@ -8,8 +8,6 @@
from workflows.events import StartEvent, StopEvent

from llama_deploy.apiserver.app import app
from llama_deploy.apiserver.deployment import Deployment
from llama_deploy.apiserver.deployment_config_parser import DeploymentConfig


class SmallWorkflow(Workflow):
@@ -33,14 +31,6 @@ def data_path() -> Path:
    return data_p.resolve()


@pytest.fixture
def mocked_deployment(data_path: Path, mock_importlib: Any) -> Iterator[Deployment]:
    config = DeploymentConfig.from_yaml(data_path / "git_service.yaml")
    with mock.patch("llama_deploy.apiserver.deployment.SOURCE_MANAGERS") as sm_dict:
        sm_dict["git"] = mock.MagicMock()
        yield Deployment(config=config, base_path=data_path, deployment_path=Path("."))


@pytest.fixture
def http_client() -> TestClient:
    return TestClient(app)