Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,49 @@ export SSL_CERT_FILE=$(pwd)/certs/cert.pem
```
And you'll see the server get hit.

### Starting a Hub

The mcp-server can register worker hubs, which are other MCP servers that register to it. To start the mcpserver as a hub:

```bash
# Start a hub in one terminal
mcpserver start --hub --hub-secret potato
```

In another terminal, start a worker using the same secret (or the token the hub generates if you omit `--hub-secret`). Add some functions for fun.

```bash
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --join-secret potato --port 7777
```

Note that you can also set the secret in the environment.

```bash
export MCPSERVER_JOIN_SECRET=potato
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777
```

To register the worker with the `flux` system type instead:

```bash
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --system-type flux
```

Test doing queries for status:

```bash
# Get listing of workers and metadata
python3 ./examples/mcp-query.py

# Get a specific tool metadata from the worker
python3 ./examples/mcp-query.py http://localhost:7777/mcp get_status

# Call a namespaced tool on the hub (e.g., get the status)
python3 ./examples/mcp-query.py http://localhost:8000/mcp n_781e903e4f10_get_status
```

You can test it without the join secret, or a wrong join secret, to see it fail.

### Design Choices

Here are a few design choices (subject to change, of course). I am starting with re-implementing our fractale agents with this framework. For that, instead of agents being tied to specific functions (as classes on their agent functions) we will have a flexible agent class that changes function based on a chosen prompt. It will use mcp functions, prompts, and resources. In addition:
Expand All @@ -263,7 +306,9 @@ Here are a few design choices (subject to change, of course). I am starting with

## TODO

- Full operator with Flux example (Flux operator with HPC apps and jobspec translation)
- [ ] need to expose tools from system (child worker) instances
- [ ] need to decide on dispatch strategy / algorithm
- [ ] add in fluxion queue stats via RPC call to flux status

## License

Expand Down
104 changes: 104 additions & 0 deletions examples/mcp-query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import asyncio
import sys
import json
import argparse
from fastmcp import Client
from rich.console import Console
from rich.tree import Tree
from rich.json import JSON
from rich.panel import Panel
from rich.table import Table

# Shared rich console used for all terminal output in this script.
console = Console()

# Fallbacks used when the "url" / "tool" positional arguments are omitted.
DEFAULT_URL = "http://localhost:8000/mcp"
DEFAULT_TOOL = "get_fleet_status"

def render_fleet_tree(data: dict) -> "Tree":
    """
    Renders the nested Hub/Worker hierarchy as a Tree.

    Args:
        data: Mapping of worker id -> worker info. Each info dict is read
            for the keys "online", "type", "status" and "error".

    Returns:
        A rich Tree with one node per worker. Online workers get a
        "Metadata" child built from the scalar entries of "status" (plus a
        "Labels" child when "status" carries a dict of labels); offline
        workers get a single error line.
    """
    tree = Tree("🌐 [bold cyan]mcpserver Hierarchy[/bold cyan]")

    if not data or not isinstance(data, dict):
        tree.add("[yellow]No worker data returned.[/yellow]")
        return tree

    for worker_id, info in data.items():
        # A malformed entry (None, a bare string, ...) is shown as an offline
        # worker instead of aborting the whole listing with AttributeError.
        if not isinstance(info, dict):
            info = {}

        # Status Icon
        online = info.get("online", False)
        icon = "✅" if online else "❌"
        color = "green" if online else "red"

        # Worker Node
        worker_node = tree.add(f"{icon} [bold {color}]{worker_id}[/bold {color}]")
        worker_node.add(f"[dim]Type:[/dim] [yellow]{info.get('type', 'generic')}[/yellow]")

        if not online:
            worker_node.add(f"[red]Error: {info.get('error', 'Unknown failure')}[/red]")
            continue

        meta_node = worker_node.add("📊 [bold white]Metadata[/bold white]")

        # "status" may be missing, null or not a mapping; in that case the
        # Metadata node is simply left empty.
        status_data = info.get("status")
        if not isinstance(status_data, dict):
            continue

        for k, v in status_data.items():
            if k == "fleet":
                # This node is an intermediate Hub!
                meta_node.add("🔗 [bold magenta]Sub-Fleet Attached (Intermediate Hub)[/bold magenta]")
            elif k == "labels" and isinstance(v, dict):
                # Labels hang off the worker itself, next to Metadata.
                labels_node = worker_node.add("🏷️ [bold blue]Labels[/bold blue]")
                for lk, lv in v.items():
                    labels_node.add(f"{lk}: [blue]{lv}[/blue]")
            elif isinstance(v, (dict, list)):
                # Skip nested complex objects in the top-level tree for cleanliness
                continue
            else:
                meta_node.add(f"{k}: [green]{v}[/green]")

    return tree

def _parse_tool_text(text):
    """Best-effort decode of a tool's text payload into Python data.

    The text is first parsed as-is so valid JSON that contains apostrophes
    is not corrupted. Only if that fails is the single-to-double quote swap
    tried, as a heuristic for Python-repr style dicts. Anything that still
    does not parse is returned unchanged.
    """
    if not isinstance(text, str):
        return text

    for candidate in (text, text.replace("'", '"')):
        try:
            return json.loads(candidate)
        except ValueError:
            # json.JSONDecodeError is a ValueError; try the next candidate.
            continue
    return text


async def query_mcp(url, tool_name):
    """
    Call one tool (with no arguments) on an MCP server and pretty-print it.

    Args:
        url: MCP endpoint passed to the fastmcp Client.
        tool_name: Name of the tool to call.

    The result of "get_fleet_status" is rendered as a tree when it decodes
    to a dict; other dict/list results are shown as a JSON panel, and
    everything else is printed as plain text. Any exception raised while
    connecting, calling or rendering is reported on the console rather
    than propagated.
    """
    console.print(f"[bold blue]📡 Connecting to:[/bold blue] {url}")

    try:
        async with Client(url) as client:
            with console.status(f"[bold yellow]Calling {tool_name}...[/bold yellow]"):
                result = await client.call_tool(tool_name, {})

            # Extract data from FastMCP's result wrapper. Fall back to the
            # raw result when there is no content or the first item carries
            # no text, instead of failing with IndexError/AttributeError.
            data = result
            content = getattr(result, "content", None)
            if content:
                text_content = getattr(content[0], "text", None)
                if text_content is not None:
                    data = _parse_tool_text(text_content)

            # MAKE IT PRETTY.
            if tool_name == "get_fleet_status" and isinstance(data, dict):
                console.print("\n")
                console.print(Panel(render_fleet_tree(data), border_style="cyan", expand=False))

            elif isinstance(data, (dict, list)):
                console.print("\n")
                console.print(Panel(
                    JSON.from_data(data),
                    title=f"[bold green]Result: {tool_name}[/bold green]",
                    border_style="green",
                    expand=False
                ))
            else:
                console.print(f"\n[bold green]Result:[/bold green] {data}")

    except Exception as e:
        console.print(f"\n[bold red]❌ Request Failed:[/bold red] {e}")

if __name__ == "__main__":
    # Both positionals are optional; each falls back to its module default.
    cli = argparse.ArgumentParser(
        description="Query an MCP server/hub and print results prettily."
    )
    positionals = (
        ("url", DEFAULT_URL, f"Server URL (default: {DEFAULT_URL})"),
        ("tool", DEFAULT_TOOL, f"Tool to call (default: {DEFAULT_TOOL})"),
    )
    for dest, fallback, help_text in positionals:
        cli.add_argument(dest, nargs="?", default=fallback, help=help_text)

    options = cli.parse_args()

    # Run the single query to completion on a fresh event loop.
    asyncio.run(query_mcp(options.url, options.tool))
1 change: 0 additions & 1 deletion mcpserver/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def get_parser():
default=False,
action="store_true",
)

parser.add_argument(
"--quiet",
dest="quiet",
Expand Down
52 changes: 51 additions & 1 deletion mcpserver/cli/args.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python

import os
import socket

default_port = os.environ.get("MCPSERVER_PORT") or 8000
default_host = os.environ.get("MCPSERVER_HOST") or "0.0.0.0"
Expand All @@ -22,7 +23,7 @@ def populate_start_args(start):
start.add_argument(
"-t",
"--transport",
default="stdio",
default="http",
help="Transport to use (defaults to stdin)",
choices=["stdio", "http", "sse", "streamable-http"],
)
Expand Down Expand Up @@ -50,3 +51,52 @@ def populate_start_args(start):
action="store_true",
default=False,
)

# Hub Group
hub_group = start.add_argument_group("🦞 Hub Mode")
hub_group.add_argument(
"--hub",
action="store_true",
help="Start the server in Hub mode to aggregate remote workers.",
)
hub_group.add_argument(
"--hub-secret",
default=os.environ.get("MCP_HUB_SECRET"),
help="Secret key required for workers to register. (Auto-generated if omitted)",
)
start.add_argument(
"--system-type",
default="generic",
help="System type/template (e.g., 'generic', 'flux', 'kubernetes') or a full python module path.",
)

# Worker Registration Group
worker_group = start.add_argument_group("🐝 Worker Registration")
worker_group.add_argument(
"--join", help="URL of the MCP Hub to join (e.g., http://hub-host:8089)"
)
worker_group.add_argument(
"--join-secret",
help="The registration secret provided by the Hub.",
default=os.environ.get("MCPSERVER_JOIN_SECRET"),
)
worker_group.add_argument(
"--register-id",
help="Unique ID for this worker. Defaults to the hostname.",
default=socket.gethostname(),
)
worker_group.add_argument(
"--public-url",
help="The URL the Hub should use to reach this worker (e.g. http://ip:port/mcp)",
)
worker_group.add_argument(
"--worker-type",
default="generic",
help="Category of worker (e.g., 'flux', 'kubernetes', 'storage')",
)
worker_group.add_argument(
"--label",
action="append",
dest="labels",
help="Custom labels in key=value format (e.g., --label gpu=h100). Can be used multiple times.",
)
10 changes: 8 additions & 2 deletions mcpserver/cli/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
manager.register()


def get_manager(mcp, cfg):
def get_manager(mcp, cfg, system_type=None):
"""
Get the common tool manager and register tools.
"""
Expand All @@ -20,7 +20,13 @@ def get_manager(mcp, cfg):
print(f" ✅ Registered: {endpoint.name}")

# Load into the manager (tools, resources, prompts)
for tool in manager.load_tools(mcp, cfg.include, cfg.exclude):
# We pass the system_name and system path here
for tool in manager.load_tools(
mcp,
cfg.include,
cfg.exclude,
system_type=system_type,
):
print(f" ✅ Registered: {tool.name}")

# Visual to show user we have ssl
Expand Down
37 changes: 34 additions & 3 deletions mcpserver/cli/start.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import warnings
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI
Expand All @@ -9,10 +11,12 @@
"ignore", category=DeprecationWarning, module="uvicorn.protocols.websockets"
)


from mcpserver.app import init_mcp
from mcpserver.cli.manager import get_manager
from mcpserver.core.config import MCPConfig
from mcpserver.core.hub import HubManager
from mcpserver.core.worker import WorkerManager
from mcpserver.logger import logger

# These are routes also served here
from mcpserver.routes import *
Expand All @@ -25,18 +29,45 @@ def main(args, extra, **kwargs):
"""
if args.config is not None:
print(f"📖 Loading config from {args.config}")
cfg = MCPConfig.from_yaml(args.config)
cfg = MCPConfig.from_yaml(args.config, args)
else:
cfg = MCPConfig.from_args(args)

# Get the tool manager and register discovered tools
mcp = init_mcp(cfg.exclude, cfg.include, args.mask_error_details)
get_manager(mcp, cfg)
get_manager(mcp, cfg, system_type=args.system_type)

# Create ASGI app from MCP server
mcp_app = mcp.http_app(path=cfg.server.path)
app = FastAPI(title="MCP Server", lifespan=mcp_app.lifespan)

# Setup Hub (parent role)
if args.hub:
mcp.hub_manager = HubManager.from_args(mcp, args)

# Setup Worker (child role) - triggered by --join. We require a join secret.
if args.join:
if not args.join_secret:
logger.exit("A --join-secret is required to register with a hub.")
mcp.worker_manager = WorkerManager.from_args(mcp, args, cfg)
mcp_app = mcp.http_app(path=cfg.server.path)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan: start worker registration, then run FastMCP's own
    lifespan for as long as the app is serving.
    """
    # startup logic for Worker registration.
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so a fire-and-forget task may be garbage-collected before
    # it finishes.
    registration_task = None
    if hasattr(mcp, "worker_manager"):
        registration_task = asyncio.create_task(mcp.worker_manager.run_registration())

    try:
        # Execute FastMCP's internal lifespan context
        async with mcp_app.router.lifespan_context(app):
            yield
    finally:
        # Do not leave a still-running registration task behind on shutdown.
        if registration_task is not None and not registration_task.done():
            registration_task.cancel()

app = FastAPI(title="MCP Server", lifespan=lifespan)

# Bind the /register endpoint if we are a Hub
if args.hub:
mcp.hub_manager.bind_to_app(app)

# Mount the MCP server. Note from V: we can use mount with another FastMCP
# mcp.run can also be replaced with mcp.run_async
app.mount("/", mcp_app)
Expand Down
19 changes: 14 additions & 5 deletions mcpserver/core/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dataclasses import dataclass, field
from dataclasses import dataclass, field, fields, replace
from typing import Any, Dict, List, Optional

import yaml
Expand Down Expand Up @@ -54,19 +54,28 @@ class MCPConfig:
resources: List[Capability] = field(default_factory=list)

@classmethod
def from_yaml(cls, path: str, args=None):
    """
    Build the config from a YAML file, optionally layering CLI arguments.

    Args:
        path: Path of the YAML file to read. An empty file is treated as
            an empty mapping.
        args: Optional parsed command-line arguments (an object exposing
            its values through ``vars()``, e.g. an argparse Namespace) or
            a plain dict. Forwarded to ``from_dict`` as a dict.

    Returns:
        Whatever ``cls.from_dict`` builds from the loaded data.
    """
    with open(path, "r") as f:
        data = yaml.safe_load(f) or {}

    # vars() needs an object with __dict__; calling it on the old ``{}``
    # fallback raised TypeError whenever ``args`` was omitted.
    if args is None:
        overrides = {}
    elif isinstance(args, dict):
        overrides = dict(args)
    else:
        overrides = vars(args)

    return cls.from_dict(data, overrides)

@classmethod
def from_dict(cls, data: Dict[str, Any]):
"""Helper to recursively build dataclasses from a dictionary."""
def from_dict(cls, data: Dict[str, Any], args=None):
"""
Helper to recursively build dataclasses from a dictionary.
"""
args = args or {}
# Build ServerConfig
server_data = data.get("server", {})
server_cfg = ServerConfig(**server_data)

# Command line takes precedence
field_names = {field.name for field in fields(ServerConfig)}
filtered_args = {k: v for k, v in args.items() if k in field_names}
server_cfg = replace(server_cfg, **filtered_args)

# Build Settings (Flattened in the dataclass)
settings = data.get("settings", {})

Expand Down
Loading