From feb34f3841bf785c621c052cd3a46d9d19230e04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 13 Oct 2025 14:48:32 +0200 Subject: [PATCH 1/2] chore: make gateway start command validate envs --- Makefile | 11 +- docs/mcp_integration_plan.md | 92 ++++++ packages/gateway/src/gateway/index.ts | 114 +++++--- packages/gateway/src/index.ts | 91 ++++-- packages/gateway/src/mcp/config-service.ts | 261 ++++++++++++++++++ packages/gateway/src/mcp/credential-store.ts | 63 +++++ packages/gateway/src/mcp/proxy.ts | 223 +++++++++++++++ packages/gateway/src/slack/index.ts | 18 +- .../worker/src/claude/session-executor.ts | 2 +- packages/worker/src/utils/mcp-config.ts | 94 +++++-- 10 files changed, 885 insertions(+), 84 deletions(-) create mode 100644 docs/mcp_integration_plan.md create mode 100644 packages/gateway/src/mcp/config-service.ts create mode 100644 packages/gateway/src/mcp/credential-store.ts create mode 100644 packages/gateway/src/mcp/proxy.ts diff --git a/Makefile b/Makefile index 6ed88793..21e0f01a 100644 --- a/Makefile +++ b/Makefile @@ -18,13 +18,10 @@ help: # Start local development with Docker Compose in foreground dev: - @if [ ! -f .env ]; then \ - echo "❌ .env file not found!"; \ - echo ""; \ - echo "Please run setup first:"; \ - echo " make setup"; \ - echo ""; \ - exit 1; \ + @if [ -f .env ]; then \ + echo "ℹ️ Loading environment overrides from .env"; \ + else \ + echo "ℹ️ No .env file detected; relying on current environment."; \ fi @echo "🚀 Starting local development mode with Docker Compose..." @echo " This will:" diff --git a/docs/mcp_integration_plan.md b/docs/mcp_integration_plan.md new file mode 100644 index 00000000..a2280ca0 --- /dev/null +++ b/docs/mcp_integration_plan.md @@ -0,0 +1,92 @@ +# MCP Integration Plan + +## Goals +- Load MCP server definitions from environment variables compatible with Claude-style JSON manifests. +- Expose MCP availability in Slack Home tab with authentication status and actions. 
+- Support OAuth-backed and stdin-secret-backed MCPs with secure credential storage in Redis secret store. +- Ensure worker processes automatically receive authorized credentials and reuse them without prompting users repeatedly. +- Provide foundations that can support multiple CLI adapters (Claude Code today, Codex CLI later) without UI changes. + +## Configuration Loading +1. **Environment Variable Schema** + - Define `PEERBOT_MCP_SERVERS_FILE` pointing to a local JSON file (Claude-compatible schema) that lives alongside other deployment configs. + - Loader reads and parses the file during orchestrator bootstrap, normalizing entries into internal schema: `id`, `displayName`, `adapter`, `manifestPath` or `endpoint`, and `auth` metadata (`type`, `oauthConfigId`, `stdinSecretName`, scopes, audience, tokenTransport, etc.). + - Perform structural validation (Zod schema) and emit metrics for missing/invalid fields. When validation fails, return empty list but continue serving Slack so the Home tab can communicate the misconfiguration. + - Keep the file path in config so we can watch for changes (fs watcher or inotify when running on bare metal; in containers trigger reload hook via SIGHUP or configmap reload sidecar). On change, reload registry and bump version counter. +2. **Shared Registry** + - Implement `packages/shared/src/mcp/registry.ts` exporting a singleton `McpRegistry` that loads the env var once, indexes by `id`, and exposes read-only getters. + - Provide adapter hints so the worker orchestrator knows which CLI adapter to instantiate per MCP. + - Include version/hash of manifest and the source file `mtime` so workers can detect reconfiguration. Publish internal event (Redis pub/sub or message bus) when registry changes so long-lived workers can refresh without restart. + +## Authentication & Credential Storage +1. 
**Secret Provider Abstraction** + - Introduce `McpCredentialStore` interface (in shared package) with methods: `getUserCredentials(userId, mcpId)`, `setUserCredentials(userId, mcpId, secretPayload, metadata)`, `deleteUserCredentials(...)`, `listAuthorizedMcps(userId)`. + - Default implementation uses Redis secret store with namespacing: keys like `mcp:cred:${teamId}:${userId}:${mcpId}` storing encrypted JSON (AES-GCM) via existing KMS wrapper. + - Store metadata: issuedAt, expiresAt, refreshToken, scopes, tokenType, providerId, adapter expectations (e.g., env var name), and monotonically increasing `credentialVersion`. +2. **OAuth Flow** + - Slack Home tab presents "Connect" / "Login" buttons for each MCP needing auth. + - Clicking launches Slack modal -> app backend generates OAuth state token (team, user, mcpId, redirect) stored in Redis with TTL 10 minutes. + - Upon OAuth callback, exchange code using provider config (client id/secret stored in backend secrets). Save tokens in credential store and mark `needsReauth=false`. Persist gateway-generated `sessionNonce` that workers can present when requesting fresh tokens (prevents replay). + - Support refresh tokens: background job refreshes tokens proactively using `expiresAt` minus buffer; on failure mark `needsReauth` and notify user via Slack DM. When refresh succeeds, increment `credentialVersion` and publish `mcp-token-updated` event with `(teamId, userId, mcpId, version)` payload. +3. **Stdin Secret Collection** + - For `auth.type=stdin`, present Slack modal with secure input (single-use). Encrypt and store in credential store. Mark `rotatable=false` unless UI collects new value. + - When user updates secret, version increments to force worker re-login. + +## Worker Injection & Lifecycle +1. **Connection Request Flow** + - When worker bootstraps for a conversation, orchestrator queries `McpRegistry` for default MCPs and `McpCredentialStore` for user tokens. 
If token is absent, mark MCP as `pendingAuth` but still pass metadata so worker can prompt gracefully. + - Worker spawn context receives sanitized `MCP_CONTEXT` object: list of MCPs with resolved manifest paths, credential metadata (id, version, expiresAt, transport), and a signed one-time `gatewaySessionToken` allowing the worker to request the latest secret directly from the gateway on demand. + - Credentials are never written to disk permanently: mount tmpfs directory or use in-memory env injection. The worker calls gateway to exchange `gatewaySessionToken` for the actual OAuth access token immediately before establishing the CLI session, minimizing exposure window. + - Workers load credentials into CLI adapter before establishing connection: + - For OAuth tokens retrieved via gateway: set env vars expected by CLI or write to ephemeral config file consumed by CLI, ensuring file is deleted after CLI acknowledges login. + - For stdin secrets: worker requests the secret blob from gateway, streams to CLI stdin once, then wipes memory buffers. Cache success marker in worker memory to avoid repeated prompts. +2. **Credential Reuse** + - Worker maintains in-memory map `authorizedMcps` keyed by `mcpId` storing last `credentialVersion` seen. Before each request and on heartbeat timer, it compares against version fetched from gateway via lightweight `HEAD /mcp-token` call. When version increments (user re-logged or refresh happened), worker re-fetches token and reinitializes CLI session as needed. + - Subscribe workers to Redis pub/sub channel `mcp:token-updated` scoped by user/team. Upon message, worker fetches fresh token from gateway immediately; this covers mid-conversation logins or manual revocations without polling delay. + - If worker crashes or is re-created, orchestrator rehydrates credentials at start, enabling single-login experience across sessions. +3. 
**Token Refresh Handling** + - Worker receives `expiresAt` and proactively calls gateway when the buffer window (e.g., 5 minutes) is reached; gateway refreshes using stored refresh token. Gateway returns both new token and updated `expiresAt`/`version`. + - In case of refresh failure mid-session, worker sends Slack notification instructing user to re-login and marks local state `needsReauth` to avoid retry storms. Slack Home tab re-renders via event. + +4. **Gateway Token Broker & MCP Proxy (Optional)** + - **Gateway Broker**: Implement `/internal/mcp/token` endpoint (mutual TLS or signed session token required) that reads encrypted credentials from Redis, refreshes when expired, and responds with short-lived token payload plus `version`. + - **Reverse Proxy Mode**: For MCPs that support HTTP(S), gateway can expose `/mcp/proxy/:mcpId/*` which injects current OAuth token into outbound requests, shielding workers from token handling entirely. CLI adapter then points to proxy URL, and gateway attaches credentials per request. + - **Proxy Advantages**: centralizes token refresh, simplifies worker logic, supports real-time token changes, and allows revocation without restarting workers. Downside: gateway becomes critical path; need rate limiting, connection pooling, and SSE streaming support (ensure proxy handles `EventSource` with auth header injection). + - **Recommendation**: Start with broker mode (workers still make direct MCP connections but fetch tokens via gateway). Introduce full proxy for MCPs requiring complex auth or when CLI lacks token injection hooks. Design interfaces so adapters can toggle between `direct` and `proxy` transport. + +## Slack Home Tab Implementation +1. **Data Model** + - Extend existing user preferences data to include `authorizedMcpIds`, `needsReauth`, `lastSeen`. Keep statuses in Redis or Postgres depending on existing store. 
+ - Provide backend API `GET /slack/home/:userId` that fetches registry entries, joins with credential status, and generates Block Kit view. +2. **UI Blocks** + - For each MCP: show title, description, status badge (Connected, Needs Login, Expired, Offline), and buttons `Connect`/`Re-auth`/`Disconnect` as appropriate. + - Include "Add Custom MCP" if we later allow user-defined endpoints (future). + - Display last login timestamp and adapter type for transparency. +3. **Event Handling** + - On button press, handle Slack interactivity: open OAuth modal or secret input modal. + - After successful auth, re-render home tab using Slack `views.publish`. + - If worker reports failure (e.g., invalid token), send ephemeral message and set status to `Needs Login` before re-rendering. + +## Edge Cases & Mitigations +- **Missing or malformed env var**: fail-safe to empty registry; log error and show Home tab message "No MCPs configured". +- **Multiple teams/users**: namespace credentials by Slack team + user. Ensure cross-team isolation even if running multi-tenant. +- **Token expiry during conversation**: worker requests refresh; if refresh fails, gracefully degrade (stop streaming, prompt re-login) without crashing CLI. +- **Mid-conversation first-time login**: user authenticates after worker already running; gateway publishes token-update event, worker fetches new token, replays login handshake, and resumes conversation without requiring restart. +- **Redis unavailability**: operations fail fast; Home tab shows error message; workers avoid starting sensitive sessions without credentials. Consider circuit breaker to prevent repeated login prompts. +- **Concurrent logins**: enforce single-flight by locking on `mcpId:userId` key when performing OAuth exchange or secret update. +- **Revocation & Logout**: Provide "Disconnect" button that deletes credentials and resets worker state. Worker should drop sessions gracefully and remove cached tokens. 
+- **Adapter variance**: For MCPs requiring Codex CLI later, ensure adapter interface handles CLI-specific login commands while still using shared credential injection contract. +- **SSE/Stream restarts**: When SSE disconnects due to auth failure, propagate to orchestrator, mark credentials invalid, and notify user. +- **Manifest updates**: If env manifest changes (detected via hash difference), flush relevant worker caches and require re-auth if scopes differ. +- **User deactivation**: On Slack user removal, run cleanup job to delete stored credentials for compliance. +- **Auditability**: Log credential lifecycle events (create/update/delete) with masked tokens for security review. + +## Implementation Steps +1. Build `McpRegistry` and load env var at orchestrator startup; add unit tests using sample JSON. +2. Implement Redis-backed `McpCredentialStore` with encryption utilities from shared package. +3. Extend Slack backend to surface registry + auth status in Home tab; create handlers for OAuth/secret flows. +4. Update worker bootstrap to receive `MCP_CONTEXT` payload with registry metadata plus signed `gatewaySessionToken`; teach orchestrator to mint session tokens per worker launch. +5. Modify Claude adapter to request runtime credentials from gateway broker/proxy and consume them; add placeholders for future Codex adapter to use identical hook. +6. Add monitoring hooks (metrics + logs) for login successes, failures, token refresh counts. +7. Write integration tests simulating login, worker restart, token refresh, and SSE auth failure recovery. 
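The credential-store conventions described above (namespaced Redis keys, proactive refresh with a buffer window, and monotonic `credentialVersion` comparison) can be sketched as small pure helpers. This is an illustrative sketch only, not part of the patch; the helper names and the `mcp:cred:` prefix follow the plan's examples but are hypothetical:

```typescript
// Illustrative helpers for the plan's conventions; names are hypothetical.

// Namespaced credential key: mcp:cred:<teamId>:<userId>:<mcpId>
function buildCredentialKey(
  teamId: string,
  userId: string,
  mcpId: string
): string {
  return `mcp:cred:${teamId}:${userId}:${mcpId}`;
}

// Proactive refresh: act once "now" enters the buffer window before expiry
// (the plan suggests a 5-minute buffer).
function shouldRefresh(
  expiresAtMs: number,
  nowMs: number,
  bufferMs: number = 5 * 60_000
): boolean {
  return nowMs >= expiresAtMs - bufferMs;
}

// Workers re-fetch a token whenever the gateway reports a newer version,
// covering both background refreshes and user re-logins.
function needsTokenRefetch(
  localVersion: number,
  gatewayVersion: number
): boolean {
  return gatewayVersion > localVersion;
}
```

Keeping these rules in pure functions makes the refresh and re-fetch decisions trivially unit-testable, independent of Redis or the gateway transport.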
+ diff --git a/packages/gateway/src/gateway/index.ts b/packages/gateway/src/gateway/index.ts index ac67597d..2b9165ac 100644 --- a/packages/gateway/src/gateway/index.ts +++ b/packages/gateway/src/gateway/index.ts @@ -1,10 +1,11 @@ #!/usr/bin/env bun -import type { IMessageQueue } from "@peerbot/core"; +import type { IMessageQueue, WorkerTokenData } from "@peerbot/core"; import { createLogger, verifyWorkerToken } from "@peerbot/core"; import type { Request, Response } from "express"; import { WorkerConnectionManager } from "./connection-manager"; import { WorkerJobRouter } from "./job-router"; +import { McpConfigService } from "../mcp/config-service"; const logger = createLogger("worker-gateway"); @@ -17,11 +18,13 @@ export class WorkerGateway { private connectionManager: WorkerConnectionManager; private jobRouter: WorkerJobRouter; private queue: IMessageQueue; + private mcpConfigService?: McpConfigService; - constructor(queue: IMessageQueue) { + constructor(queue: IMessageQueue, mcpConfigService?: McpConfigService) { this.queue = queue; this.connectionManager = new WorkerConnectionManager(); this.jobRouter = new WorkerJobRouter(queue, this.connectionManager); + this.mcpConfigService = mcpConfigService; } /** @@ -38,6 +41,12 @@ export class WorkerGateway { this.handleWorkerResponse(req, res) ); + if (this.mcpConfigService) { + app.get("/worker/mcp/config", (req: Request, res: Response) => + this.handleMcpConfigRequest(req, res) + ); + } + logger.info("Worker gateway routes registered"); } @@ -45,26 +54,12 @@ export class WorkerGateway { * Handle SSE connection from worker */ private async handleStreamConnection(req: Request, res: Response) { - const authHeader = req.headers.authorization; - - // Verify worker token - if (!authHeader || !authHeader.startsWith("Bearer ")) { - res - .status(401) - .json({ error: "Missing or invalid authorization header" }); - return; - } - - const token = authHeader.substring(7); - const tokenData = verifyWorkerToken(token); - - if 
(!tokenData) { - logger.warn("Invalid token"); - res.status(401).json({ error: "Invalid token" }); + const auth = this.authenticateWorker(req, res); + if (!auth) { return; } - const { deploymentName, userId, threadId } = tokenData; + const { deploymentName, userId, threadId } = auth.tokenData; // Setup SSE res.setHeader("Content-Type", "text/event-stream"); @@ -93,25 +88,12 @@ export class WorkerGateway { * Handle HTTP response from worker */ private async handleWorkerResponse(req: Request, res: Response) { - const authHeader = req.headers.authorization; - - // Verify worker token - if (!authHeader || !authHeader.startsWith("Bearer ")) { - res - .status(401) - .json({ error: "Missing or invalid authorization header" }); - return; - } - - const token = authHeader.substring(7); - const tokenData = verifyWorkerToken(token); - - if (!tokenData) { - res.status(401).json({ error: "Invalid token" }); + const auth = this.authenticateWorker(req, res); + if (!auth) { return; } - const { deploymentName } = tokenData; + const { deploymentName } = auth.tokenData; // Update connection activity this.connectionManager.touchConnection(deploymentName); @@ -134,6 +116,68 @@ export class WorkerGateway { } } + private async handleMcpConfigRequest(req: Request, res: Response) { + if (!this.mcpConfigService) { + res.status(503).json({ error: "mcp_config_unavailable" }); + return; + } + + const auth = this.authenticateWorker(req, res); + if (!auth) { + return; + } + + try { + const baseUrl = this.getRequestBaseUrl(req); + const config = await this.mcpConfigService.getWorkerConfig({ + baseUrl, + workerToken: auth.token, + }); + res.json(config); + } catch (error) { + logger.error("Failed to generate MCP config", { error }); + res.status(500).json({ error: "mcp_config_error" }); + } + } + + private authenticateWorker( + req: Request, + res: Response + ): { tokenData: WorkerTokenData; token: string } | null { + const authHeader = req.headers.authorization; + + if (!authHeader || 
!authHeader.startsWith("Bearer ")) { + res + .status(401) + .json({ error: "Missing or invalid authorization header" }); + return null; + } + + const token = authHeader.substring(7); + const tokenData = verifyWorkerToken(token); + + if (!tokenData) { + logger.warn("Invalid token"); + res.status(401).json({ error: "Invalid token" }); + return null; + } + + return { tokenData, token }; + } + + private getRequestBaseUrl(req: Request): string { + const forwardedProto = req.headers["x-forwarded-proto"]; + const protocolCandidate = Array.isArray(forwardedProto) + ? forwardedProto[0] + : forwardedProto?.split(",")[0]; + const protocol = (protocolCandidate || req.protocol || "http").trim(); + const host = req.get("host"); + if (host) { + return `${protocol}://${host}`; + } + return process.env.PEERBOT_PUBLIC_GATEWAY_URL || `${protocol}://localhost:8080`; + } + /** * Get active worker connections */ diff --git a/packages/gateway/src/index.ts b/packages/gateway/src/index.ts index e7c2a8ae..5bb191b9 100644 --- a/packages/gateway/src/index.ts +++ b/packages/gateway/src/index.ts @@ -6,6 +6,9 @@ import { initSentry } from "@peerbot/core"; initSentry(); import http from "node:http"; +import { existsSync } from "node:fs"; +import path from "node:path"; +import { Command } from "commander"; import { moduleRegistry } from "@peerbot/core"; import { createLogger, type OrchestratorConfig } from "@peerbot/core"; import { LogLevel } from "@slack/bolt"; @@ -19,6 +22,7 @@ import { Orchestrator } from "./orchestration"; import type { AnthropicProxy } from "./proxy/anthropic-proxy"; import { SlackDispatcher } from "./slack"; import type { DispatcherConfig } from "./types"; +import type { McpProxy } from "./mcp/proxy"; let healthServer: http.Server | null = null; @@ -27,7 +31,8 @@ let healthServer: http.Server | null = null; */ function setupHealthEndpoints( anthropicProxy?: AnthropicProxy, - workerGateway?: WorkerGateway + workerGateway?: WorkerGateway, + mcpProxy?: McpProxy ) { if 
(healthServer) return; @@ -63,6 +68,11 @@ function setupHealthEndpoints( logger.info("✅ Worker gateway routes enabled at :8080/worker/*"); } + if (mcpProxy) { + mcpProxy.setupRoutes(proxyApp); + logger.info("✅ MCP proxy routes enabled at :8080/mcp/*"); + } + // Register module endpoints moduleRegistry.registerEndpoints(proxyApp); logger.info("✅ Module endpoints registered"); @@ -82,13 +92,38 @@ function setupHealthEndpoints( /** * Main entry point */ -async function main() { +type StartOptions = { + env?: string; +}; + +class MissingRequiredEnvError extends Error { + constructor(public envName: string | string[]) { + const message = Array.isArray(envName) + ? `Missing one of the required environment variables: ${envName.join(", ")}` + : `Missing required environment variable: ${envName}`; + super(message); + this.name = "MissingRequiredEnvError"; + } +} + +async function startGateway({ env }: StartOptions = {}) { try { - // Load environment variables from .env file (searches up from cwd) - // In Docker/K8s, env vars are injected by container runtime if (process.env.NODE_ENV !== "production") { - dotenvConfig(); + const envProvided = Boolean(env); + const envPath = envProvided + ? path.resolve(process.cwd(), env!) 
+ : path.resolve(process.cwd(), ".env"); + + if (existsSync(envPath)) { + dotenvConfig({ path: envPath }); + logger.debug(`Loaded environment variables from ${envPath}`); + } else if (envProvided) { + logger.warn(`Specified env file ${envPath} was not found; continuing without it.`); + } else { + logger.debug("No .env file found; relying on process environment."); + } } + logger.info("🚀 Starting Claude Code Slack Dispatcher"); // Get bot token from environment @@ -101,9 +136,19 @@ async function main() { signingSecret: `${process.env.SLACK_SIGNING_SECRET?.substring(0, 10)}...`, }); + const connectionString = + process.env.QUEUE_URL || process.env.DATABASE_URL; + if (!connectionString) { + throw new MissingRequiredEnvError(["QUEUE_URL", "DATABASE_URL"]); + } + + if (!botToken) { + throw new MissingRequiredEnvError("SLACK_BOT_TOKEN"); + } + const config: DispatcherConfig = { slack: { - token: botToken!, + token: botToken, appToken: process.env.SLACK_APP_TOKEN, signingSecret: process.env.SLACK_SIGNING_SECRET, socketMode: process.env.SLACK_HTTP_MODE !== "true", @@ -125,7 +170,7 @@ async function main() { logLevel: (process.env.LOG_LEVEL as any) || LogLevel.INFO, // Queue configuration (required) queues: { - connectionString: process.env.QUEUE_URL || process.env.DATABASE_URL!, + connectionString, directMessage: process.env.QUEUE_DIRECT_MESSAGE || "direct_message", messageQueue: process.env.QUEUE_MESSAGE_QUEUE || "message_queue", retryLimit: parseInt(process.env.PGBOSS_RETRY_LIMIT || "3", 10), @@ -149,13 +194,6 @@ async function main() { }); // Validate required configuration - if (!config.slack.token) { - throw new Error("SLACK_BOT_TOKEN is required"); - } - if (!config.queues.connectionString) { - throw new Error("QUEUE_URL is required"); - } - // Create orchestrator configuration const orchestratorConfig: OrchestratorConfig = { queues: { @@ -215,7 +253,8 @@ async function main() { // Setup health endpoints on port 8080 setupHealthEndpoints( 
dispatcher.getAnthropicProxy(), - dispatcher.getWorkerGateway() + dispatcher.getWorkerGateway(), + dispatcher.getMcpProxy() ); // Setup graceful shutdown for orchestrator @@ -234,12 +273,28 @@ logger.info("Health check:", JSON.stringify(status, null, 2)); }); } catch (error) { - logger.error("❌ Failed to start Slack Dispatcher:", error); + if (error instanceof MissingRequiredEnvError) { + logger.error(error.message); + } else { + logger.error("❌ Failed to start Slack Dispatcher:", error); + } process.exit(1); } } -// Start the application -main(); +const program = new Command(); + +program + .name("peerbot-gateway") + .description("Peerbot gateway service") + .option("--env <path>", "Path to environment file") + .action(async (options: StartOptions) => { + await startGateway(options); + }); + +program.parseAsync(process.argv).catch((error) => { + logger.error("❌ Failed to start Slack Dispatcher:", error); + process.exit(1); +}); export type { DispatcherConfig } from "./types"; diff --git a/packages/gateway/src/mcp/config-service.ts b/packages/gateway/src/mcp/config-service.ts new file mode 100644 index 00000000..4716f5ef --- /dev/null +++ b/packages/gateway/src/mcp/config-service.ts @@ -0,0 +1,261 @@ +import { createLogger } from "@peerbot/core"; +import { readFile, stat } from "node:fs/promises"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; +import { z } from "zod"; + +const logger = createLogger("mcp-config-service"); + +const McpServersSchema = z.object({ + mcpServers: z.record(z.any()), +}); + +type RawMcpConfig = z.infer<typeof McpServersSchema>; + +export interface HttpMcpServerConfig { + id: string; + upstreamUrl: string; + loginUrl?: string; +} + +export interface WorkerMcpConfig { + mcpServers: Record<string, any>; +} + +interface LoadedConfig { + rawServers: Record<string, any>; + httpServers: Map<string, HttpMcpServerConfig>; + mtimeMs: number; +} + +type ConfigSource = + | { type: "file"; path: string } + | { type: "http"; url: URL }; + +export interface McpConfigServiceOptions { +
configUrl?: string; + configPath?: string; +} + +export class McpConfigService { + private source?: ConfigSource; + private cache?: LoadedConfig; + + constructor(options: McpConfigServiceOptions = {}) { + const rawLocation = + options.configUrl || + options.configPath || + process.env.PEERBOT_MCP_SERVERS_URL || + process.env.PEERBOT_MCP_SERVERS_FILE; + + if (!rawLocation) { + return; + } + + this.source = this.resolveConfigSource(rawLocation); + } + + /** + * Return MCP config tailored for a worker request. + */ + async getWorkerConfig(options: { + baseUrl: string; + workerToken: string; + }): Promise<WorkerMcpConfig> { + const { baseUrl, workerToken } = options; + const config = await this.loadConfig(); + const workerConfig: WorkerMcpConfig = { mcpServers: {} }; + + for (const [id, serverConfig] of Object.entries(config.rawServers)) { + const cloned = cloneConfig(serverConfig); + if (config.httpServers.has(id)) { + const proxiedUrl = buildProxyUrl(baseUrl, id); + cloned.url = proxiedUrl; + cloned.headers = mergeHeaders(cloned.headers, workerToken); + } + workerConfig.mcpServers[id] = cloned; + } + + return workerConfig; + } + + /** + * Get HTTP proxy metadata for a specific MCP server. + */ + async getHttpServer(id: string): Promise<HttpMcpServerConfig | undefined> { + const config = await this.loadConfig(); + return config.httpServers.get(id); + } + + private async loadConfig(): Promise<LoadedConfig> { + if (!this.source) { + if (!this.cache) { + this.cache = { + rawServers: {}, + httpServers: new Map(), + mtimeMs: 0, + }; + } + return this.cache; + } + + const fallback = + this.cache ??
{ + rawServers: {}, + httpServers: new Map(), + mtimeMs: 0, + }; + + try { + if (this.source.type === "file") { + const fileStat = await stat(this.source.path); + const fileContents = await readFile(this.source.path, "utf-8"); + return this.parseAndCache(fileContents, fileStat.mtimeMs); + } + + const response = await fetch(this.source.url, { cache: "no-store" }); + if (!response.ok) { + logger.error("Failed to fetch MCP config from remote URL", { + url: this.source.url.toString(), + status: response.status, + statusText: response.statusText, + }); + return fallback; + } + + const text = await response.text(); + return this.parseAndCache(text, Date.now()); + } catch (error) { + logger.error("Error loading MCP config", { + error, + source: + this.source.type === "file" + ? this.source.path + : this.source.url.toString(), + }); + return fallback; + } + } + + private resolveConfigSource(location: string): ConfigSource | undefined { + try { + const parsed = new URL(location); + if (parsed.protocol === "file:") { + return { type: "file", path: fileURLToPath(parsed) }; + } + + if (parsed.protocol === "http:" || parsed.protocol === "https:") { + return { type: "http", url: parsed }; + } + + logger.warn("Unsupported MCP config URL protocol; falling back to file", { + location, + }); + } catch (_err) { + // Not a valid URL, treat as path + return { type: "file", path: path.resolve(location) }; + } + + return { type: "file", path: path.resolve(location) }; + } + + private parseAndCache(rawContents: string, mtimeMs: number): LoadedConfig { + try { + const parsed = McpServersSchema.safeParse(JSON.parse(rawContents)); + if (!parsed.success) { + logger.error("Failed to parse MCP config", { + issues: parsed.error.issues, + }); + return ( + this.cache ?? 
{ + rawServers: {}, + httpServers: new Map(), + mtimeMs, + } + ); + } + + const normalized = normalizeConfig(parsed.data); + this.cache = { + rawServers: normalized.rawServers, + httpServers: normalized.httpServers, + mtimeMs, + }; + return this.cache; + } catch (error) { + logger.error("Failed to parse MCP config contents", { error }); + return ( + this.cache ?? { + rawServers: {}, + httpServers: new Map(), + mtimeMs, + } + ); + } + } +} + +function normalizeConfig(config: RawMcpConfig) { + const rawServers: Record<string, any> = {}; + const httpServers = new Map<string, HttpMcpServerConfig>(); + + for (const [id, serverConfig] of Object.entries(config.mcpServers)) { + if (!serverConfig || typeof serverConfig !== "object") { + continue; + } + + const cloned = cloneConfig(serverConfig); + rawServers[id] = cloned; + + if (typeof cloned.url === "string" && isHttpUrl(cloned.url)) { + httpServers.set(id, { + id, + upstreamUrl: cloned.url, + loginUrl: + typeof cloned.loginUrl === "string" ? cloned.loginUrl : undefined, + }); + } + } + + return { rawServers, httpServers }; +} + +function cloneConfig(config: any) { + return JSON.parse(JSON.stringify(config)); +} + +function buildProxyUrl(baseUrl: string, id: string) { + const url = new URL(`/mcp/${encodeURIComponent(id)}`, ensureTrailingSlash(baseUrl)); + return url.toString(); +} + +function ensureTrailingSlash(baseUrl: string): string { + if (!baseUrl.endsWith("/")) { + return `${baseUrl}/`; + } + return baseUrl; +} + +function isHttpUrl(candidate: string): boolean { + return candidate.startsWith("http://") || candidate.startsWith("https://"); +} + +function mergeHeaders( + existingHeaders: unknown, + workerToken: string +): Record<string, string> { + const normalized: Record<string, string> = {}; + + if (existingHeaders && typeof existingHeaders === "object") { + for (const [key, value] of Object.entries(existingHeaders as any)) { + if (typeof value === "string") { + normalized[key] = value; + } else if (value != null) { + normalized[key] = String(value); + } + } + } + +
normalized["Authorization"] = `Bearer ${workerToken}`; + return normalized; +} diff --git a/packages/gateway/src/mcp/credential-store.ts b/packages/gateway/src/mcp/credential-store.ts new file mode 100644 index 00000000..7198c77e --- /dev/null +++ b/packages/gateway/src/mcp/credential-store.ts @@ -0,0 +1,63 @@ +import { RedisClient, type IMessageQueue, type IRedisClient } from "@peerbot/core"; +import { createLogger } from "@peerbot/core"; + +const logger = createLogger("mcp-credentials"); + +export interface McpCredentialRecord { + accessToken: string; + tokenType?: string; + expiresAt?: number; + refreshToken?: string; + metadata?: Record<string, unknown>; +} + +export class McpCredentialStore { + private redis: IRedisClient; + private static KEY_PREFIX = "mcp:credential"; + + constructor(queue: IMessageQueue) { + this.redis = new RedisClient(queue.getRedisClient()); + } + + async get(userId: string, mcpId: string): Promise<McpCredentialRecord | null> { + const key = this.buildKey(userId, mcpId); + try { + const value = await this.redis.get(key); + if (!value) { + return null; + } + return JSON.parse(value) as McpCredentialRecord; + } catch (error) { + logger.error("Failed to fetch MCP credentials", { error, key }); + return null; + } + } + + async set( + userId: string, + mcpId: string, + record: McpCredentialRecord, + ttlSeconds?: number + ): Promise<void> { + const key = this.buildKey(userId, mcpId); + try { + await this.redis.set(key, JSON.stringify(record), ttlSeconds); + } catch (error) { + logger.error("Failed to store MCP credentials", { error, key }); + throw error; + } + } + + async delete(userId: string, mcpId: string): Promise<void> { + const key = this.buildKey(userId, mcpId); + try { + await this.redis.del(key); + } catch (error) { + logger.error("Failed to delete MCP credentials", { error, key }); + } + } + + private buildKey(userId: string, mcpId: string): string { + return `${McpCredentialStore.KEY_PREFIX}:${userId}:${mcpId}`; + } +} diff --git a/packages/gateway/src/mcp/proxy.ts
b/packages/gateway/src/mcp/proxy.ts new file mode 100644 index 00000000..42b0dd38 --- /dev/null +++ b/packages/gateway/src/mcp/proxy.ts @@ -0,0 +1,223 @@ +import { createLogger, verifyWorkerToken } from "@peerbot/core"; +import type { Request, Response } from "express"; +import { McpCredentialStore } from "./credential-store"; +import { McpConfigService } from "./config-service"; + +const logger = createLogger("mcp-proxy"); + +export class McpProxy { + constructor( + private readonly configService: McpConfigService, + private readonly credentialStore: McpCredentialStore + ) {} + + setupRoutes(app: any) { + app.all("/mcp/:mcpId", (req: Request, res: Response) => + this.handleProxyRequest(req, res) + ); + app.all("/mcp/:mcpId/*", (req: Request, res: Response) => + this.handleProxyRequest(req, res) + ); + } + + private async handleProxyRequest(req: Request, res: Response) { + const { mcpId } = req.params; + const sessionToken = this.extractSessionToken(req); + + if (!sessionToken) { + res.status(401).json({ error: "missing_token" }); + return; + } + + const tokenData = verifyWorkerToken(sessionToken); + if (!tokenData) { + res.status(401).json({ error: "invalid_token" }); + return; + } + + const httpServer = await this.configService.getHttpServer(mcpId); + if (!httpServer) { + res.status(404).json({ error: "unknown_mcp" }); + return; + } + + const credentials = await this.credentialStore.get( + tokenData.userId, + mcpId + ); + + if (!credentials || !credentials.accessToken) { + logger.info("MCP credentials missing", { + userId: tokenData.userId, + mcpId, + }); + res.status(401).json({ + error: "not_authenticated", + loginUrl: httpServer.loginUrl, + }); + return; + } + + if (credentials.expiresAt && credentials.expiresAt <= Date.now()) { + logger.warn("MCP credentials expired", { + userId: tokenData.userId, + mcpId, + }); + res.status(401).json({ + error: "token_expired", + loginUrl: httpServer.loginUrl, + }); + return; + } + + try { + await this.forwardRequest(req, 
res, httpServer.upstreamUrl, credentials); + } catch (error) { + logger.error("Failed to proxy MCP request", { error, mcpId }); + res.status(502).json({ error: "proxy_failure" }); + } + } + + private extractSessionToken(req: Request): string | null { + const authHeader = req.headers.authorization; + if (authHeader && authHeader.startsWith("Bearer ")) { + return authHeader.substring(7); + } + + const tokenFromQuery = req.query.workerToken; + if (typeof tokenFromQuery === "string") { + return tokenFromQuery; + } + + if (Array.isArray(tokenFromQuery)) { + return tokenFromQuery[0]; + } + + return null; + } + + private async forwardRequest( + req: Request, + res: Response, + upstreamBaseUrl: string, + credentials: { accessToken: string; tokenType?: string } + ): Promise<void> { + const upstreamUrl = this.buildUpstreamUrl(req, upstreamBaseUrl); + const headers = this.buildUpstreamHeaders(req, credentials); + const body = this.getRequestBody(req); + + const response = await fetch(upstreamUrl, { + method: req.method, + headers, + body, + redirect: "manual", + }); + + res.status(response.status); + response.headers.forEach((value, key) => { + if (key.toLowerCase() === "content-length") { + return; + } + res.setHeader(key, value); + }); + + if (!response.body) { + res.end(); + return; + } + + const reader = response.body.getReader(); + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + if (value) { + res.write(typeof value === "string" ? value : Buffer.from(value)); + } + } + + res.end(); + } + + private buildUpstreamUrl(req: Request, upstreamBaseUrl: string): string { + const baseUrl = new URL(upstreamBaseUrl); + const remainder = (req.params as any)[0] ?
`/${(req.params as any)[0]}` : ""; + baseUrl.pathname = joinPaths(baseUrl.pathname, remainder); + + const searchParams = new URLSearchParams(); + for (const [key, value] of Object.entries(req.query)) { + if (key === "workerToken") { + continue; + } + if (Array.isArray(value)) { + for (const entry of value) { + if (entry !== undefined) { + searchParams.append(key, String(entry)); + } + } + } else if (value !== undefined) { + searchParams.append(key, String(value)); + } + } + + baseUrl.search = searchParams.toString(); + return baseUrl.toString(); + } + + private buildUpstreamHeaders( + req: Request, + credentials: { accessToken: string; tokenType?: string } + ): Record<string, string> { + const headers: Record<string, string> = {}; + + for (const [key, value] of Object.entries(req.headers)) { + const lowerKey = key.toLowerCase(); + if (lowerKey === "host" || lowerKey === "content-length") { + continue; + } + if (Array.isArray(value)) { + headers[key] = value.join(","); + } else if (typeof value === "string") { + headers[key] = value; + } + } + + headers["authorization"] = `${ + credentials.tokenType || "Bearer" + } ${credentials.accessToken}`; + return headers; + } + + private getRequestBody(req: Request): BodyInit | undefined { + if (req.method === "GET" || req.method === "HEAD") { + return undefined; + } + + if (Buffer.isBuffer(req.body)) { + return req.body; + } + + if (typeof req.body === "string") { + return req.body; + } + + if (req.body && typeof req.body === "object") { + return JSON.stringify(req.body); + } + + return undefined; + } +} + +function joinPaths(basePath: string, suffix: string): string { + const trimmedBase = basePath.endsWith("/") + ? basePath.slice(0, -1) + : basePath; + if (!suffix) { + return trimmedBase || "/"; + } + const extra = suffix.startsWith("/") ?
suffix : `/${suffix}`; + return `${trimmedBase}${extra}`; +} diff --git a/packages/gateway/src/slack/index.ts b/packages/gateway/src/slack/index.ts index 9d1560a4..c8af9437 100644 --- a/packages/gateway/src/slack/index.ts +++ b/packages/gateway/src/slack/index.ts @@ -8,6 +8,9 @@ import { QueueProducer } from "../session/queue-producer"; import { ThreadResponseConsumer } from "../session/thread-processor"; import type { DispatcherConfig } from "../types"; import { SlackEventHandlers } from "./event-router"; +import { McpConfigService } from "../mcp/config-service"; +import { McpCredentialStore } from "../mcp/credential-store"; +import { McpProxy } from "../mcp/proxy"; export class SlackDispatcher { private app: App; @@ -15,6 +18,7 @@ export class SlackDispatcher { private threadResponseConsumer?: ThreadResponseConsumer; private anthropicProxy?: AnthropicProxy; private workerGateway?: WorkerGateway; + private mcpProxy?: McpProxy; private config: DispatcherConfig; constructor(config: DispatcherConfig) { @@ -113,8 +117,16 @@ export class SlackDispatcher { // Initialize Worker Gateway for SSE/HTTP worker communication const queue = createMessageQueue(this.config.queues.connectionString); await queue.start(); - this.workerGateway = new WorkerGateway(queue); + const mcpConfigService = new McpConfigService({ + configUrl: + process.env.PEERBOT_MCP_SERVERS_URL || + process.env.PEERBOT_MCP_SERVERS_FILE, + }); + const mcpCredentialStore = new McpCredentialStore(queue); + this.workerGateway = new WorkerGateway(queue, mcpConfigService); + this.mcpProxy = new McpProxy(mcpConfigService, mcpCredentialStore); logger.info("✅ Worker gateway initialized"); + logger.info("✅ MCP proxy initialized"); // Discover and register available modules await moduleRegistry.registerAvailableModules(); @@ -341,6 +353,10 @@ export class SlackDispatcher { return this.workerGateway; } + getMcpProxy() { + return this.mcpProxy; + } + /** * Initialize bot info and event handlers * CRITICAL: This must be 
called BEFORE starting the app to ensure diff --git a/packages/worker/src/claude/session-executor.ts b/packages/worker/src/claude/session-executor.ts index ba88c848..67a0aa26 100644 --- a/packages/worker/src/claude/session-executor.ts +++ b/packages/worker/src/claude/session-executor.ts @@ -112,7 +112,7 @@ async function prepareRunConfig( // Generate MCP config dynamically let mcpConfigPath: string | undefined; - const mcpConfig = getMCPConfigForClaude(); + const mcpConfig = await getMCPConfigForClaude(); if (mcpConfig) { const tempDir = process.env.RUNNER_TEMP || "/tmp"; mcpConfigPath = join(tempDir, `mcp-config-${Date.now()}.json`); diff --git a/packages/worker/src/utils/mcp-config.ts b/packages/worker/src/utils/mcp-config.ts index 0685f305..cfcb50db 100644 --- a/packages/worker/src/utils/mcp-config.ts +++ b/packages/worker/src/utils/mcp-config.ts @@ -1,34 +1,84 @@ +import { createLogger } from "@peerbot/core"; import { getProcessManagerInstance } from "../integrations/process-manager"; -/** - * Generate MCP configuration dynamically based on running MCP servers - */ -export function generateMCPConfig(): string { - const mcpServers: Record<string, unknown> = {}; +const logger = createLogger("worker-mcp-config"); - // Add process manager MCP if running - const processManager = getProcessManagerInstance(); - if (processManager) { - mcpServers["process-manager"] = { - type: "sse", - url: `http://localhost:${processManager.port}/sse`, - description: - "Process management MCP server for background tasks with cloudflared tunnel support (HTTP streaming)", - }; +interface GatewayMcpConfigResponse { + mcpServers?: Record<string, unknown>; +} + +export async function getMCPConfigForClaude(): Promise<string | undefined> { + const gatewayConfig = await fetchGatewayMcpConfig(); + const mergedServers: Record<string, unknown> = { + ...(gatewayConfig?.mcpServers ??
{}), + }; + + const processManagerEntry = buildProcessManagerServer(); + if (processManagerEntry && !mergedServers["process-manager"]) { + mergedServers["process-manager"] = processManagerEntry; + } + + if (Object.keys(mergedServers).length === 0) { + return undefined; + } + + return JSON.stringify({ mcpServers: mergedServers }, null, 2); +} + +async function fetchGatewayMcpConfig(): Promise<GatewayMcpConfigResponse | null> { + const dispatcherUrl = process.env.DISPATCHER_URL; + const workerToken = process.env.WORKER_TOKEN; + + if (!dispatcherUrl || !workerToken) { + logger.warn("Missing dispatcher URL or worker token for MCP config fetch"); + return null; } - return JSON.stringify({ mcpServers }, null, 2); + try { + const url = new URL("/worker/mcp/config", ensureBaseUrl(dispatcherUrl)); + const response = await fetch(url, { + headers: { + Authorization: `Bearer ${workerToken}`, + }, + }); + + if (!response.ok) { + logger.warn("Gateway returned non-success status for MCP config", { + status: response.status, + }); + return null; + } + + const data = (await response.json()) as GatewayMcpConfigResponse; + if (!data || typeof data !== "object") { + logger.warn("Gateway MCP config response malformed"); + return null; + } + + return data; + } catch (error) { + logger.error("Failed to fetch MCP config from gateway", { error }); + return null; + } } -/** - * Get MCP config path for Claude CLI - * Returns the config as a JSON string to be written to a temp file - */ -export function getMCPConfigForClaude(): string | undefined { +function buildProcessManagerServer(): Record<string, unknown> | null { const processManager = getProcessManagerInstance(); if (!processManager) { - return undefined; + return null; } - return generateMCPConfig(); + return { + type: "sse", + url: `http://localhost:${processManager.port}/sse`, + description: + "Process management MCP server for background tasks with cloudflared tunnel support (HTTP streaming)", + }; +} + +function ensureBaseUrl(base: string): string { + if
(!base.startsWith("http")) { + return `http://${base.replace(/^\/+/, "")}`; + } + return base; } From c00b01919a36af3cf207247cee21ae9bbfe70cbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 13 Oct 2025 13:58:30 +0100 Subject: [PATCH 2/2] Delete docs/mcp_integration_plan.md --- docs/mcp_integration_plan.md | 92 ------------------------------------ 1 file changed, 92 deletions(-) delete mode 100644 docs/mcp_integration_plan.md diff --git a/docs/mcp_integration_plan.md b/docs/mcp_integration_plan.md deleted file mode 100644 index a2280ca0..00000000 --- a/docs/mcp_integration_plan.md +++ /dev/null @@ -1,92 +0,0 @@ -# MCP Integration Plan - -## Goals -- Load MCP server definitions from environment variables compatible with Claude-style JSON manifests. -- Expose MCP availability in Slack Home tab with authentication status and actions. -- Support OAuth-backed and stdin-secret-backed MCPs with secure credential storage in Redis secret store. -- Ensure worker processes automatically receive authorized credentials and reuse them without prompting users repeatedly. -- Provide foundations that can support multiple CLI adapters (Claude Code today, Codex CLI later) without UI changes. - -## Configuration Loading -1. **Environment Variable Schema** - - Define `PEERBOT_MCP_SERVERS_FILE` pointing to a local JSON file (Claude-compatible schema) that lives alongside other deployment configs. - - Loader reads and parses the file during orchestrator bootstrap, normalizing entries into internal schema: `id`, `displayName`, `adapter`, `manifestPath` or `endpoint`, and `auth` metadata (`type`, `oauthConfigId`, `stdinSecretName`, scopes, audience, tokenTransport, etc.). - - Perform structural validation (Zod schema) and emit metrics for missing/invalid fields. When validation fails, return empty list but continue serving Slack so the Home tab can communicate the misconfiguration. 
- - Keep the file path in config so we can watch for changes (fs watcher or inotify when running on bare metal; in containers trigger reload hook via SIGHUP or configmap reload sidecar). On change, reload registry and bump version counter. -2. **Shared Registry** - - Implement `packages/shared/src/mcp/registry.ts` exporting a singleton `McpRegistry` that loads the env var once, indexes by `id`, and exposes read-only getters. - - Provide adapter hints so the worker orchestrator knows which CLI adapter to instantiate per MCP. - - Include version/hash of manifest and the source file `mtime` so workers can detect reconfiguration. Publish internal event (Redis pub/sub or message bus) when registry changes so long-lived workers can refresh without restart. - -## Authentication & Credential Storage -1. **Secret Provider Abstraction** - - Introduce `McpCredentialStore` interface (in shared package) with methods: `getUserCredentials(userId, mcpId)`, `setUserCredentials(userId, mcpId, secretPayload, metadata)`, `deleteUserCredentials(...)`, `listAuthorizedMcps(userId)`. - - Default implementation uses Redis secret store with namespacing: keys like `mcp:cred:${teamId}:${userId}:${mcpId}` storing encrypted JSON (AES-GCM) via existing KMS wrapper. - - Store metadata: issuedAt, expiresAt, refreshToken, scopes, tokenType, providerId, adapter expectations (e.g., env var name), and monotonically increasing `credentialVersion`. -2. **OAuth Flow** - - Slack Home tab presents "Connect" / "Login" buttons for each MCP needing auth. - - Clicking launches Slack modal -> app backend generates OAuth state token (team, user, mcpId, redirect) stored in Redis with TTL 10 minutes. - - Upon OAuth callback, exchange code using provider config (client id/secret stored in backend secrets). Save tokens in credential store and mark `needsReauth=false`. Persist gateway-generated `sessionNonce` that workers can present when requesting fresh tokens (prevents replay). 
- - Support refresh tokens: background job refreshes tokens proactively using `expiresAt` minus buffer; on failure mark `needsReauth` and notify user via Slack DM. When refresh succeeds, increment `credentialVersion` and publish `mcp-token-updated` event with `(teamId, userId, mcpId, version)` payload. -3. **Stdin Secret Collection** - - For `auth.type=stdin`, present Slack modal with secure input (single-use). Encrypt and store in credential store. Mark `rotatable=false` unless UI collects new value. - - When user updates secret, version increments to force worker re-login. - -## Worker Injection & Lifecycle -1. **Connection Request Flow** - - When worker bootstraps for a conversation, orchestrator queries `McpRegistry` for default MCPs and `McpCredentialStore` for user tokens. If token is absent, mark MCP as `pendingAuth` but still pass metadata so worker can prompt gracefully. - - Worker spawn context receives sanitized `MCP_CONTEXT` object: list of MCPs with resolved manifest paths, credential metadata (id, version, expiresAt, transport), and a signed one-time `gatewaySessionToken` allowing the worker to request the latest secret directly from the gateway on demand. - - Credentials are never written to disk permanently: mount tmpfs directory or use in-memory env injection. The worker calls gateway to exchange `gatewaySessionToken` for the actual OAuth access token immediately before establishing the CLI session, minimizing exposure window. - - Workers load credentials into CLI adapter before establishing connection: - - For OAuth tokens retrieved via gateway: set env vars expected by CLI or write to ephemeral config file consumed by CLI, ensuring file is deleted after CLI acknowledges login. - - For stdin secrets: worker requests the secret blob from gateway, streams to CLI stdin once, then wipes memory buffers. Cache success marker in worker memory to avoid repeated prompts. -2. 
**Credential Reuse** - - Worker maintains in-memory map `authorizedMcps` keyed by `mcpId` storing last `credentialVersion` seen. Before each request and on heartbeat timer, it compares against version fetched from gateway via lightweight `HEAD /mcp-token` call. When version increments (user re-logged or refresh happened), worker re-fetches token and reinitializes CLI session as needed. - - Subscribe workers to Redis pub/sub channel `mcp:token-updated` scoped by user/team. Upon message, worker fetches fresh token from gateway immediately; this covers mid-conversation logins or manual revocations without polling delay. - - If worker crashes or is re-created, orchestrator rehydrates credentials at start, enabling single-login experience across sessions. -3. **Token Refresh Handling** - - Worker receives `expiresAt` and proactively calls gateway when the buffer window (e.g., 5 minutes) is reached; gateway refreshes using stored refresh token. Gateway returns both new token and updated `expiresAt`/`version`. - - In case of refresh failure mid-session, worker sends Slack notification instructing user to re-login and marks local state `needsReauth` to avoid retry storms. Slack Home tab re-renders via event. - -4. **Gateway Token Broker & MCP Proxy (Optional)** - - **Gateway Broker**: Implement `/internal/mcp/token` endpoint (mutual TLS or signed session token required) that reads encrypted credentials from Redis, refreshes when expired, and responds with short-lived token payload plus `version`. - - **Reverse Proxy Mode**: For MCPs that support HTTP(S), gateway can expose `/mcp/proxy/:mcpId/*` which injects current OAuth token into outbound requests, shielding workers from token handling entirely. CLI adapter then points to proxy URL, and gateway attaches credentials per request. - - **Proxy Advantages**: centralizes token refresh, simplifies worker logic, supports real-time token changes, and allows revocation without restarting workers. 
Downside: gateway becomes critical path; need rate limiting, connection pooling, and SSE streaming support (ensure proxy handles `EventSource` with auth header injection). - - **Recommendation**: Start with broker mode (workers still make direct MCP connections but fetch tokens via gateway). Introduce full proxy for MCPs requiring complex auth or when CLI lacks token injection hooks. Design interfaces so adapters can toggle between `direct` and `proxy` transport. - -## Slack Home Tab Implementation -1. **Data Model** - - Extend existing user preferences data to include `authorizedMcpIds`, `needsReauth`, `lastSeen`. Keep statuses in Redis or Postgres depending on existing store. - - Provide backend API `GET /slack/home/:userId` that fetches registry entries, joins with credential status, and generates Block Kit view. -2. **UI Blocks** - - For each MCP: show title, description, status badge (Connected, Needs Login, Expired, Offline), and buttons `Connect`/`Re-auth`/`Disconnect` as appropriate. - - Include "Add Custom MCP" if we later allow user-defined endpoints (future). - - Display last login timestamp and adapter type for transparency. -3. **Event Handling** - - On button press, handle Slack interactivity: open OAuth modal or secret input modal. - - After successful auth, re-render home tab using Slack `views.publish`. - - If worker reports failure (e.g., invalid token), send ephemeral message and set status to `Needs Login` before re-rendering. - -## Edge Cases & Mitigations -- **Missing or malformed env var**: fail-safe to empty registry; log error and show Home tab message "No MCPs configured". -- **Multiple teams/users**: namespace credentials by Slack team + user. Ensure cross-team isolation even if running multi-tenant. -- **Token expiry during conversation**: worker requests refresh; if refresh fails, gracefully degrade (stop streaming, prompt re-login) without crashing CLI. 
-- **Mid-conversation first-time login**: user authenticates after worker already running; gateway publishes token-update event, worker fetches new token, replays login handshake, and resumes conversation without requiring restart. -- **Redis unavailability**: operations fail fast; Home tab shows error message; workers avoid starting sensitive sessions without credentials. Consider circuit breaker to prevent repeated login prompts. -- **Concurrent logins**: enforce single-flight by locking on `mcpId:userId` key when performing OAuth exchange or secret update. -- **Revocation & Logout**: Provide "Disconnect" button that deletes credentials and resets worker state. Worker should drop sessions gracefully and remove cached tokens. -- **Adapter variance**: For MCPs requiring Codex CLI later, ensure adapter interface handles CLI-specific login commands while still using shared credential injection contract. -- **SSE/Stream restarts**: When SSE disconnects due to auth failure, propagate to orchestrator, mark credentials invalid, and notify user. -- **Manifest updates**: If env manifest changes (detected via hash difference), flush relevant worker caches and require re-auth if scopes differ. -- **User deactivation**: On Slack user removal, run cleanup job to delete stored credentials for compliance. -- **Auditability**: Log credential lifecycle events (create/update/delete) with masked tokens for security review. - -## Implementation Steps -1. Build `McpRegistry` and load env var at orchestrator startup; add unit tests using sample JSON. -2. Implement Redis-backed `McpCredentialStore` with encryption utilities from shared package. -3. Extend Slack backend to surface registry + auth status in Home tab; create handlers for OAuth/secret flows. -4. Update worker bootstrap to receive `MCP_CONTEXT` payload with registry metadata plus signed `gatewaySessionToken`; teach orchestrator to mint session tokens per worker launch. -5. 
Modify Claude adapter to request runtime credentials from gateway broker/proxy and consume them; add placeholders for future Codex adapter to use identical hook. -6. Add monitoring hooks (metrics + logs) for login successes, failures, token refresh counts. -7. Write integration tests simulating login, worker restart, token refresh, and SSE auth failure recovery. -
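The expiry handling that appears in two places above — the proxy's `credentials.expiresAt <= Date.now()` rejection and the plan's "refresh at `expiresAt` minus buffer" rule — reduces to two small predicates. A minimal TypeScript sketch for illustration; the function names and the five-minute `REFRESH_BUFFER_MS` value are hypothetical, not part of the patch:

```typescript
// Sketch only: mirrors the proxy's hard-expiry check and the plan's
// proactive-refresh window. All names here are illustrative.
interface CredentialLike {
  accessToken: string;
  expiresAt?: number; // epoch milliseconds; absent means non-expiring
}

const REFRESH_BUFFER_MS = 5 * 60 * 1000; // plan suggests roughly 5 minutes

// Hard expiry: the proxy would respond 401 token_expired in this case.
function isExpired(cred: CredentialLike, now: number): boolean {
  return cred.expiresAt !== undefined && cred.expiresAt <= now;
}

// Soft expiry: still valid, but inside the buffer window, so the worker
// should ask the gateway broker for a refreshed token before using it.
function needsProactiveRefresh(cred: CredentialLike, now: number): boolean {
  return (
    cred.expiresAt !== undefined &&
    !isExpired(cred, now) &&
    cred.expiresAt - now <= REFRESH_BUFFER_MS
  );
}
```

Taking `now` as a parameter rather than calling `Date.now()` internally keeps the predicates deterministic and testable, and lets broker and proxy evaluate both checks against the same timestamp.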