From 963536cc22d251a9a10247a316f3db263187407f Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 7 Mar 2026 22:58:42 -0800 Subject: [PATCH 1/3] wip: adding support for hub and worker roles We want to be able to create a hierarchy of mcpservers. This means any given server can be a hub, a worker, both, or neither. Signed-off-by: vsoch --- README.md | 11 +++ mcpserver/cli/__init__.py | 1 - mcpserver/cli/args.py | 32 ++++++++- mcpserver/cli/start.py | 48 ++++++++++++- mcpserver/core/hub.py | 142 ++++++++++++++++++++++++++++++++++++++ mcpserver/core/worker.py | 58 ++++++++++++++++ mcpserver/utils/text.py | 9 +++ mcpserver/version.py | 2 +- 8 files changed, 299 insertions(+), 4 deletions(-) create mode 100644 mcpserver/core/hub.py create mode 100644 mcpserver/core/worker.py diff --git a/README.md b/README.md index cedef96..529c89d 100644 --- a/README.md +++ b/README.md @@ -250,6 +250,17 @@ export SSL_CERT_FILE=$(pwd)/certs/cert.pem ``` And you'll see the server get hit. +### Starting a Hub + +The mcp-server can register worker hubs, which are other MCP servers that register to it. To start the mcpserver as a hub: + +```bash +# Start a hub in one terminal +mcpserver start --hub +``` + + + ### Design Choices Here are a few design choices (subject to change, of course). I am starting with re-implementing our fractale agents with this framework. For that, instead of agents being tied to specific functions (as classes on their agent functions) we will have a flexible agent class that changes function based on a chosen prompt. It will use mcp functions, prompts, and resources. In addition: diff --git a/mcpserver/cli/__init__.py b/mcpserver/cli/__init__.py index 664bb01..c5074e7 100644 --- a/mcpserver/cli/__init__.py +++ b/mcpserver/cli/__init__.py @@ -30,7 +30,6 @@ def get_parser(): default=False, action="store_true", ) - parser.add_argument( "--quiet", dest="quiet", diff --git a/mcpserver/cli/args.py b/mcpserver/cli/args.py index ab09204..80be047 100644 --- a/mcpserver/cli/args.py +++ b/mcpserver/cli/args.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import os +import socket default_port = os.environ.get("MCPSERVER_PORT") or 8000 default_host = os.environ.get("MCPSERVER_HOST") or "0.0.0.0" @@ -22,7 +23,7 @@ def populate_start_args(start): start.add_argument( "-t", "--transport", - default="stdio", + default="http", help="Transport to use (defaults to stdin)", choices=["stdio", "http", "sse", "streamable-http"], ) @@ -50,3 +51,32 @@ def populate_start_args(start): action="store_true", default=False, ) + + # Hub Group + hub_group = start.add_argument_group("šŸ¦ž Hub Mode") + hub_group.add_argument( + "--hub", + action="store_true", + help="Start the server in Hub mode to aggregate remote workers.", + ) + hub_group.add_argument( + "--hub-secret", + default=os.environ.get("MCP_HUB_SECRET"), + help="Secret key required for workers to register. (Auto-generated if omitted)", + ) + + # Worker Registration Group + worker_group = start.add_argument_group("šŸ Worker Registration") + worker_group.add_argument( + "--join", help="URL of the MCP Hub to join (e.g., http://hub-host:8089)" + ) + worker_group.add_argument("--join-secret", help="The registration secret provided by the Hub.") + worker_group.add_argument( + "--register-id", + help="Unique ID for this worker. Defaults to the hostname.", + default=socket.gethostname(), + ) + worker_group.add_argument( + "--public-url", + help="The URL the Hub should use to reach this worker (e.g. http://ip:port/mcp)", + ) diff --git a/mcpserver/cli/start.py b/mcpserver/cli/start.py index 3e847a4..a144271 100644 --- a/mcpserver/cli/start.py +++ b/mcpserver/cli/start.py @@ -1,4 +1,6 @@ +import asyncio import warnings +from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI @@ -9,10 +11,12 @@ "ignore", category=DeprecationWarning, module="uvicorn.protocols.websockets" ) - from mcpserver.app import init_mcp from mcpserver.cli.manager import get_manager from mcpserver.core.config import MCPConfig +from mcpserver.core.hub import HubManager +from mcpserver.core.worker import WorkerManager +from mcpserver.logger import logger # These are routes also served here from mcpserver.routes import * @@ -37,6 +41,48 @@ def main(args, extra, **kwargs): mcp_app = mcp.http_app(path=cfg.server.path) app = FastAPI(title="MCP Server", lifespan=mcp_app.lifespan) + # Setup Hub (parent role) + if args.hub: + mcp.hub_manager = HubManager( + mcp, host=cfg.server.host, port=cfg.server.port, secret=args.hub_secret + ) + + # Setup Worker (child role) - triggered by --join + if args.join: + + # Require a join secret + if not args.join_secret: + logger.exit("A --join-secret is required to register with a hub.") + public_url = ( + args.public_url or f"http://{cfg.server.host}:{cfg.server.port}{cfg.server.path}" + ) + mcp.worker_manager = WorkerManager( + mcp, + hub_url=args.join, + secret=args.join_secret, + worker_id=args.register_id, + public_url=public_url, + ) + + mcp_app = mcp.http_app(path=cfg.server.path) + + # 3. Modern Chained Lifespan Fix + @asynccontextmanager + async def lifespan(app: FastAPI): + # Startup: Logic for Worker registration + if hasattr(mcp, "worker_manager"): + asyncio.create_task(mcp.worker_manager.run_registration()) + + # Chain: Execute FastMCP's internal lifespan context + async with mcp_app.router.lifespan_context(app): + yield + + app = FastAPI(title="MCP Server", lifespan=lifespan) + + # Bind the /register endpoint if we are a Hub + if args.hub: + mcp.hub_manager.bind_to_app(app) + # Mount the MCP server. Note from V: we can use mount with antother FastMCP # mcp.run can also be replaced with mcp.run_async app.mount("/", mcp_app) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py new file mode 100644 index 0000000..9e0e2b9 --- /dev/null +++ b/mcpserver/core/hub.py @@ -0,0 +1,142 @@ +import asyncio +import secrets +from typing import Any, Dict + +from fastmcp import Client +from mcp.types import Tool +from rich import print + +import mcpserver.utils as utils +from mcpserver.logger import logger + + +class HubManager: + """ + A hub manager is a part role that can serve / expose children workers. + """ + + def __init__(self, mcp, host: str, port: int, secret: str = None): + self.mcp = mcp + self.host = host + self.port = port + self.secret = secret or secrets.token_urlsafe(32) + self.workers: Dict[str, Dict[str, Any]] = {} + self.registration_url = f"http://{host}:{port}/register" + self._print_banner() + self._register_hub_tools() + + def _print_banner(self): + """ + Print that hub mode is active, how to connect, etc. + """ + print(f"\nšŸ›”ļø Hub Mode Active") + print(f" Master Secret: {self.secret}") + print(" Workers must use this secret to join the hub") + print(f" mcpserver start --join {self.registration_url}\n") + + def _register_hub_tools(self): + """ + Specific tools for a hub to advertise functionality. + """ + + @self.mcp.tool(name="get_fleet_status") + async def get_fleet_status() -> dict: + """ + Aggregate real-time status from registered children. + """ + if not self.workers: + return {"message": "No workers registered."} + return await self.fetch_all_statuses() + + async def fetch_all_statuses(self) -> dict: + """ + Handy function to get all statuses + """ + + async def get_one(wid, info): + try: + async with info["client"] as sess: + return wid, {"online": True, "status": await sess.call_tool("get_status", {})} + except Exception as e: + return wid, {"online": False, "error": str(e)} + + results = await asyncio.gather(*[get_one(w, i) for w, i in self.workers.items()]) + return dict(results) + + def bind_to_app(self, app): + """ + We have to call this to bind the hub to the app. + """ + from fastapi import HTTPException, Request + + @app.post("/register") + async def register(request: Request): + if not secrets.compare_digest(request.headers.get("X-MCP-Token", ""), self.secret): + raise HTTPException(status_code=403) + data = await request.json() + wid, wurl = data["id"], data["url"] + self.workers[wid] = {"url": wurl, "client": Client(wurl)} + asyncio.create_task(self._reflect_child_tools(wid, wurl)) + return {"status": "success"} + + async def _reflect_child_tools(self, worker_id: str, url: str): + """ + Discover worker (child) tools + """ + try: + async with Client(url) as client: + tools = await client.list_tools() + print() + for tool in tools: + self._create_proxy(worker_id, url, tool) + except Exception as e: + logger.error(f"Failed to reflect tools for {worker_id}: {e}") + + def _create_proxy(self, worker_id: str, url: str, tool: Tool): + """ + Dynamically adds a proxied tool to the FastMCP instance. + """ + # Generate a safe function name and map original argument names + proxy_name = f"{utils.sanitize(worker_id)}_{utils.sanitize(tool.name)}" + + # Map original argument names to safe Python parameter names + properties = tool.inputSchema.get("properties", {}) + + # Map: {"safe_name": "original-name"} + arg_mapping = {utils.sanitize(k): k for k in properties.keys()} + + # Create the signature string: arg_1=None, arg_2=None + arg_string = ", ".join([f"{safe_name}=None" for safe_name in arg_mapping.keys()]) + + # 3. Build the dynamic function + exec_globals = { + "Client": Client, + "target_url": url, + "target_tool": tool.name, + "arg_mapping": arg_mapping, + "logger": logger, + } + namespace = {} + + # We use the arg_mapping inside the function to restore the original + # names (with hyphens) before calling the remote tool. + func_def = ( + f"async def {proxy_name}({arg_string}):\n" + f" # Map safe Python names back to original schema names\n" + f" raw_locals = locals()\n" + f" args = {{arg_mapping[k]: raw_locals[k] for k in arg_mapping if raw_locals[k] is not None}}\n" + f" async with Client(target_url) as client:\n" + f" return await client.call_tool(target_tool, args)" + ) + + try: + exec(func_def, exec_globals, namespace) + proxy_func = namespace[proxy_name] + proxy_func.__doc__ = tool.description + + # Register with FastMCP + self.mcp.tool(name=proxy_name)(proxy_func) + print(f"šŸ›°ļø Discovered worker tool: [blue]{proxy_name}[/blue]") + + except Exception as e: + logger.error(f"āŒ Failed to generate dynamic proxy for {tool.name}: {e}") diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py new file mode 100644 index 0000000..5e56bce --- /dev/null +++ b/mcpserver/core/worker.py @@ -0,0 +1,58 @@ +import asyncio +import socket + +import httpx + +from mcpserver.logger import logger + + +class WorkerManager: + """ + A worker mcpserver advertises its tools to a parent hub. + """ + + def __init__(self, mcp, hub_url, secret, worker_id=None, public_url=None): + self.mcp = mcp + self.hub_url = hub_url + self.secret = secret + self.worker_id = worker_id or socket.gethostname() + self.public_url = public_url + self._register_worker_tools() + + def _register_worker_tools(self): + """ + This function will be able to return a live status. + + Likely we will need to make this an interface that can be customized + depending on the worker type. + """ + + @self.mcp.tool(name="get_status") + async def get_status() -> dict: + """Reports local status and nested fleet status if acting as a Hub.""" + status = {"id": self.worker_id, "type": "leaf"} + # The Fractal Logic: If we are also a hub, include our children + if hasattr(self.mcp, "hub_manager"): + status["type"] = "intermediate_hub" + status["fleet"] = await self.mcp.hub_manager.fetch_all_statuses() + return status + + async def run_registration(self): + """ + Perform the dial-home registration. + + E.T. PHONE HOME!! (stop it, Vanessa) :_) + """ + await asyncio.sleep(1) # Wait for local server to be ready + async with httpx.AsyncClient() as client: + try: + res = await client.post( + self.hub_url, # user provides the /register url + json={"id": self.worker_id, "url": self.public_url}, + headers={"X-MCP-Token": self.secret}, + timeout=10, + ) + res.raise_for_status() + logger.info(f"āœ… Registered with parent hub: {self.hub_url}") + except Exception as e: + logger.error(f"āŒ Failed to register with hub: {e}") diff --git a/mcpserver/utils/text.py b/mcpserver/utils/text.py index 83b43c7..68be766 100644 --- a/mcpserver/utils/text.py +++ b/mcpserver/utils/text.py @@ -1,6 +1,15 @@ import re +def sanitize(name: str) -> str: + # Replace hyphens/dots with underscores + clean = name.replace("-", "_").replace(".", "_") + # Python identifiers cannot start with a digit + if clean[0].isdigit(): + clean = f"n_{clean}" + return clean + + def get_code_block(content, code_type=None): """ Parse a code block from the response diff --git a/mcpserver/version.py b/mcpserver/version.py index a61f309..1d6d6b6 100644 --- a/mcpserver/version.py +++ b/mcpserver/version.py @@ -1,4 +1,4 @@ -__version__ = "0.0.15" +__version__ = "0.0.16" AUTHOR = "Vanessa Sochat" AUTHOR_EMAIL = "vsoch@users.noreply.github.com" NAME = "mcp-serve" From 9ae8f4db1ee5e69d7969b8135776d87cb5da1593 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 8 Mar 2026 18:41:45 -0700 Subject: [PATCH 2/3] feat: base system tool to return status Signed-off-by: vsoch --- README.md | 26 ++++- examples/mcp-query.py | 105 ++++++++++++++++++ mcpserver/cli/args.py | 11 ++ mcpserver/cli/start.py | 27 +---- mcpserver/core/config.py | 19 +++- mcpserver/core/hub.py | 92 ++++++++++++--- mcpserver/core/worker.py | 84 +++++++++----- mcpserver/tools/base.py | 11 +- mcpserver/tools/manager.py | 14 ++- mcpserver/tools/prompts.py | 2 - mcpserver/tools/simple/tool.py | 3 - mcpserver/tools/status/prompts.py | 28 ----- mcpserver/tools/status/tool.py | 16 --- .../tools/{status => system}/__init__.py | 0 mcpserver/tools/system/tool.py | 47 ++++++++ mcpserver/utils/text.py | 4 + 16 files changed, 365 insertions(+), 124 deletions(-) create mode 100644 examples/mcp-query.py delete mode 100644 mcpserver/tools/prompts.py delete mode 100644 mcpserver/tools/status/prompts.py delete mode 100644 mcpserver/tools/status/tool.py rename mcpserver/tools/{status => system}/__init__.py (100%) create mode 100644 mcpserver/tools/system/tool.py diff --git a/README.md b/README.md index 529c89d..03867c4 100644 --- a/README.md +++ b/README.md @@ -256,10 +256,29 @@ The mcp-server can register worker hubs, which are other MCP servers that regist ```bash # Start a hub in one terminal -mcpserver start --hub +mcpserver start --hub --hub-secret potato ``` +In another terminal, start a worker using the token that is generated. Add some functions for fun. +```bash +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --join-secret portato --port 7777 +``` + +Test doing queries for status: + +```bash +# Get listing of workers and metadata +python3 ./examples/mcp-query.py + +# Get a specific tool metadata from the worker +python3 ./examples/mcp-query.py http://localhost:7777/mcp get_status + +# Call a namespaced tool on the hub (e.g., get the status) +python3 ./examples/mcp-query.py http://localhost:8000/mcp n_781e903e4f10_get_status +``` + +You can test it without the join secret, or a wrong join secret, to see it fail. ### Design Choices @@ -274,7 +293,10 @@ Here are a few design choices (subject to change, of course). I am starting with ## TODO -- Full operator with Flux example (Flux operator with HPC apps and jobspec translation) +- [ ] join secret should be allowed from environment +- [ ] debug why port not taking +- [ ] design --system-name and --system +- [ ] Need to handle worker disconnect and reconnect. ## License diff --git a/examples/mcp-query.py b/examples/mcp-query.py new file mode 100644 index 0000000..b0c37df --- /dev/null +++ b/examples/mcp-query.py @@ -0,0 +1,105 @@ +import asyncio +import sys +import json +import argparse +from fastmcp import Client +from rich.console import Console +from rich.tree import Tree +from rich.json import JSON +from rich.panel import Panel +from rich.table import Table + +console = Console() + +# --- Defaults --- +DEFAULT_URL = "http://localhost:8000/mcp" +DEFAULT_TOOL = "get_fleet_status" + +def render_fleet_tree(data: dict): + """ + Renders the nested Hub/Worker hierarchy as a Tree. + """ + tree = Tree("🌐 [bold cyan]mcpserver Hierarchy[/bold cyan]") + + if not data or not isinstance(data, dict): + tree.add("[yellow]No worker data returned.[/yellow]") + return tree + + for worker_id, info in data.items(): + # Status Icon + online = info.get("online", False) + icon = "āœ…" if online else "āŒ" + color = "green" if online else "red" + + # Worker Node + worker_node = tree.add(f"{icon} [bold {color}]{worker_id}[/bold {color}]") + worker_node.add(f"[dim]Type:[/dim] [yellow]{info.get('type', 'generic')}[/yellow]") + + if online: + # Recursively handle metadata or nested fleets + status_data = info.get("status", {}) + meta_node = worker_node.add("šŸ“Š [bold white]Metadata[/bold white]") + + # Handle standard fields + for k, v in status_data.items(): + if k == "fleet": + # This node is an intermediate Hub! + meta_node.add("šŸ”— [bold magenta]Sub-Fleet Attached (Intermediate Hub)[/bold magenta]") + elif k == "labels" and isinstance(v, dict): + labels_node = worker_node.add("šŸ·ļø [bold blue]Labels[/bold blue]") + for lk, lv in v.items(): + labels_node.add(f"{lk}: [blue]{lv}[/blue]") + elif isinstance(v, (dict, list)): + continue # Skip nested complex objects in the top-level tree for cleanliness + else: + meta_node.add(f"{k}: [green]{v}[/green]") + else: + worker_node.add(f"[red]Error: {info.get('error', 'Unknown failure')}[/red]") + + return tree + +async def query_mcp(url, tool_name): + console.print(f"[bold blue]šŸ“” Connecting to:[/bold blue] {url}") + + try: + async with Client(url) as client: + with console.status(f"[bold yellow]Calling {tool_name}...[/bold yellow]"): + result = await client.call_tool(tool_name, {}) + + # Extract data from FastMCP's result wrapper + data = result + if hasattr(result, "content"): + text_content = result.content[0].text + try: + # Try to parse text as JSON if it looks like it + data = json.loads(text_content.replace("'", '"')) + except: + data = text_content + + # --- PRETTY RENDERING --- + if tool_name == "get_fleet_status" and isinstance(data, dict): + console.print("\n") + console.print(Panel(render_fleet_tree(data), border_style="cyan", expand=False)) + + elif isinstance(data, (dict, list)): + console.print("\n") + console.print(Panel( + JSON.from_data(data), + title=f"[bold green]Result: {tool_name}[/bold green]", + border_style="green", + expand=False + )) + else: + console.print(f"\n[bold green]Result:[/bold green] {data}") + + except Exception as e: + console.print(f"\n[bold red]āŒ Request Failed:[/bold red] {e}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Query an MCP server/hub and print results prettily.") + parser.add_argument("url", nargs="?", default=DEFAULT_URL, help=f"Server URL (default: {DEFAULT_URL})") + parser.add_argument("tool", nargs="?", default=DEFAULT_TOOL, help=f"Tool to call (default: {DEFAULT_TOOL})") + + args = parser.parse_args() + + asyncio.run(query_mcp(args.url, args.tool)) \ No newline at end of file diff --git a/mcpserver/cli/args.py b/mcpserver/cli/args.py index 80be047..4f82d30 100644 --- a/mcpserver/cli/args.py +++ b/mcpserver/cli/args.py @@ -80,3 +80,14 @@ def populate_start_args(start): "--public-url", help="The URL the Hub should use to reach this worker (e.g. http://ip:port/mcp)", ) + worker_group.add_argument( + "--worker-type", + default="generic", + help="Category of worker (e.g., 'flux', 'kubernetes', 'storage')", + ) + worker_group.add_argument( + "--label", + action="append", + dest="labels", + help="Custom labels in key=value format (e.g., --label gpu=h100). Can be used multiple times.", + ) diff --git a/mcpserver/cli/start.py b/mcpserver/cli/start.py index a144271..a9069ac 100644 --- a/mcpserver/cli/start.py +++ b/mcpserver/cli/start.py @@ -29,7 +29,7 @@ def main(args, extra, **kwargs): """ if args.config is not None: print(f"šŸ“– Loading config from {args.config}") - cfg = MCPConfig.from_yaml(args.config) + cfg = MCPConfig.from_yaml(args.config, args) else: cfg = MCPConfig.from_args(args) @@ -43,37 +43,22 @@ def main(args, extra, **kwargs): # Setup Hub (parent role) if args.hub: - mcp.hub_manager = HubManager( - mcp, host=cfg.server.host, port=cfg.server.port, secret=args.hub_secret - ) + mcp.hub_manager = HubManager.from_args(mcp, args) - # Setup Worker (child role) - triggered by --join + # Setup Worker (child role) - triggered by --join. We reqiure join secret. if args.join: - - # Require a join secret if not args.join_secret: logger.exit("A --join-secret is required to register with a hub.") - public_url = ( - args.public_url or f"http://{cfg.server.host}:{cfg.server.port}{cfg.server.path}" - ) - mcp.worker_manager = WorkerManager( - mcp, - hub_url=args.join, - secret=args.join_secret, - worker_id=args.register_id, - public_url=public_url, - ) - + mcp.worker_manager = WorkerManager.from_args(mcp, args, cfg) mcp_app = mcp.http_app(path=cfg.server.path) - # 3. Modern Chained Lifespan Fix @asynccontextmanager async def lifespan(app: FastAPI): - # Startup: Logic for Worker registration + # startup logic for Worker registration if hasattr(mcp, "worker_manager"): asyncio.create_task(mcp.worker_manager.run_registration()) - # Chain: Execute FastMCP's internal lifespan context + # Execute FastMCP's internal lifespan context async with mcp_app.router.lifespan_context(app): yield diff --git a/mcpserver/core/config.py b/mcpserver/core/config.py index 5bbc483..a24e7b4 100644 --- a/mcpserver/core/config.py +++ b/mcpserver/core/config.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass, field, fields, replace from typing import Any, Dict, List, Optional import yaml @@ -54,19 +54,28 @@ class MCPConfig: resources: List[Capability] = field(default_factory=list) @classmethod - def from_yaml(cls, path: str): + def from_yaml(cls, path: str, args=None): + args = args or {} with open(path, "r") as f: data = yaml.safe_load(f) or {} - return cls.from_dict(data) + return cls.from_dict(data, vars(args)) @classmethod - def from_dict(cls, data: Dict[str, Any]): - """Helper to recursively build dataclasses from a dictionary.""" + def from_dict(cls, data: Dict[str, Any], args=None): + """ + Helper to recursively build dataclasses from a dictionary. + """ + args = args or {} # Build ServerConfig server_data = data.get("server", {}) server_cfg = ServerConfig(**server_data) + # Command line takes precedence + field_names = {field.name for field in fields(ServerConfig)} + filtered_args = {k: v for k, v in args.items() if k in field_names} + server_cfg = replace(server_cfg, **filtered_args) + # Build Settings (Flattened in the dataclass) settings = data.get("settings", {}) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index 9e0e2b9..4e2ab65 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -1,6 +1,7 @@ import asyncio +import json import secrets -from typing import Any, Dict +from typing import Any, Dict, Optional from fastmcp import Client from mcp.types import Tool @@ -21,10 +22,23 @@ def __init__(self, mcp, host: str, port: int, secret: str = None): self.port = port self.secret = secret or secrets.token_urlsafe(32) self.workers: Dict[str, Dict[str, Any]] = {} + + # Track registered proxies to prevent ValueError on worker re-registration + self._registered_proxies = set() + self.registration_url = f"http://{host}:{port}/register" self._print_banner() self._register_hub_tools() + @classmethod + def from_args(cls, mcp, args) -> Optional["HubManager"]: + """ + Factory to create a HubManager from CLI arguments. + """ + if not getattr(args, "hub", False): + return None + return cls(mcp, host=args.host, port=args.port, secret=args.hub_secret) + def _print_banner(self): """ Print that hub mode is active, how to connect, etc. @@ -50,15 +64,42 @@ async def get_fleet_status() -> dict: async def fetch_all_statuses(self) -> dict: """ - Handy function to get all statuses + Handy function to get all statuses. + Now extracts the actual payload from the MCP CallToolResult. """ async def get_one(wid, info): + base_metadata = { + "type": info.get("type", "generic"), + "labels": info.get("labels", {}), + "url": info["url"], + } try: async with info["client"] as sess: - return wid, {"online": True, "status": await sess.call_tool("get_status", {})} + # 1. Call the tool + mcp_result = await sess.call_tool("get_status", {}) + + # 2. Extract the text content from the MCP wrapper + # FastMCP result.content is a list of content blocks + raw_text = mcp_result.content[0].text + + # 3. Parse the string back into a dictionary + try: + # Handle potential single quotes from Python's str(dict) + status_data = json.loads(raw_text.replace("'", '"')) + except: + status_data = raw_text + + return ( + wid, + { + **base_metadata, + "online": True, + "status": status_data, + }, + ) except Exception as e: - return wid, {"online": False, "error": str(e)} + return wid, {**base_metadata, "online": False, "error": str(e)} results = await asyncio.gather(*[get_one(w, i) for w, i in self.workers.items()]) return dict(results) @@ -75,7 +116,16 @@ async def register(request: Request): raise HTTPException(status_code=403) data = await request.json() wid, wurl = data["id"], data["url"] - self.workers[wid] = {"url": wurl, "client": Client(wurl)} + + # Capture the new identity fields from the registration payload + # If the worker already existed, this updates the URL/Client + self.workers[wid] = { + "url": wurl, + "client": Client(wurl), + "type": data.get("type", "generic"), + "labels": data.get("labels", {}), + } + asyncio.create_task(self._reflect_child_tools(wid, wurl)) return {"status": "success"} @@ -86,19 +136,25 @@ async def _reflect_child_tools(self, worker_id: str, url: str): try: async with Client(url) as client: tools = await client.list_tools() + # Print a single newline for the discovery group print() for tool in tools: - self._create_proxy(worker_id, url, tool) + self._create_proxy(worker_id, tool) except Exception as e: logger.error(f"Failed to reflect tools for {worker_id}: {e}") - def _create_proxy(self, worker_id: str, url: str, tool: Tool): + def _create_proxy(self, worker_id: str, tool: Tool): """ Dynamically adds a proxied tool to the FastMCP instance. """ - # Generate a safe function name and map original argument names + # Generate a safe function name proxy_name = f"{utils.sanitize(worker_id)}_{utils.sanitize(tool.name)}" + # FIX: Check if this tool is already registered to avoid ValueError on re-registration + if proxy_name in self._registered_proxies: + print(f"šŸ›°ļø Re-discovered worker tool: [blue]{proxy_name}[/blue]") + return + # Map original argument names to safe Python parameter names properties = tool.inputSchema.get("properties", {}) @@ -109,23 +165,30 @@ def _create_proxy(self, worker_id: str, url: str, tool: Tool): arg_string = ", ".join([f"{safe_name}=None" for safe_name in arg_mapping.keys()]) # 3. Build the dynamic function + # FIX: We pass 'self' (the HubManager instance) as 'hub' to the exec scope. + # This allows the proxy function to look up the LATEST url for the worker + # every time it is called, instead of using a hardcoded stale URL. exec_globals = { "Client": Client, - "target_url": url, + "hub": self, + "worker_id": worker_id, "target_tool": tool.name, "arg_mapping": arg_mapping, "logger": logger, } namespace = {} - # We use the arg_mapping inside the function to restore the original - # names (with hyphens) before calling the remote tool. + # The function logic now resolves the URL at call-time from the hub's worker registry func_def = ( f"async def {proxy_name}({arg_string}):\n" - f" # Map safe Python names back to original schema names\n" + f" # Look up current worker info from the manager\n" + f" info = hub.workers.get(worker_id)\n" + f" if not info:\n" + f" return {{'error': f'Worker {{worker_id}} no longer registered'}}\n" + f" url = info['url']\n" f" raw_locals = locals()\n" f" args = {{arg_mapping[k]: raw_locals[k] for k in arg_mapping if raw_locals[k] is not None}}\n" - f" async with Client(target_url) as client:\n" + f" async with Client(url) as client:\n" f" return await client.call_tool(target_tool, args)" ) @@ -136,6 +199,9 @@ def _create_proxy(self, worker_id: str, url: str, tool: Tool): # Register with FastMCP self.mcp.tool(name=proxy_name)(proxy_func) + + # Mark as registered so we don't try to add it again on restart + self._registered_proxies.add(proxy_name) print(f"šŸ›°ļø Discovered worker tool: [blue]{proxy_name}[/blue]") except Exception as e: diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py index 5e56bce..89fe0fa 100644 --- a/mcpserver/core/worker.py +++ b/mcpserver/core/worker.py @@ -1,5 +1,6 @@ import asyncio import socket +from typing import Optional import httpx @@ -11,48 +12,75 @@ class WorkerManager: A worker mcpserver advertises its tools to a parent hub. """ - def __init__(self, mcp, hub_url, secret, worker_id=None, public_url=None): + def __init__( + self, + mcp, + hub_url, + secret, + worker_id=None, + public_url=None, + worker_type="generic", + labels=None, + ): self.mcp = mcp self.hub_url = hub_url self.secret = secret self.worker_id = worker_id or socket.gethostname() + self.worker_type = worker_type + self.labels = self._parse_labels(labels) self.public_url = public_url - self._register_worker_tools() - def _register_worker_tools(self): + @classmethod + def from_args(cls, mcp, args, cfg) -> Optional["WorkerManager"]: """ - This function will be able to return a live status. - - Likely we will need to make this an interface that can be customized - depending on the worker type. + Factory to create a WorkerManager from CLI arguments. """ + if not getattr(args, "join", None): + return None - @self.mcp.tool(name="get_status") - async def get_status() -> dict: - """Reports local status and nested fleet status if acting as a Hub.""" - status = {"id": self.worker_id, "type": "leaf"} - # The Fractal Logic: If we are also a hub, include our children - if hasattr(self.mcp, "hub_manager"): - status["type"] = "intermediate_hub" - status["fleet"] = await self.mcp.hub_manager.fetch_all_statuses() - return status + # Auto-construct public URL if not provided + public_url = ( + args.public_url or f"http://{cfg.server.host}:{cfg.server.port}{cfg.server.path}" + ) + return cls( + mcp, + hub_url=args.join, + secret=args.join_secret, + worker_id=args.register_id, + public_url=public_url, + worker_type=args.worker_type, + labels=args.labels, + ) - async def run_registration(self): + def _parse_labels(self, label_list) -> dict: + """ + Converts ['key=val', 'key2=val2'] to a dictionary. """ - Perform the dial-home registration. + labels = {} + if not label_list: + return labels + for item in label_list: + if "=" in item: + k, v = item.split("=", 1) + labels[k.strip()] = v.strip() + return labels - E.T. PHONE HOME!! (stop it, Vanessa) :_) + async def run_registration(self): + """ + worker registration payload. """ - await asyncio.sleep(1) # Wait for local server to be ready + await asyncio.sleep(1) async with httpx.AsyncClient() as client: + payload = { + "id": self.worker_id, + "url": self.public_url, + "type": self.worker_type, + "labels": self.labels, + } + headers = {"X-MCP-Token": self.secret} try: - res = await client.post( - self.hub_url, # user provides the /register url - json={"id": self.worker_id, "url": self.public_url}, - headers={"X-MCP-Token": self.secret}, - timeout=10, - ) + res = await client.post(f"{self.hub_url}/register", json=payload, headers=headers) res.raise_for_status() - logger.info(f"āœ… Registered with parent hub: {self.hub_url}") + logger.info(f"āœ… Registered as '{self.worker_id}' ({self.worker_type})") except Exception as e: - logger.error(f"āŒ Failed to register with hub: {e}") + logger.error(f"āŒ Registration failed: {e}") diff --git a/mcpserver/tools/base.py b/mcpserver/tools/base.py index 2233206..0bb26a3 100644 --- a/mcpserver/tools/base.py +++ b/mcpserver/tools/base.py @@ -9,8 +9,15 @@ class BaseTool(ABC): Each tool can provision prompts, resources, or tools. """ - def setup(self): - pass + def setup(self, manager=None): + self.manager = manager + + def get_status(self) -> dict: + """ + Optional: Override this to provide custom status + information for this specific tool set. + """ + return {} def get_mcp_tools(self) -> List[Callable]: return self.get_mcp_methods("_is_mcp_tool") diff --git a/mcpserver/tools/manager.py b/mcpserver/tools/manager.py index b6c75dd..90c82ce 100644 --- a/mcpserver/tools/manager.py +++ b/mcpserver/tools/manager.py @@ -19,6 +19,9 @@ class ToolManager: def __init__(self): self.tools = {} + # Active instances that can deliver metadata about status + self.instances = {} + def load_function(self, tool_path): """ Assume this is the function name provided @@ -101,10 +104,14 @@ def discover_tools(self, root_path: str, module_path: str) -> Dict[str, Path]: discovered[tool_id] = {"path": file_path, "module": import_path, "root": root_path} return discovered - def load_tools(self, mcp, include=None, exclude=None): + def load_tools(self, mcp, include=None, exclude=None, status_tool: str = None): """ Load a set of named tools, or default to all those discovered. """ + # Start with the system status tool + status_tool = status_tool or "mcpserver.tools.system.tool" + self.tools["system"] = {"module": status_tool} + # If no tools are selected... select all tools discovered names = self.tools include = "(%s)" % "|".join(include) if include else None @@ -131,6 +138,7 @@ def load_tools(self, mcp, include=None, exclude=None): instance = self.load_tool(name) if not instance: continue + self.instances[name] = instance # Add tools, resources, and prompts on the fly for ToolClass in [Tool, Resource, Prompt]: @@ -174,11 +182,9 @@ def load_tool(self, tool_id: str) -> BaseTool: for _, obj in inspect.getmembers(module): if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj is not BaseTool: - # Instantiate instance = obj() - # Inject the filesystem-derived name instance.name = tool_id - instance.setup() + instance.setup(manager=self) return instance except ImportError as e: diff --git a/mcpserver/tools/prompts.py b/mcpserver/tools/prompts.py deleted file mode 100644 index a5bca61..0000000 --- a/mcpserver/tools/prompts.py +++ /dev/null @@ -1,2 +0,0 @@ -def format_rules(rules): - return "\n".join([f"- {r}" for r in rules]) diff --git a/mcpserver/tools/simple/tool.py b/mcpserver/tools/simple/tool.py index c2566d1..f75b800 100644 --- a/mcpserver/tools/simple/tool.py +++ b/mcpserver/tools/simple/tool.py @@ -7,9 +7,6 @@ class EchoTool(BaseTool): The EchoTool is primarily for testing. """ - def setup(self): - pass - @mcp.tool(name="simple_echo") def echo(self, message: str): """Echo the message back (return it)""" diff --git a/mcpserver/tools/status/prompts.py b/mcpserver/tools/status/prompts.py deleted file mode 100644 index 2a7c908..0000000 --- a/mcpserver/tools/status/prompts.py +++ /dev/null @@ -1,28 +0,0 @@ -import mcpserver.tools.prompts as prompts - -PERSONA = "You are a workflow status expert." - -CONTEXT = "We just completed a step in an orchestration. We need to determine the final status. If you see a return code and it is 0, you MUST indicate success." - -REQUIRES = [ - "You MUST return a single json structure with a single field 'action'", - "The 'action' must be 'failure' or 'success'", -] - - -def get_status_text(content): - return f""" -### PERSONA -{PERSONA} - -### CONTEXT -{CONTEXT} - -### GOAL -Look at the step output and determine if the step has failed or succeeded. -{content} - -### INSTRUCTIONS -You must adhere to these rules strictly: -{prompts.format_rules(REQUIRES)} -""" diff --git a/mcpserver/tools/status/tool.py b/mcpserver/tools/status/tool.py deleted file mode 100644 index c556a37..0000000 --- a/mcpserver/tools/status/tool.py +++ /dev/null @@ -1,16 +0,0 @@ -import mcpserver.tools.status.prompts as prompts -from mcpserver.tools.base import BaseTool -from mcpserver.tools.decorator import mcp - - -class StatusTool(BaseTool): - - @mcp.prompt( - name="check_finished_prompt", description="Look at step outputs and determined if finished" - ) - def check_finished_prompt(self, content: str) -> dict: - """ - Generates agent instructions for determining if a step is completed, successful, failed. - """ - prompt_text = prompts.get_status_text(content) - return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} diff --git a/mcpserver/tools/status/__init__.py b/mcpserver/tools/system/__init__.py similarity index 100% rename from mcpserver/tools/status/__init__.py rename to mcpserver/tools/system/__init__.py diff --git a/mcpserver/tools/system/tool.py b/mcpserver/tools/system/tool.py new file mode 100644 index 0000000..9bd1772 --- /dev/null +++ b/mcpserver/tools/system/tool.py @@ -0,0 +1,47 @@ +import os +import platform +import time +from typing import Any, Dict + +from mcpserver.tools.base import BaseTool +from mcpserver.tools.decorator import mcp + + +class SystemTool(BaseTool): + """ + Provides server metadata and a manifest of loaded tools. + """ + + def setup(self, manager=None): + # Store the manager reference provided by the standard loading strategy + self.manager = manager + + @mcp.tool(name="get_status") + def get_status(self) -> Dict[str, Any]: + """ + Returns a structured report of the server environment and loaded tools. + """ + # 1. Custom status implemented by this class + status = { + "timestamp": time.time(), + "system": { + "os": platform.system(), + "node": platform.node(), + "python": platform.python_version(), + "cwd": os.getcwd() if hasattr(os, "getcwd") else "unknown", + }, + "tools": {}, + } + + # 2. Metadata about tools (The Manifest) + # We look at the manager's active instances + if self.manager and hasattr(self.manager, "instances"): + for tool_id, inst in self.manager.instances.items(): + if inst == self: + continue + status["tools"][tool_id] = { + "class": inst.__class__.__name__, + "description": inst.__doc__.strip() if inst.__doc__ else "n/a", + } + + return status diff --git a/mcpserver/utils/text.py b/mcpserver/utils/text.py index 68be766..07ad3e2 100644 --- a/mcpserver/utils/text.py +++ b/mcpserver/utils/text.py @@ -10,6 +10,10 @@ def sanitize(name: str) -> str: return clean +def format_rules(rules): + return "\n".join([f"- {r}" for r in rules]) + + def get_code_block(content, code_type=None): """ Parse a code block from the response From 2881fe68dc2bff3282b408f422c19ff6aaeef6ca Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 8 Mar 2026 21:42:39 -0700 Subject: [PATCH 3/3] feat: add system backends, generic and flux Signed-off-by: vsoch --- README.md | 22 +++-- examples/mcp-query.py | 3 +- mcpserver/cli/args.py | 11 ++- mcpserver/cli/manager.py | 10 ++- mcpserver/cli/start.py | 2 +- mcpserver/core/hub.py | 11 ++- mcpserver/core/worker.py | 5 +- mcpserver/tools/manager.py | 22 +++-- mcpserver/tools/system/__init__.py | 9 +++ mcpserver/tools/system/flux.py | 42 ++++++++++ mcpserver/tools/system/generic.py | 55 +++++++++++++ mcpserver/tools/system/kubernetes.py | 116 +++++++++++++++++++++++++++ mcpserver/tools/system/tool.py | 47 ----------- 13 files changed, 284 insertions(+), 71 deletions(-) create mode 100644 mcpserver/tools/system/flux.py create mode 100644 mcpserver/tools/system/generic.py create mode 100644 mcpserver/tools/system/kubernetes.py delete mode 100644 mcpserver/tools/system/tool.py diff --git a/README.md b/README.md index 03867c4..d46b435 100644 --- a/README.md +++ b/README.md @@ -262,7 +262,20 @@ mcpserver start --hub --hub-secret potato In another terminal, start a worker using the token that is generated. Add some functions for fun. ```bash -mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --join-secret portato --port 7777 +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --join-secret potato --port 7777 +``` + +Note that you can also set the secret in the environemnt. + +```bash +export MCPSERVER_JOIN_SECRET=potato +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 +``` + +Register the worker sytem type instead as flux: + +```bash +mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --system-type flux ``` Test doing queries for status: @@ -293,10 +306,9 @@ Here are a few design choices (subject to change, of course). I am starting with ## TODO -- [ ] join secret should be allowed from environment -- [ ] debug why port not taking -- [ ] design --system-name and --system -- [ ] Need to handle worker disconnect and reconnect. +- [ ] need to expose tools from system (child worker) instances +- [ ] need to decide on dispatch strategy / algorithm +- [ ] add in fluxion queue stats via RPC call to flux status ## License diff --git a/examples/mcp-query.py b/examples/mcp-query.py index b0c37df..356a249 100644 --- a/examples/mcp-query.py +++ b/examples/mcp-query.py @@ -11,7 +11,6 @@ console = Console() -# --- Defaults --- DEFAULT_URL = "http://localhost:8000/mcp" DEFAULT_TOOL = "get_fleet_status" @@ -76,7 +75,7 @@ async def query_mcp(url, tool_name): except: data = text_content - # --- PRETTY RENDERING --- + # MAKE IT PRETTY. if tool_name == "get_fleet_status" and isinstance(data, dict): console.print("\n") console.print(Panel(render_fleet_tree(data), border_style="cyan", expand=False)) diff --git a/mcpserver/cli/args.py b/mcpserver/cli/args.py index 4f82d30..84ca855 100644 --- a/mcpserver/cli/args.py +++ b/mcpserver/cli/args.py @@ -64,13 +64,22 @@ def populate_start_args(start): default=os.environ.get("MCP_HUB_SECRET"), help="Secret key required for workers to register. (Auto-generated if omitted)", ) + start.add_argument( + "--system-type", + default="generic", + help="System type/template (e.g., 'generic', 'flux', 'kubernetes') or a full python module path.", + ) # Worker Registration Group worker_group = start.add_argument_group("šŸ Worker Registration") worker_group.add_argument( "--join", help="URL of the MCP Hub to join (e.g., http://hub-host:8089)" ) - worker_group.add_argument("--join-secret", help="The registration secret provided by the Hub.") + worker_group.add_argument( + "--join-secret", + help="The registration secret provided by the Hub.", + default=os.environ.get("MCPSERVER_JOIN_SECRET"), + ) worker_group.add_argument( "--register-id", help="Unique ID for this worker. Defaults to the hostname.", diff --git a/mcpserver/cli/manager.py b/mcpserver/cli/manager.py index 1b29345..885595a 100644 --- a/mcpserver/cli/manager.py +++ b/mcpserver/cli/manager.py @@ -6,7 +6,7 @@ manager.register() -def get_manager(mcp, cfg): +def get_manager(mcp, cfg, system_type=None): """ Get the common tool manager and register tools. """ @@ -20,7 +20,13 @@ def get_manager(mcp, cfg): print(f" āœ… Registered: {endpoint.name}") # Load into the manager (tools, resources, prompts) - for tool in manager.load_tools(mcp, cfg.include, cfg.exclude): + # We pass the system_name and system path here + for tool in manager.load_tools( + mcp, + cfg.include, + cfg.exclude, + system_type=system_type, + ): print(f" āœ… Registered: {tool.name}") # Visual to show user we have ssl diff --git a/mcpserver/cli/start.py b/mcpserver/cli/start.py index a9069ac..002f54e 100644 --- a/mcpserver/cli/start.py +++ b/mcpserver/cli/start.py @@ -35,7 +35,7 @@ def main(args, extra, **kwargs): # Get the tool manager and register discovered tools mcp = init_mcp(cfg.exclude, cfg.include, args.mask_error_details) - get_manager(mcp, cfg) + get_manager(mcp, cfg, system_type=args.system_type) # Create ASGI app from MCP server mcp_app = mcp.http_app(path=cfg.server.path) diff --git a/mcpserver/core/hub.py b/mcpserver/core/hub.py index 4e2ab65..d393748 100644 --- a/mcpserver/core/hub.py +++ b/mcpserver/core/hub.py @@ -76,14 +76,13 @@ async def get_one(wid, info): } try: async with info["client"] as sess: - # 1. Call the tool mcp_result = await sess.call_tool("get_status", {}) - # 2. Extract the text content from the MCP wrapper + # Extract the text content from the MCP wrapper # FastMCP result.content is a list of content blocks raw_text = mcp_result.content[0].text - # 3. Parse the string back into a dictionary + # Parse the string back into a dictionary try: # Handle potential single quotes from Python's str(dict) status_data = json.loads(raw_text.replace("'", '"')) @@ -150,7 +149,7 @@ def _create_proxy(self, worker_id: str, tool: Tool): # Generate a safe function name proxy_name = f"{utils.sanitize(worker_id)}_{utils.sanitize(tool.name)}" - # FIX: Check if this tool is already registered to avoid ValueError on re-registration + # Check if this tool is already registered to avoid ValueError on re-registration if proxy_name in self._registered_proxies: print(f"šŸ›°ļø Re-discovered worker tool: [blue]{proxy_name}[/blue]") return @@ -164,8 +163,8 @@ def _create_proxy(self, worker_id: str, tool: Tool): # Create the signature string: arg_1=None, arg_2=None arg_string = ", ".join([f"{safe_name}=None" for safe_name in arg_mapping.keys()]) - # 3. Build the dynamic function - # FIX: We pass 'self' (the HubManager instance) as 'hub' to the exec scope. + # Build the dynamic function + # We pass 'self' (the HubManager instance) as 'hub' to the exec scope. # This allows the proxy function to look up the LATEST url for the worker # every time it is called, instead of using a hardcoded stale URL. exec_globals = { diff --git a/mcpserver/core/worker.py b/mcpserver/core/worker.py index 89fe0fa..ea0a3e1 100644 --- a/mcpserver/core/worker.py +++ b/mcpserver/core/worker.py @@ -42,13 +42,16 @@ def from_args(cls, mcp, args, cfg) -> Optional["WorkerManager"]: public_url = ( args.public_url or f"http://{cfg.server.host}:{cfg.server.port}{cfg.server.path}" ) + + sys_type = getattr(args, "system_type", "generic") + worker_type = sys_type.split(".")[-1] if "." in sys_type else sys_type return cls( mcp, hub_url=args.join, secret=args.join_secret, worker_id=args.register_id, public_url=public_url, - worker_type=args.worker_type, + worker_type=worker_type, labels=args.labels, ) diff --git a/mcpserver/tools/manager.py b/mcpserver/tools/manager.py index 90c82ce..fc67222 100644 --- a/mcpserver/tools/manager.py +++ b/mcpserver/tools/manager.py @@ -11,6 +11,9 @@ # These are the function types we want to discover from fastmcp.tools import Tool +import mcpserver.defaults as defaults +import mcpserver.tools.system as systems + from .base import BaseTool @@ -104,13 +107,20 @@ def discover_tools(self, root_path: str, module_path: str) -> Dict[str, Path]: discovered[tool_id] = {"path": file_path, "module": import_path, "root": root_path} return discovered - def load_tools(self, mcp, include=None, exclude=None, status_tool: str = None): + def load_tools(self, mcp, include=None, exclude=None, system_type="generic"): """ Load a set of named tools, or default to all those discovered. """ - # Start with the system status tool - status_tool = status_tool or "mcpserver.tools.system.tool" - self.tools["system"] = {"module": status_tool} + if system_type in systems.system_tools: + # Map short name to internal library path + sys_module = f"mcpserver.tools.system.{system_type}" + else: + # Assume it is a custom external module path provided by the user + # e.g., 'my_custom_package.system_logic' + sys_module = system_type + + # Seed the system tool into discovery + self.tools["system"] = {"module": sys_module} # If no tools are selected... select all tools discovered names = self.tools @@ -199,13 +209,13 @@ def get_available_prompts(self): """ prompts = set() - # 2. Load them (to execute decorators) + # Load them (to execute decorators) for tool_id, path in self.tools.items(): mod = self.load_tool_module(tool_id, path) if not mod: continue - # 3. Inspect the classes/functions in the module + # Inspect the classes/functions in the module for name, obj in inspect.getmembers(mod): # We usually look for classes inheriting from BaseTool # But we can also just scan the class attributes diff --git a/mcpserver/tools/system/__init__.py b/mcpserver/tools/system/__init__.py index e69de29..7ad30d3 100644 --- a/mcpserver/tools/system/__init__.py +++ b/mcpserver/tools/system/__init__.py @@ -0,0 +1,9 @@ +from .flux import SystemTool as FluxSystemTool +from .generic import SystemTool as GenericSystemTool +from .kubernetes import SystemTool as KubernetesSystemTool + +system_tools = { + "flux": FluxSystemTool, + "generic": GenericSystemTool, + "kubernetes": KubernetesSystemTool, +} diff --git a/mcpserver/tools/system/flux.py b/mcpserver/tools/system/flux.py new file mode 100644 index 0000000..7e562ed --- /dev/null +++ b/mcpserver/tools/system/flux.py @@ -0,0 +1,42 @@ +import time +from typing import Any, Dict + +from mcpserver.tools.base import BaseTool +from mcpserver.tools.decorator import mcp + + +class SystemTool(BaseTool): + """ + System tool specialized for Flux Framework. + """ + + def setup(self, manager=None): + self.manager = manager + + @mcp.tool(name="get_status") + def get_status(self) -> Dict[str, Any]: + """ + Get status of the flux cluster. + """ + import flux + import flux.resource + + flux_meta = {"status": "error", "message": "Flux handle failed"} + try: + h = flux.Flux() + listing = flux.resource.list.resource_list(h).get() + flux_meta = { + "status": "online", + "free_cores": listing.free.ncores, + "up_nodes": listing.up.nnodes, + } + except Exception as e: + flux_meta["error"] = str(e) + + res = {"timestamp": time.time(), "system_type": "flux", "flux": flux_meta, "tools": {}} + if self.manager: + for tid, inst in self.manager.instances.items(): + if inst == self: + continue + res["tools"][tid] = {"class": inst.__class__.__name__} + return res diff --git a/mcpserver/tools/system/generic.py b/mcpserver/tools/system/generic.py new file mode 100644 index 0000000..e6d1b49 --- /dev/null +++ b/mcpserver/tools/system/generic.py @@ -0,0 +1,55 @@ +# mcpserver/tools/system/generic.py +import platform +import socket +import time +from typing import Any, Dict + +from mcpserver.tools.base import BaseTool +from mcpserver.tools.decorator import mcp + + +class SystemTool(BaseTool): + """ + Default system tool (generic) to return status + """ + + def setup(self, manager=None): + self.manager = manager + + @mcp.tool(name="get_status") + async def get_status(self) -> Dict[str, Any]: + from mcpserver.app import mcp as mcp_instance + + # This is the concept of the server identity + # If no worker_manager exists, we are a one-off standalone server + wm = getattr(mcp_instance, "worker_manager", None) + + res = { + "id": wm.worker_id if wm else socket.gethostname(), + "type": wm.worker_type if wm else "standalone", + "labels": wm.labels if wm else {}, + "timestamp": time.time(), + "system_type": "generic", + "environment": { + "os": platform.system(), + "python": platform.python_version(), + }, + "tools": {}, + } + + # Tool manifest. Should work even for one-off servers + if self.manager: + for tool_id, inst in self.manager.instances.items(): + if inst == self: + continue + res["tools"][tool_id] = { + "class": inst.__class__.__name__, + "description": inst.__doc__.strip() if inst.__doc__ else "n/a", + } + + # 3. Fleet Check (Only if this one-off happens to be a Hub) + hm = getattr(mcp_instance, "hub_manager", None) + if hm: + res["fleet"] = await hm.fetch_all_statuses() + + return res diff --git a/mcpserver/tools/system/kubernetes.py b/mcpserver/tools/system/kubernetes.py new file mode 100644 index 0000000..772270d --- /dev/null +++ b/mcpserver/tools/system/kubernetes.py @@ -0,0 +1,116 @@ +import os +import platform +import time +from typing import Any, Dict + +from mcpserver.tools.base import BaseTool +from mcpserver.tools.decorator import mcp + + +class SystemTool(BaseTool): + """ + System tool specialized for Kubernetes environments. + Discovers cluster topology, resource pressures, and tool manifests. + """ + + def setup(self, manager=None): + self.manager = manager + self._k8s_loaded = False + try: + from kubernetes import client, config + + try: + config.load_kube_config() + except: + config.load_incluster_config() + + self.v1 = client.CoreV1Api() + self._k8s_loaded = True + except Exception as e: + self._k8s_error = str(e) + + @mcp.tool(name="get_status") + async def get_status(self) -> Dict[str, Any]: + """ + Returns Kubernetes cluster status, node telemetry, and loaded tool manifest. + """ + from mcpserver.app import mcp as mcp_instance + from mcpserver.cli.manager import ToolManager + + manager = self.manager or ToolManager.get_instance() + wm = getattr(mcp_instance, "worker_manager", None) + + # Base identity + res = { + "timestamp": time.time(), + "system_type": "kubernetes", + "id": wm.worker_id if wm else platform.node(), + "kubeconfig_path": os.environ.get("KUBECONFIG", "~/.kube/config"), + "kubernetes": self._get_kube_stats(), + "tools": {}, + } + + if manager: + # Add Discovered Classes + for tool_id, inst in manager.instances.items(): + if inst == self: + continue + res["tools"][tool_id] = { + "class": inst.__class__.__name__, + "description": inst.__doc__.strip() if inst.__doc__ else "n/a", + } + + # Add Explicitly Registered Functions (from YAML) + if hasattr(manager, "explicit_metadata"): + for name, meta in manager.explicit_metadata.items(): + res["tools"][name] = meta + + # Handle Hub/Fleet recursion + hm = getattr(mcp_instance, "hub_manager", None) + if hm: + res["fleet"] = await hm.fetch_all_statuses() + + return res + + def _get_kube_stats(self) -> Dict[str, Any]: + """ + Queries the K8s API for node and 'queue' (pod) statistics. + """ + if not self._k8s_loaded: + return { + "status": "error", + "message": getattr(self, "_k8s_error", "K8s client not initialized"), + } + + stats = { + "status": "online", + "nodes": {"total": 0, "ready": 0, "capacity": {"cpu": 0, "mem_bytes": 0}}, + "workload_summary": {"running_pods": 0, "pending_pods": 0}, + } + + try: + # Gather Node Stats + nodes = self.v1.list_node() + stats["nodes"]["total"] = len(nodes.items) + for node in nodes.items: + # Check Ready status + if any(c.type == "Ready" and c.status == "True" for c in node.status.conditions): + stats["nodes"]["ready"] += 1 + + # Aggregate Capacity + stats["nodes"]["capacity"]["cpu"] += int(node.status.capacity.get("cpu", 0)) + # Note: Mem strings like '32Gi' need parsing for real math, returning raw for now + stats["nodes"]["capacity"]["mem_raw"] = node.status.capacity.get("memory") + + # Gather Pod Stats (The "Queue") + pods = self.v1.list_pod_for_all_namespaces() + for pod in pods.items: + if pod.status.phase == "Running": + stats["workload_summary"]["running_pods"] += 1 + elif pod.status.phase == "Pending": + stats["workload_summary"]["pending_pods"] += 1 + + except Exception as e: + return {"status": "partial_error", "error": str(e)} + + return stats diff --git a/mcpserver/tools/system/tool.py b/mcpserver/tools/system/tool.py deleted file mode 100644 index 9bd1772..0000000 --- a/mcpserver/tools/system/tool.py +++ /dev/null @@ -1,47 +0,0 @@ -import os -import platform -import time -from typing import Any, Dict - -from mcpserver.tools.base import BaseTool -from mcpserver.tools.decorator import mcp - - -class SystemTool(BaseTool): - """ - Provides server metadata and a manifest of loaded tools. - """ - - def setup(self, manager=None): - # Store the manager reference provided by the standard loading strategy - self.manager = manager - - @mcp.tool(name="get_status") - def get_status(self) -> Dict[str, Any]: - """ - Returns a structured report of the server environment and loaded tools. - """ - # 1. Custom status implemented by this class - status = { - "timestamp": time.time(), - "system": { - "os": platform.system(), - "node": platform.node(), - "python": platform.python_version(), - "cwd": os.getcwd() if hasattr(os, "getcwd") else "unknown", - }, - "tools": {}, - } - - # 2. Metadata about tools (The Manifest) - # We look at the manager's active instances - if self.manager and hasattr(self.manager, "instances"): - for tool_id, inst in self.manager.instances.items(): - if inst == self: - continue - status["tools"][tool_id] = { - "class": inst.__class__.__name__, - "description": inst.__doc__.strip() if inst.__doc__ else "n/a", - } - - return status