diff --git a/Makefile b/Makefile index 6ed88793..21e0f01a 100644 --- a/Makefile +++ b/Makefile @@ -18,13 +18,10 @@ help: # Start local development with Docker Compose in foreground dev: - @if [ ! -f .env ]; then \ - echo "❌ .env file not found!"; \ - echo ""; \ - echo "Please run setup first:"; \ - echo " make setup"; \ - echo ""; \ - exit 1; \ + @if [ -f .env ]; then \ + echo "ℹ️ Loading environment overrides from .env"; \ + else \ + echo "ℹ️ No .env file detected; relying on current environment."; \ fi @echo "🚀 Starting local development mode with Docker Compose..." @echo " This will:" diff --git a/packages/gateway/src/gateway/index.ts b/packages/gateway/src/gateway/index.ts index ac67597d..2b9165ac 100644 --- a/packages/gateway/src/gateway/index.ts +++ b/packages/gateway/src/gateway/index.ts @@ -1,10 +1,11 @@ #!/usr/bin/env bun -import type { IMessageQueue } from "@peerbot/core"; +import type { IMessageQueue, WorkerTokenData } from "@peerbot/core"; import { createLogger, verifyWorkerToken } from "@peerbot/core"; import type { Request, Response } from "express"; import { WorkerConnectionManager } from "./connection-manager"; import { WorkerJobRouter } from "./job-router"; +import { McpConfigService } from "../mcp/config-service"; const logger = createLogger("worker-gateway"); @@ -17,11 +18,13 @@ export class WorkerGateway { private connectionManager: WorkerConnectionManager; private jobRouter: WorkerJobRouter; private queue: IMessageQueue; + private mcpConfigService?: McpConfigService; - constructor(queue: IMessageQueue) { + constructor(queue: IMessageQueue, mcpConfigService?: McpConfigService) { this.queue = queue; this.connectionManager = new WorkerConnectionManager(); this.jobRouter = new WorkerJobRouter(queue, this.connectionManager); + this.mcpConfigService = mcpConfigService; } /** @@ -38,6 +41,12 @@ export class WorkerGateway { this.handleWorkerResponse(req, res) ); + if (this.mcpConfigService) { + app.get("/worker/mcp/config", (req: Request, res: Response) => + this.handleMcpConfigRequest(req, res) + ); + } + logger.info("Worker gateway routes registered"); } @@ -45,26 +54,12 @@ export class WorkerGateway { * Handle SSE connection from worker */ private async handleStreamConnection(req: Request, res: Response) { - const authHeader = req.headers.authorization; - - // Verify worker token - if (!authHeader || !authHeader.startsWith("Bearer ")) { - res - .status(401) - .json({ error: "Missing or invalid authorization header" }); - return; - } - - const token = authHeader.substring(7); - const tokenData = verifyWorkerToken(token); - - if (!tokenData) { - logger.warn("Invalid token"); - res.status(401).json({ error: "Invalid token" }); + const auth = this.authenticateWorker(req, res); + if (!auth) { return; } - const { deploymentName, userId, threadId } = tokenData; + const { deploymentName, userId, threadId } = auth.tokenData; // Setup SSE res.setHeader("Content-Type", "text/event-stream"); @@ -93,25 +88,12 @@ export class WorkerGateway { * Handle HTTP response from worker */ private async handleWorkerResponse(req: Request, res: Response) { - const authHeader = req.headers.authorization; - - // Verify worker token - if (!authHeader || !authHeader.startsWith("Bearer ")) { - res - .status(401) - .json({ error: "Missing or invalid authorization header" }); - return; - } - - const token = authHeader.substring(7); - const tokenData = verifyWorkerToken(token); - - if (!tokenData) { - res.status(401).json({ error: "Invalid token" }); + const auth = this.authenticateWorker(req, res); + if (!auth) { return; } - const { deploymentName } = tokenData; + const { deploymentName } = auth.tokenData; // Update connection activity this.connectionManager.touchConnection(deploymentName); @@ -134,6 +116,68 @@ export class WorkerGateway { } } + private async handleMcpConfigRequest(req: Request, res: Response) { + if (!this.mcpConfigService) { + res.status(503).json({ error: "mcp_config_unavailable" }); + return; + } + + const auth = this.authenticateWorker(req, res); + if (!auth) { + return; + } + + try { + const baseUrl = this.getRequestBaseUrl(req); + const config = await this.mcpConfigService.getWorkerConfig({ + baseUrl, + workerToken: auth.token, + }); + res.json(config); + } catch (error) { + logger.error("Failed to generate MCP config", { error }); + res.status(500).json({ error: "mcp_config_error" }); + } + } + + private authenticateWorker( + req: Request, + res: Response + ): { tokenData: WorkerTokenData; token: string } | null { + const authHeader = req.headers.authorization; + + if (!authHeader || !authHeader.startsWith("Bearer ")) { + res + .status(401) + .json({ error: "Missing or invalid authorization header" }); + return null; + } + + const token = authHeader.substring(7); + const tokenData = verifyWorkerToken(token); + + if (!tokenData) { + logger.warn("Invalid token"); + res.status(401).json({ error: "Invalid token" }); + return null; + } + + return { tokenData, token }; + } + + private getRequestBaseUrl(req: Request): string { + const forwardedProto = req.headers["x-forwarded-proto"]; + const protocolCandidate = Array.isArray(forwardedProto) + ? forwardedProto[0] + : forwardedProto?.split(",")[0]; + const protocol = (protocolCandidate || req.protocol || "http").trim(); + const host = req.get("host"); + if (host) { + return `${protocol}://${host}`; + } + return process.env.PEERBOT_PUBLIC_GATEWAY_URL || `${protocol}://localhost:8080`; + } + /** * Get active worker connections */ diff --git a/packages/gateway/src/index.ts b/packages/gateway/src/index.ts index ded01e32..edf1a610 100644 --- a/packages/gateway/src/index.ts +++ b/packages/gateway/src/index.ts @@ -6,6 +6,9 @@ import { initSentry } from "@peerbot/core"; initSentry(); import http from "node:http"; +import { existsSync } from "node:fs"; +import path from "node:path"; +import { Command } from "commander"; import { moduleRegistry } from "@peerbot/core"; import { createLogger, type OrchestratorConfig } from "@peerbot/core"; import { LogLevel } from "@slack/bolt"; @@ -19,6 +22,7 @@ import { Orchestrator } from "./orchestration"; import type { AnthropicProxy } from "./proxy/anthropic-proxy"; import { SlackDispatcher } from "./slack"; import type { DispatcherConfig } from "./types"; +import type { McpProxy } from "./mcp/proxy"; let healthServer: http.Server | null = null; @@ -27,7 +31,8 @@ let healthServer: http.Server | null = null; */ function setupHealthEndpoints( anthropicProxy?: AnthropicProxy, - workerGateway?: WorkerGateway + workerGateway?: WorkerGateway, + mcpProxy?: McpProxy ) { if (healthServer) return; @@ -63,6 +68,11 @@ function setupHealthEndpoints( logger.info("✅ Worker gateway routes enabled at :8080/worker/*"); } + if (mcpProxy) { + mcpProxy.setupRoutes(proxyApp); + logger.info("✅ MCP proxy routes enabled at :8080/mcp/*"); + } + // Register module endpoints moduleRegistry.registerEndpoints(proxyApp); logger.info("✅ Module endpoints registered"); @@ -82,13 +92,38 @@ function setupHealthEndpoints( /** * Main entry point */ -async function main() { +type StartOptions = { + env?: string; +}; + +class MissingRequiredEnvError extends Error { + constructor(public envName: string | string[]) { + const message = Array.isArray(envName) + ? `Missing one of the required environment variables: ${envName.join(", ")}` + : `Missing required environment variable: ${envName}`; + super(message); + this.name = "MissingRequiredEnvError"; + } +} + +async function startGateway({ env }: StartOptions = {}) { try { - // Load environment variables from .env file (searches up from cwd) - // In Docker/K8s, env vars are injected by container runtime if (process.env.NODE_ENV !== "production") { - dotenvConfig(); + const envProvided = Boolean(env); + const envPath = envProvided + ? path.resolve(process.cwd(), env!) + : path.resolve(process.cwd(), ".env"); + + if (existsSync(envPath)) { + dotenvConfig({ path: envPath }); + logger.debug(`Loaded environment variables from ${envPath}`); + } else if (envProvided) { + logger.warn(`Specified env file ${envPath} was not found; continuing without it.`); + } else { + logger.debug("No .env file found; relying on process environment."); + } } + logger.info("🚀 Starting Claude Code Slack Dispatcher"); // Get bot token from environment @@ -101,9 +136,19 @@ async function main() { signingSecret: `${process.env.SLACK_SIGNING_SECRET?.substring(0, 10)}...`, }); + const connectionString = + process.env.QUEUE_URL || process.env.DATABASE_URL; + if (!connectionString) { + throw new MissingRequiredEnvError(["QUEUE_URL", "DATABASE_URL"]); + } + + if (!botToken) { + throw new MissingRequiredEnvError("SLACK_BOT_TOKEN"); + } + const config: DispatcherConfig = { slack: { - token: botToken!, + token: botToken, appToken: process.env.SLACK_APP_TOKEN, signingSecret: process.env.SLACK_SIGNING_SECRET, socketMode: process.env.SLACK_HTTP_MODE !== "true", @@ -125,7 +170,7 @@ async function main() { logLevel: (process.env.LOG_LEVEL as any) || LogLevel.INFO, // Queue configuration (required) queues: { - connectionString: process.env.QUEUE_URL || process.env.DATABASE_URL!, + connectionString, directMessage: process.env.QUEUE_DIRECT_MESSAGE || "direct_message", messageQueue: process.env.QUEUE_MESSAGE_QUEUE || "message_queue", retryLimit: parseInt(process.env.PGBOSS_RETRY_LIMIT || "3", 10), @@ -149,13 +194,6 @@ async function main() { }); // Validate required configuration - if (!config.slack.token) { - throw new Error("SLACK_BOT_TOKEN is required"); - } - if (!config.queues.connectionString) { - throw new Error("QUEUE_URL is required"); - } - // Create orchestrator configuration const orchestratorConfig: OrchestratorConfig = { queues: { @@ -218,7 +256,8 @@ async function main() { // Setup health endpoints on port 8080 setupHealthEndpoints( dispatcher.getAnthropicProxy(), - dispatcher.getWorkerGateway() + dispatcher.getWorkerGateway(), + dispatcher.getMcpProxy() ); // Setup graceful shutdown for orchestrator @@ -237,12 +276,28 @@ async function main() { logger.info("Health check:", JSON.stringify(status, null, 2)); }); } catch (error) { - logger.error("❌ Failed to start Slack Dispatcher:", error); + if (error instanceof MissingRequiredEnvError) { + logger.error(error.message); + } else { + logger.error("❌ Failed to start Slack Dispatcher:", error); + } process.exit(1); } } -// Start the application -main(); +const program = new Command(); + +program + .name("peerbot-gateway") + .description("Peerbot gateway service") + .option("--env ", "Path to environment file") + .action(async (options: StartOptions) => { + await startGateway(options); + }); + +program.parseAsync(process.argv).catch((error) => { + logger.error("❌ Failed to start Slack Dispatcher:", error); + process.exit(1); +}); export type { DispatcherConfig } from "./types"; diff --git a/packages/gateway/src/mcp/config-service.ts b/packages/gateway/src/mcp/config-service.ts new file mode 100644 index 00000000..4716f5ef --- /dev/null +++ b/packages/gateway/src/mcp/config-service.ts @@ -0,0 +1,261 @@ +import { createLogger } from "@peerbot/core"; +import { readFile, stat } from "node:fs/promises"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; +import { z } from "zod"; + +const logger = createLogger("mcp-config-service"); + +const McpServersSchema = z.object({ + mcpServers: z.record(z.any()), +}); + +type RawMcpConfig = z.infer; + +export interface HttpMcpServerConfig { + id: string; + upstreamUrl: string; + loginUrl?: string; +} + +export interface WorkerMcpConfig { + mcpServers: Record; +} + +interface LoadedConfig { + rawServers: Record; + httpServers: Map; + mtimeMs: number; +} + +type ConfigSource = + | { type: "file"; path: string } + | { type: "http"; url: URL }; + +export interface McpConfigServiceOptions { + configUrl?: string; + configPath?: string; +} + +export class McpConfigService { + private source?: ConfigSource; + private cache?: LoadedConfig; + + constructor(options: McpConfigServiceOptions = {}) { + const rawLocation = + options.configUrl || + options.configPath || + process.env.PEERBOT_MCP_SERVERS_URL || + process.env.PEERBOT_MCP_SERVERS_FILE; + + if (!rawLocation) { + return; + } + + this.source = this.resolveConfigSource(rawLocation); + } + + /** + * Return MCP config tailored for a worker request. + */ + async getWorkerConfig(options: { + baseUrl: string; + workerToken: string; + }): Promise { + const { baseUrl, workerToken } = options; + const config = await this.loadConfig(); + const workerConfig: WorkerMcpConfig = { mcpServers: {} }; + + for (const [id, serverConfig] of Object.entries(config.rawServers)) { + const cloned = cloneConfig(serverConfig); + if (config.httpServers.has(id)) { + const proxiedUrl = buildProxyUrl(baseUrl, id); + cloned.url = proxiedUrl; + cloned.headers = mergeHeaders(cloned.headers, workerToken); + } + workerConfig.mcpServers[id] = cloned; + } + + return workerConfig; + } + + /** + * Get HTTP proxy metadata for a specific MCP server. + */ + async getHttpServer(id: string): Promise { + const config = await this.loadConfig(); + return config.httpServers.get(id); + } + + private async loadConfig(): Promise { + if (!this.source) { + if (!this.cache) { + this.cache = { + rawServers: {}, + httpServers: new Map(), + mtimeMs: 0, + }; + } + return this.cache; + } + + const fallback = + this.cache ?? { + rawServers: {}, + httpServers: new Map(), + mtimeMs: 0, + }; + + try { + if (this.source.type === "file") { + const fileStat = await stat(this.source.path); + const fileContents = await readFile(this.source.path, "utf-8"); + return this.parseAndCache(fileContents, fileStat.mtimeMs); + } + + const response = await fetch(this.source.url, { cache: "no-store" }); + if (!response.ok) { + logger.error("Failed to fetch MCP config from remote URL", { + url: this.source.url.toString(), + status: response.status, + statusText: response.statusText, + }); + return fallback; + } + + const text = await response.text(); + return this.parseAndCache(text, Date.now()); + } catch (error) { + logger.error("Error loading MCP config", { + error, + source: + this.source.type === "file" + ? this.source.path + : this.source.url.toString(), + }); + return fallback; + } + } + + private resolveConfigSource(location: string): ConfigSource | undefined { + try { + const parsed = new URL(location); + if (parsed.protocol === "file:") { + return { type: "file", path: fileURLToPath(parsed) }; + } + + if (parsed.protocol === "http:" || parsed.protocol === "https:") { + return { type: "http", url: parsed }; + } + + logger.warn("Unsupported MCP config URL protocol; falling back to file", { + location, + }); + } catch (_err) { + // Not a valid URL, treat as path + return { type: "file", path: path.resolve(location) }; + } + + return { type: "file", path: path.resolve(location) }; + } + + private parseAndCache(rawContents: string, mtimeMs: number): LoadedConfig { + try { + const parsed = McpServersSchema.safeParse(JSON.parse(rawContents)); + if (!parsed.success) { + logger.error("Failed to parse MCP config", { + issues: parsed.error.issues, + }); + return ( + this.cache ?? { + rawServers: {}, + httpServers: new Map(), + mtimeMs, + } + ); + } + + const normalized = normalizeConfig(parsed.data); + this.cache = { + rawServers: normalized.rawServers, + httpServers: normalized.httpServers, + mtimeMs, + }; + return this.cache; + } catch (error) { + logger.error("Failed to parse MCP config contents", { error }); + return ( + this.cache ?? { + rawServers: {}, + httpServers: new Map(), + mtimeMs, + } + ); + } + } +} + +function normalizeConfig(config: RawMcpConfig) { + const rawServers: Record = {}; + const httpServers = new Map(); + + for (const [id, serverConfig] of Object.entries(config.mcpServers)) { + if (!serverConfig || typeof serverConfig !== "object") { + continue; + } + + const cloned = cloneConfig(serverConfig); + rawServers[id] = cloned; + + if (typeof cloned.url === "string" && isHttpUrl(cloned.url)) { + httpServers.set(id, { + id, + upstreamUrl: cloned.url, + loginUrl: + typeof cloned.loginUrl === "string" ? cloned.loginUrl : undefined, + }); + } + } + + return { rawServers, httpServers }; +} + +function cloneConfig(config: any) { + return JSON.parse(JSON.stringify(config)); +} + +function buildProxyUrl(baseUrl: string, id: string) { + const url = new URL(`/mcp/${encodeURIComponent(id)}`, ensureTrailingSlash(baseUrl)); + return url.toString(); +} + +function ensureTrailingSlash(baseUrl: string): string { + if (!baseUrl.endsWith("/")) { + return `${baseUrl}/`; + } + return baseUrl; +} + +function isHttpUrl(candidate: string): boolean { + return candidate.startsWith("http://") || candidate.startsWith("https://"); +} + +function mergeHeaders( + existingHeaders: unknown, + workerToken: string +): Record { + const normalized: Record = {}; + + if (existingHeaders && typeof existingHeaders === "object") { + for (const [key, value] of Object.entries(existingHeaders as any)) { + if (typeof value === "string") { + normalized[key] = value; + } else if (value != null) { + normalized[key] = String(value); + } + } + } + + normalized["Authorization"] = `Bearer ${workerToken}`; + return normalized; +} diff --git a/packages/gateway/src/mcp/credential-store.ts b/packages/gateway/src/mcp/credential-store.ts new file mode 100644 index 00000000..7198c77e --- /dev/null +++ b/packages/gateway/src/mcp/credential-store.ts @@ -0,0 +1,63 @@ +import { RedisClient, type IMessageQueue, type IRedisClient } from "@peerbot/core"; +import { createLogger } from "@peerbot/core"; + +const logger = createLogger("mcp-credentials"); + +export interface McpCredentialRecord { + accessToken: string; + tokenType?: string; + expiresAt?: number; + refreshToken?: string; + metadata?: Record; +} + +export class McpCredentialStore { + private redis: IRedisClient; + private static KEY_PREFIX = "mcp:credential"; + + constructor(queue: IMessageQueue) { + this.redis = new RedisClient(queue.getRedisClient()); + } + + async get(userId: string, mcpId: string): Promise { + const key = this.buildKey(userId, mcpId); + try { + const value = await this.redis.get(key); + if (!value) { + return null; + } + return JSON.parse(value) as McpCredentialRecord; + } catch (error) { + logger.error("Failed to fetch MCP credentials", { error, key }); + return null; + } + } + + async set( + userId: string, + mcpId: string, + record: McpCredentialRecord, + ttlSeconds?: number + ): Promise { + const key = this.buildKey(userId, mcpId); + try { + await this.redis.set(key, JSON.stringify(record), ttlSeconds); + } catch (error) { + logger.error("Failed to store MCP credentials", { error, key }); + throw error; + } + } + + async delete(userId: string, mcpId: string): Promise { + const key = this.buildKey(userId, mcpId); + try { + await this.redis.del(key); + } catch (error) { + logger.error("Failed to delete MCP credentials", { error, key }); + } + } + + private buildKey(userId: string, mcpId: string): string { + return `${McpCredentialStore.KEY_PREFIX}:${userId}:${mcpId}`; + } +} diff --git a/packages/gateway/src/mcp/proxy.ts b/packages/gateway/src/mcp/proxy.ts new file mode 100644 index 00000000..42b0dd38 --- /dev/null +++ b/packages/gateway/src/mcp/proxy.ts @@ -0,0 +1,223 @@ +import { createLogger, verifyWorkerToken } from "@peerbot/core"; +import type { Request, Response } from "express"; +import { McpCredentialStore } from "./credential-store"; +import { McpConfigService } from "./config-service"; + +const logger = createLogger("mcp-proxy"); + +export class McpProxy { + constructor( + private readonly configService: McpConfigService, + private readonly credentialStore: McpCredentialStore + ) {} + + setupRoutes(app: any) { + app.all("/mcp/:mcpId", (req: Request, res: Response) => + this.handleProxyRequest(req, res) + ); + app.all("/mcp/:mcpId/*", (req: Request, res: Response) => + this.handleProxyRequest(req, res) + ); + } + + private async handleProxyRequest(req: Request, res: Response) { + const { mcpId } = req.params; + const sessionToken = this.extractSessionToken(req); + + if (!sessionToken) { + res.status(401).json({ error: "missing_token" }); + return; + } + + const tokenData = verifyWorkerToken(sessionToken); + if (!tokenData) { + res.status(401).json({ error: "invalid_token" }); + return; + } + + const httpServer = await this.configService.getHttpServer(mcpId); + if (!httpServer) { + res.status(404).json({ error: "unknown_mcp" }); + return; + } + + const credentials = await this.credentialStore.get( + tokenData.userId, + mcpId + ); + + if (!credentials || !credentials.accessToken) { + logger.info("MCP credentials missing", { + userId: tokenData.userId, + mcpId, + }); + res.status(401).json({ + error: "not_authenticated", + loginUrl: httpServer.loginUrl, + }); + return; + } + + if (credentials.expiresAt && credentials.expiresAt <= Date.now()) { + logger.warn("MCP credentials expired", { + userId: tokenData.userId, + mcpId, + }); + res.status(401).json({ + error: "token_expired", + loginUrl: httpServer.loginUrl, + }); + return; + } + + try { + await this.forwardRequest(req, res, httpServer.upstreamUrl, credentials); + } catch (error) { + logger.error("Failed to proxy MCP request", { error, mcpId }); + res.status(502).json({ error: "proxy_failure" }); + } + } + + private extractSessionToken(req: Request): string | null { + const authHeader = req.headers.authorization; + if (authHeader && authHeader.startsWith("Bearer ")) { + return authHeader.substring(7); + } + + const tokenFromQuery = req.query.workerToken; + if (typeof tokenFromQuery === "string") { + return tokenFromQuery; + } + + if (Array.isArray(tokenFromQuery)) { + return tokenFromQuery[0]; + } + + return null; + } + + private async forwardRequest( + req: Request, + res: Response, + upstreamBaseUrl: string, + credentials: { accessToken: string; tokenType?: string } + ): Promise { + const upstreamUrl = this.buildUpstreamUrl(req, upstreamBaseUrl); + const headers = this.buildUpstreamHeaders(req, credentials); + const body = this.getRequestBody(req); + + const response = await fetch(upstreamUrl, { + method: req.method, + headers, + body, + redirect: "manual", + }); + + res.status(response.status); + response.headers.forEach((value, key) => { + if (key.toLowerCase() === "content-length") { + return; + } + res.setHeader(key, value); + }); + + if (!response.body) { + res.end(); + return; + } + + const reader = response.body.getReader(); + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + if (value) { + res.write(typeof value === "string" ? value : Buffer.from(value)); + } + } + + res.end(); + } + + private buildUpstreamUrl(req: Request, upstreamBaseUrl: string): string { + const baseUrl = new URL(upstreamBaseUrl); + const remainder = (req.params as any)[0] ? `/${(req.params as any)[0]}` : ""; + baseUrl.pathname = joinPaths(baseUrl.pathname, remainder); + + const searchParams = new URLSearchParams(); + for (const [key, value] of Object.entries(req.query)) { + if (key === "workerToken") { + continue; + } + if (Array.isArray(value)) { + for (const entry of value) { + if (entry !== undefined) { + searchParams.append(key, String(entry)); + } + } + } else if (value !== undefined) { + searchParams.append(key, String(value)); + } + } + + baseUrl.search = searchParams.toString(); + return baseUrl.toString(); + } + + private buildUpstreamHeaders( + req: Request, + credentials: { accessToken: string; tokenType?: string } + ): Record { + const headers: Record = {}; + + for (const [key, value] of Object.entries(req.headers)) { + const lowerKey = key.toLowerCase(); + if (lowerKey === "host" || lowerKey === "content-length") { + continue; + } + if (Array.isArray(value)) { + headers[key] = value.join(","); + } else if (typeof value === "string") { + headers[key] = value; + } + } + + headers["authorization"] = `${ + credentials.tokenType || "Bearer" + } ${credentials.accessToken}`; + return headers; + } + + private getRequestBody(req: Request): BodyInit | undefined { + if (req.method === "GET" || req.method === "HEAD") { + return undefined; + } + + if (Buffer.isBuffer(req.body)) { + return req.body; + } + + if (typeof req.body === "string") { + return req.body; + } + + if (req.body && typeof req.body === "object") { + return JSON.stringify(req.body); + } + + return undefined; + } +} + +function joinPaths(basePath: string, suffix: string): string { + const trimmedBase = basePath.endsWith("/") + ? basePath.slice(0, -1) + : basePath; + if (!suffix) { + return trimmedBase || "/"; + } + const extra = suffix.startsWith("/") ? suffix : `/${suffix}`; + return `${trimmedBase}${extra}`; +} diff --git a/packages/gateway/src/slack/index.ts b/packages/gateway/src/slack/index.ts index 39096f87..9a583fab 100644 --- a/packages/gateway/src/slack/index.ts +++ b/packages/gateway/src/slack/index.ts @@ -8,6 +8,9 @@ import { QueueProducer } from "../session/queue-producer"; import { ThreadResponseConsumer } from "../session/thread-processor"; import type { DispatcherConfig } from "../types"; import { SlackEventHandlers } from "./event-router"; +import { McpConfigService } from "../mcp/config-service"; +import { McpCredentialStore } from "../mcp/credential-store"; +import { McpProxy } from "../mcp/proxy"; export class SlackDispatcher { private app: App; @@ -15,6 +18,7 @@ export class SlackDispatcher { private threadResponseConsumer?: ThreadResponseConsumer; private anthropicProxy?: AnthropicProxy; private workerGateway?: WorkerGateway; + private mcpProxy?: McpProxy; private config: DispatcherConfig; private queue?: ReturnType; @@ -112,10 +116,18 @@ export class SlackDispatcher { logger.info("✅ Anthropic proxy initialized"); // Initialize Worker Gateway for SSE/HTTP worker communication - this.queue = createMessageQueue(this.config.queues.connectionString); - await this.queue.start(); - this.workerGateway = new WorkerGateway(this.queue); + const queue = createMessageQueue(this.config.queues.connectionString); + await queue.start(); + const mcpConfigService = new McpConfigService({ + configUrl: + process.env.PEERBOT_MCP_SERVERS_URL || + process.env.PEERBOT_MCP_SERVERS_FILE, + }); + const mcpCredentialStore = new McpCredentialStore(queue); + this.workerGateway = new WorkerGateway(queue, mcpConfigService); + this.mcpProxy = new McpProxy(mcpConfigService, mcpCredentialStore); logger.info("✅ Worker gateway initialized"); + logger.info("✅ MCP proxy initialized"); // Discover and register available modules await moduleRegistry.registerAvailableModules(); @@ -365,6 +377,10 @@ export class SlackDispatcher { return this.workerGateway; } + getMcpProxy() { + return this.mcpProxy; + } + /** * Initialize bot info and event handlers * CRITICAL: This must be called BEFORE starting the app to ensure diff --git a/packages/worker/src/claude/session-executor.ts b/packages/worker/src/claude/session-executor.ts index ba88c848..67a0aa26 100644 --- a/packages/worker/src/claude/session-executor.ts +++ b/packages/worker/src/claude/session-executor.ts @@ -112,7 +112,7 @@ async function prepareRunConfig( // Generate MCP config dynamically let mcpConfigPath: string | undefined; - const mcpConfig = getMCPConfigForClaude(); + const mcpConfig = await getMCPConfigForClaude(); if (mcpConfig) { const tempDir = process.env.RUNNER_TEMP || "/tmp"; mcpConfigPath = join(tempDir, `mcp-config-${Date.now()}.json`); diff --git a/packages/worker/src/utils/mcp-config.ts b/packages/worker/src/utils/mcp-config.ts index 0685f305..cfcb50db 100644 --- a/packages/worker/src/utils/mcp-config.ts +++ b/packages/worker/src/utils/mcp-config.ts @@ -1,34 +1,84 @@ +import { createLogger } from "@peerbot/core"; import { getProcessManagerInstance } from "../integrations/process-manager"; -/** - * Generate MCP configuration dynamically based on running MCP servers - */ -export function generateMCPConfig(): string { - const mcpServers: Record = {}; +const logger = createLogger("worker-mcp-config"); - // Add process manager MCP if running - const processManager = getProcessManagerInstance(); - if (processManager) { - mcpServers["process-manager"] = { - type: "sse", - url: `http://localhost:${processManager.port}/sse`, - description: - "Process management MCP server for background tasks with cloudflared tunnel support (HTTP streaming)", - }; +interface GatewayMcpConfigResponse { + mcpServers?: Record; +} + +export async function getMCPConfigForClaude(): Promise { + const gatewayConfig = await fetchGatewayMcpConfig(); + const mergedServers: Record = { + ...(gatewayConfig?.mcpServers ?? {}), + }; + + const processManagerEntry = buildProcessManagerServer(); + if (processManagerEntry && !mergedServers["process-manager"]) { + mergedServers["process-manager"] = processManagerEntry; + } + + if (Object.keys(mergedServers).length === 0) { + return undefined; + } + + return JSON.stringify({ mcpServers: mergedServers }, null, 2); +} + +async function fetchGatewayMcpConfig(): Promise { + const dispatcherUrl = process.env.DISPATCHER_URL; + const workerToken = process.env.WORKER_TOKEN; + + if (!dispatcherUrl || !workerToken) { + logger.warn("Missing dispatcher URL or worker token for MCP config fetch"); + return null; } - return JSON.stringify({ mcpServers }, null, 2); + try { + const url = new URL("/worker/mcp/config", ensureBaseUrl(dispatcherUrl)); + const response = await fetch(url, { + headers: { + Authorization: `Bearer ${workerToken}`, + }, + }); + + if (!response.ok) { + logger.warn("Gateway returned non-success status for MCP config", { + status: response.status, + }); + return null; + } + + const data = (await response.json()) as GatewayMcpConfigResponse; + if (!data || typeof data !== "object") { + logger.warn("Gateway MCP config response malformed"); + return null; + } + + return data; + } catch (error) { + logger.error("Failed to fetch MCP config from gateway", { error }); + return null; + } } -/** - * Get MCP config path for Claude CLI - * Returns the config as a JSON string to be written to a temp file - */ -export function getMCPConfigForClaude(): string | undefined { +function buildProcessManagerServer(): Record | null { const processManager = getProcessManagerInstance(); if (!processManager) { - return undefined; + return null; } - return generateMCPConfig(); + return { + type: "sse", + url: `http://localhost:${processManager.port}/sse`, + description: + "Process management MCP server for background tasks with cloudflared tunnel support (HTTP streaming)", + }; +} + +function ensureBaseUrl(base: string): string { + if (!base.startsWith("http")) { + return `http://${base.replace(/^\/+/, "")}`; + } + return base; }