From b28caf59275d34ddc331776742544aa9ce427197 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:22:07 -0300 Subject: [PATCH 01/19] fix(monitoring): add statement_timeout and sequential widget queries to prevent crash Monitoring JSONB queries on text columns can run for 170s+, exhausting PG connections and crashing the process. Two mitigations: 1. Add statement_timeout (30s) to the PG pool to kill runaway queries 2. Change dashboard widget queries from Promise.all to sequential execution to avoid N concurrent expensive queries overwhelming the connection pool Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/database/index.ts | 3 +++ apps/mesh/src/tools/monitoring-dashboard/query.ts | 13 ++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/apps/mesh/src/database/index.ts b/apps/mesh/src/database/index.ts index 05cbaaf42b..d47b2afba3 100644 --- a/apps/mesh/src/database/index.ts +++ b/apps/mesh/src/database/index.ts @@ -125,6 +125,9 @@ const defaultPoolOptions = { connectionTimeoutMillis: 30000, // Allow the process to exit even with idle connections allowExitOnIdle: true, + // Prevent runaway queries from exhausting PG connections and crashing the process. + // Monitoring JSONB queries on text columns can take 170s+ without this guard. + statement_timeout: 30000, // 30s max per query }; function createPostgresDatabase(config: DatabaseConfig): PostgresDatabase { const pool = new Pool({ diff --git a/apps/mesh/src/tools/monitoring-dashboard/query.ts b/apps/mesh/src/tools/monitoring-dashboard/query.ts index 3aade7a5f8..9a4a7d8411 100644 --- a/apps/mesh/src/tools/monitoring-dashboard/query.ts +++ b/apps/mesh/src/tools/monitoring-dashboard/query.ts @@ -90,9 +90,11 @@ export const MONITORING_DASHBOARD_QUERY = defineTool({ ? 
new Date(input.timeRange.endDate) : now; - // Execute each widget's aggregation - const results: WidgetQueryResult[] = await Promise.all( - dashboard.widgets.map(async (widget: DashboardWidget) => { + // Execute widgets sequentially to avoid N concurrent expensive JSONB queries + // that can exhaust PG connections and crash the process + const results: WidgetQueryResult[] = []; + for (const widget of dashboard.widgets as DashboardWidget[]) { + const widgetResult = await (async () => { // Merge dashboard-level filters with widget-level filters // Dashboard-level propertyFilters are exact-match only; runtime ones // support all operators (eq, contains, exists, in). @@ -155,8 +157,9 @@ export const MONITORING_DASHBOARD_QUERY = defineTool({ timeseries: undefined, }; } - }), - ); + })(); + results.push(widgetResult); + } return { dashboardId: input.dashboardId, From 0cc51c4c0b3c91e535e59323b194beabeece6458 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:22:26 -0300 Subject: [PATCH 02/19] fix(api): dispose per-request client pool to prevent connection leak The middleware creates a MeshContext with a fresh clientPool per request but never calls its Symbol.asyncDispose after the request completes. Every request using getOrCreateClient() leaks MCP client connections, eventually exhausting file descriptors and memory. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/app.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 4a2affe8fd..cbeabf21ca 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -629,7 +629,14 @@ export async function createApp(options: CreateAppOptions = {}) { const meshCtx = await ContextFactory.create(c.req.raw, { timings }); c.set("meshContext", meshCtx); - return next(); + try { + return await next(); + } finally { + // Dispose the per-request client pool to close MCP client connections. 
+ // Without this, every request that uses getOrCreateClient() leaks + // connections, eventually exhausting file descriptors and memory. + await meshCtx.getOrCreateClient[Symbol.asyncDispose](); + } }); // Get all management tools (for OAuth consent UI) From e83f911811c440125e0393166c16fca62e4c5a18 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:23:07 -0300 Subject: [PATCH 03/19] fix(proxy): close server/transport on error paths to prevent resource leaks All proxy route patterns follow server.connect(transport) then transport.handleRequest() without cleanup. If handleRequest throws, the server and transport leak. Wrapping in try/finally with server.close() ensures cleanup on all code paths. Applied to: proxy, virtual-mcp, self, and dev-assets-mcp routes. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/routes/dev-assets-mcp.ts | 6 +++++- apps/mesh/src/api/routes/proxy.ts | 8 ++++++-- apps/mesh/src/api/routes/self.ts | 6 +++++- apps/mesh/src/api/routes/virtual-mcp.ts | 6 +++++- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/apps/mesh/src/api/routes/dev-assets-mcp.ts b/apps/mesh/src/api/routes/dev-assets-mcp.ts index b584b2102f..edd700e821 100644 --- a/apps/mesh/src/api/routes/dev-assets-mcp.ts +++ b/apps/mesh/src/api/routes/dev-assets-mcp.ts @@ -536,7 +536,11 @@ export async function handleDevAssetsMcpRequest( req.headers.get("Accept")?.includes("application/json") ?? 
false, }); await server.connect(transport); - return transport.handleRequest(req); + try { + return await transport.handleRequest(req); + } finally { + await server.close(); + } } /** diff --git a/apps/mesh/src/api/routes/proxy.ts b/apps/mesh/src/api/routes/proxy.ts index 497c45fc83..0044134539 100644 --- a/apps/mesh/src/api/routes/proxy.ts +++ b/apps/mesh/src/api/routes/proxy.ts @@ -275,8 +275,12 @@ app.all("/:connectionId", async (c) => { // Connect server to transport await server.connect(transport); - // Handle request and cleanup - return await transport.handleRequest(c.req.raw); + // Handle request with cleanup — ensures server/transport are released on error + try { + return await transport.handleRequest(c.req.raw); + } finally { + await server.close(); + } } catch (error) { // Check if this is an auth error - if so, return appropriate 401 // Note: This only applies to HTTP connections diff --git a/apps/mesh/src/api/routes/self.ts b/apps/mesh/src/api/routes/self.ts index 3e9b344f3d..671843f593 100644 --- a/apps/mesh/src/api/routes/self.ts +++ b/apps/mesh/src/api/routes/self.ts @@ -29,7 +29,11 @@ app.all("/", async (c) => { c.req.raw.headers.get("Accept")?.includes("application/json") ?? 
false, }); await server.connect(transport); - return transport.handleRequest(c.req.raw); + try { + return await transport.handleRequest(c.req.raw); + } finally { + await server.close(); + } }); export default app; diff --git a/apps/mesh/src/api/routes/virtual-mcp.ts b/apps/mesh/src/api/routes/virtual-mcp.ts index 57685c15f9..ddc3dd602c 100644 --- a/apps/mesh/src/api/routes/virtual-mcp.ts +++ b/apps/mesh/src/api/routes/virtual-mcp.ts @@ -141,7 +141,11 @@ export async function handleVirtualMcpRequest( // Connect server to transport await server.connect(transport); - return await transport.handleRequest(c.req.raw); + try { + return await transport.handleRequest(c.req.raw); + } finally { + await server.close(); + } } catch (error) { const err = error as Error; console.error("[virtual-mcp] Error handling virtual MCP request:", err); From 70342793e88769c3a10df967b2a37b61b0cdb2ce Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:25:13 -0300 Subject: [PATCH 04/19] fix(server): add graceful shutdown on SIGTERM/SIGINT When the process receives SIGTERM (container restart, deploy) or SIGINT (Ctrl+C), there was no cleanup: RunRegistry reaper leaked, in-flight event bus deliveries became stuck, NATS wasn't drained, SSE connections hung. Now createApp returns a shutdown function that cleanly stops all subsystems. The entry point wires it to SIGTERM/SIGINT handlers. 
Co-Authored-By: Claude Opus 4.6 --- .../src/api/access-control.integration.test.ts | 4 ++-- apps/mesh/src/api/app.ts | 15 ++++++++++++++- apps/mesh/src/api/index.test.ts | 4 ++-- apps/mesh/src/api/index.ts | 3 ++- apps/mesh/src/api/integration.test.ts | 4 ++-- apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts | 4 ++-- apps/mesh/src/index.ts | 14 +++++++++++++- 7 files changed, 37 insertions(+), 11 deletions(-) diff --git a/apps/mesh/src/api/access-control.integration.test.ts b/apps/mesh/src/api/access-control.integration.test.ts index ef5c42ad7b..89cdfab8eb 100644 --- a/apps/mesh/src/api/access-control.integration.test.ts +++ b/apps/mesh/src/api/access-control.integration.test.ts @@ -101,7 +101,7 @@ interface MCPRequest { describe("Access Control Integration Tests", () => { let database: MeshDatabase; - let app: Awaited>; + let app: Awaited>["app"]; let testUsers: Map; let testOrganizations: Map; let testConnections: Map; @@ -121,7 +121,7 @@ describe("Access Control Integration Tests", () => { await createTestSchema(database.db); // Create app instance with test database and mock event bus - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); // Initialize test data maps testUsers = new Map(); diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index cbeabf21ca..cc34dac995 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -877,5 +877,18 @@ export async function createApp(options: CreateAppOptions = {}) { ); }); - return app; + // Graceful shutdown: stop active runs, event bus, SSE hub, NATS + const shutdown = async () => { + console.log("[Shutdown] Graceful shutdown initiated"); + runRegistry.stopAll(threadStorage); + runRegistry.dispose(); + await cancelBroadcast.stop().catch(() => {}); + streamBuffer.teardown(); + await eventBus.stop(); + await sseHub.stop().catch(() => {}); + await natsProvider?.drain().catch(() => {}); + 
console.log("[Shutdown] Cleanup complete"); + }; + + return { app, shutdown }; } diff --git a/apps/mesh/src/api/index.test.ts b/apps/mesh/src/api/index.test.ts index fe12353cfa..d86fd7b5c5 100644 --- a/apps/mesh/src/api/index.test.ts +++ b/apps/mesh/src/api/index.test.ts @@ -54,12 +54,12 @@ function createMockEventBus(): EventBus { describe("Hono App", () => { let database: MeshDatabase; - let app: Awaited>; + let app: Awaited>["app"]; beforeEach(async () => { database = createDatabase(":memory:"); await createTestSchema(database.db); - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); }); afterEach(async () => { diff --git a/apps/mesh/src/api/index.ts b/apps/mesh/src/api/index.ts index 6e37ae5b77..bc086c95c4 100644 --- a/apps/mesh/src/api/index.ts +++ b/apps/mesh/src/api/index.ts @@ -10,4 +10,5 @@ export { createApp, type CreateAppOptions } from "./app"; // Default app instance for production use // This runs createApp() immediately on module load import { createApp } from "./app"; -export default await createApp(); +const result = await createApp(); +export default result; diff --git a/apps/mesh/src/api/integration.test.ts b/apps/mesh/src/api/integration.test.ts index e9f0ea6e3b..f8bd24c374 100644 --- a/apps/mesh/src/api/integration.test.ts +++ b/apps/mesh/src/api/integration.test.ts @@ -44,13 +44,13 @@ describe("MCP Integration", () => { let client: Client | null = null; let originalFetch: typeof global.fetch; let database: MeshDatabase; - let app: Awaited>; + let app: Awaited>["app"]; beforeEach(async () => { // Create test database and app database = createDatabase(":memory:"); await createTestSchema(database.db); - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); // Store original fetch originalFetch = global.fetch; diff --git 
a/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts b/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts index 7015c3f5c2..fa4d29af5f 100644 --- a/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts +++ b/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts @@ -91,7 +91,7 @@ function createMockEventBus(): EventBus { } let database: MeshDatabase; -let app: Awaited>; +let app: Awaited>["app"]; const connectionMap = new Map(); describe("MCP OAuth Proxy E2E", () => { @@ -101,7 +101,7 @@ describe("MCP OAuth Proxy E2E", () => { database = createDatabase(":memory:"); await createTestSchema(database.db); - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); const orgId = "org_test"; diff --git a/apps/mesh/src/index.ts b/apps/mesh/src/index.ts index f39781b9f8..6c0faca49e 100644 --- a/apps/mesh/src/index.ts +++ b/apps/mesh/src/index.ts @@ -38,7 +38,7 @@ const handleAssets = createAssetHandler({ }); // Create the Hono app -const app = await createApp(); +const { app, shutdown } = await createApp(); console.log(""); console.log(`${green}✓${reset} ${bold}Ready${reset}`); @@ -60,6 +60,18 @@ Bun.serve({ development: process.env.NODE_ENV !== "production", }); +// Graceful shutdown on SIGTERM/SIGINT (container restart, deploy, Ctrl+C) +let shuttingDown = false; +const handleSignal = async (signal: string) => { + if (shuttingDown) return; + shuttingDown = true; + console.log(`\n[${signal}] Shutting down...`); + await shutdown(); + process.exit(0); +}; +process.on("SIGTERM", () => handleSignal("SIGTERM")); +process.on("SIGINT", () => handleSignal("SIGINT")); + // Internal debug server (only enabled via ENABLE_DEBUG_SERVER=true) if (enableDebugServer) { startDebugServer({ port: debugPort }); From d6214b6c6c32a54e321f27c54d4122819cea68ce Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:25:40 -0300 Subject: [PATCH 05/19] fix(startup): await event bus and SSE hub initialization 
Fire-and-forget startup of EventBus and SSE hub meant the app could start serving requests in a degraded state. Now these critical operations are awaited so the server is fully ready before handling traffic. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/app.ts | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index cc34dac995..7ce3c5fb26 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -201,12 +201,14 @@ export async function createApp(options: CreateAppOptions = {}) { if (options.eventBus) { eventBus = options.eventBus; - sseHub.start().catch((error) => { + try { + await sseHub.start(); + } catch (error) { console.error( "[SSEHub] Error starting broadcast (custom eventBus):", error, ); - }); + } } else { // Create notify function that uses the context factory // This is called by the worker to deliver events to subscribers @@ -593,20 +595,21 @@ export async function createApp(options: CreateAppOptions = {}) { // Start the event bus worker (async - resets stuck deliveries from previous crashes) // Then run plugin startup hooks (e.g., recover stuck workflow executions) - Promise.resolve(eventBus.start()) - .then(() => { - console.log("[EventBus] Worker started"); - // db is typed as `any` to avoid Kysely version mismatch issues between packages - return runPluginStartupHooks({ - db: database.db as any, - publish: async (organizationId, event) => { - await eventBus.publish(organizationId, "", event); - }, - }); - }) - .catch((error) => { - console.error("[EventBus] Error during startup:", error); + // Await critical startup to avoid degraded state where the event bus + // or plugins aren't ready when the first request arrives. 
+ try { + await eventBus.start(); + console.log("[EventBus] Worker started"); + // db is typed as `any` to avoid Kysely version mismatch issues between packages + await runPluginStartupHooks({ + db: database.db as any, + publish: async (organizationId, event) => { + await eventBus.publish(organizationId, "", event); + }, }); + } catch (error) { + console.error("[EventBus] Error during startup:", error); + } // Inject MeshContext into requests // Skip auth routes, static files, health check, and metrics - they don't need MeshContext From 6b582a191f1756c081dc90641ff08f95790a6518 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:25:54 -0300 Subject: [PATCH 06/19] fix(event-bus): add yield in coalescing loop to prevent CPU exhaustion Under sustained high event volume, the do/while coalescing loop can spin indefinitely with 100% CPU usage because pendingNotify is continuously set to true by incoming notifications. A brief 50ms yield between iterations provides backpressure without meaningfully impacting latency. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/event-bus/worker.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/mesh/src/event-bus/worker.ts b/apps/mesh/src/event-bus/worker.ts index ff1506f430..4cd5e890ed 100644 --- a/apps/mesh/src/event-bus/worker.ts +++ b/apps/mesh/src/event-bus/worker.ts @@ -180,6 +180,11 @@ export class EventBusWorker { do { this.pendingNotify = false; await this.processEvents(); + // Yield briefly between coalescing iterations to prevent a CPU-burning + // busy-loop when notifications arrive faster than processing completes. 
+ if (this.pendingNotify) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } } while (this.pendingNotify); } catch (error) { console.error("[EventBus] Error processing events:", error); From 0b4bfa1dc5d36759305f6b257c9c68ecabb40e88 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:26:22 -0300 Subject: [PATCH 07/19] fix(decopilot): close NATS replay stream after consecutive malformed messages The while(true) loop in the ReadableStream pull method skips malformed messages without delay. If a stream contains many consecutive malformed messages, this becomes a CPU-burning busy-loop. Now the stream closes after 100 consecutive malformed messages. Co-Authored-By: Claude Opus 4.6 --- .../api/routes/decopilot/nats-stream-buffer.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts b/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts index 9c6355521c..875d0980c2 100644 --- a/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts +++ b/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts @@ -180,8 +180,11 @@ export class NatsStreamBuffer implements StreamBuffer { } })(); + const MAX_CONSECUTIVE_MALFORMED = 100; + return new ReadableStream({ async pull(controller) { + let consecutiveMalformed = 0; while (true) { let timer: ReturnType | undefined; const result = await Promise.race([ @@ -211,8 +214,18 @@ export class NatsStreamBuffer implements StreamBuffer { controller.enqueue(data.p); return; } + consecutiveMalformed++; } catch { - // skip malformed, continue to next message + consecutiveMalformed++; + } + // Prevent CPU-burning busy-loop on streams with many malformed messages + if (consecutiveMalformed >= MAX_CONSECUTIVE_MALFORMED) { + console.warn( + `[Decopilot] Closing replay stream after ${MAX_CONSECUTIVE_MALFORMED} consecutive malformed messages`, + ); + sub.unsubscribe(); + controller.close(); + return; } } }, From 
05896ad73124e684d7dfbea75e77cb35ae544c9b Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:27:03 -0300 Subject: [PATCH 08/19] fix(monitoring): add backpressure to fire-and-forget DB writes logToDatabase() is called fire-and-forget. Under load with a slow database, these async operations accumulate without bound, consuming memory. Now a simple counter caps in-flight writes to 50, dropping monitoring logs when the queue is full. OpenTelemetry metrics are still recorded regardless. Co-Authored-By: Claude Opus 4.6 --- .../mcp-clients/outbound/transports/monitoring.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts b/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts index 5be5e1e6ee..5345333802 100644 --- a/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts +++ b/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts @@ -39,8 +39,13 @@ interface InflightRequest { span?: Span; } +// Max concurrent fire-and-forget DB writes. Beyond this limit, logs are +// dropped to prevent unbounded memory growth when the database is slow. +const MAX_INFLIGHT_DB_WRITES = 50; + export class MonitoringTransport extends WrapperTransport { private inflightRequests = new Map(); + private inflightDbWrites = 0; constructor( innerTransport: Transport, @@ -207,6 +212,12 @@ export class MonitoringTransport extends WrapperTransport { return; } + // Backpressure: drop monitoring logs when too many writes are in-flight. + // This prevents unbounded memory growth when the database is slow. 
+ if (this.inflightDbWrites >= MAX_INFLIGHT_DB_WRITES) { + return; + } + // Get organization ID from context const organizationId = ctx.organization?.id; if (!organizationId) { @@ -224,6 +235,7 @@ export class MonitoringTransport extends WrapperTransport { const properties = mergeProperties(ctx.metadata.properties, metaProps); // Log to database + this.inflightDbWrites++; try { await ctx.storage.monitoring.log({ organizationId, @@ -245,6 +257,8 @@ export class MonitoringTransport extends WrapperTransport { } catch (error) { // Don't throw - logging failures shouldn't break tool execution console.error("[MonitoringTransport] Failed to log to database:", error); + } finally { + this.inflightDbWrites--; } } From 8debddb8ed26fddfe76c87317775fc61485ac706 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 08:29:23 -0300 Subject: [PATCH 09/19] fix(types): update MeshContext type to include asyncDispose on client pool Update the getOrCreateClient type in MeshContext to include Symbol.asyncDispose, matching the runtime implementation. Fix test mocks to satisfy the updated type. 
Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/core/define-tool.test.ts | 4 +++- apps/mesh/src/core/mesh-context.test.ts | 4 +++- apps/mesh/src/core/mesh-context.ts | 6 ++++-- apps/mesh/src/tools/connection/connection-tools.test.ts | 4 +++- apps/mesh/src/tools/organization/organization-tools.test.ts | 4 +++- 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/apps/mesh/src/core/define-tool.test.ts b/apps/mesh/src/core/define-tool.test.ts index 0ce57f6f3b..23b042c9d6 100644 --- a/apps/mesh/src/core/define-tool.test.ts +++ b/apps/mesh/src/core/define-tool.test.ts @@ -114,7 +114,9 @@ const createMockContext = (): MeshContext => ({ isRunning: vi.fn().mockReturnValue(false), } as unknown as EventBus, createMCPProxy: vi.fn().mockResolvedValue({}), - getOrCreateClient: vi.fn().mockResolvedValue({}), + getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), { + [Symbol.asyncDispose]: async () => {}, + }), }); describe("defineTool", () => { diff --git a/apps/mesh/src/core/mesh-context.test.ts b/apps/mesh/src/core/mesh-context.test.ts index d155191588..2b1edd70d7 100644 --- a/apps/mesh/src/core/mesh-context.test.ts +++ b/apps/mesh/src/core/mesh-context.test.ts @@ -56,7 +56,9 @@ const createMockContext = (overrides?: Partial): MeshContext => ({ }, eventBus: null as unknown as EventBus, createMCPProxy: async () => ({}) as never, - getOrCreateClient: async () => ({}) as never, + getOrCreateClient: Object.assign(async () => ({}) as never, { + [Symbol.asyncDispose]: async () => {}, + }), ...overrides, }); diff --git a/apps/mesh/src/core/mesh-context.ts b/apps/mesh/src/core/mesh-context.ts index 255cbc259b..3c4de744b7 100644 --- a/apps/mesh/src/core/mesh-context.ts +++ b/apps/mesh/src/core/mesh-context.ts @@ -327,10 +327,12 @@ export interface MeshContext { ) => ReturnType; // Client pool for STDIO connection reuse (LRU cache) - getOrCreateClient: ( + getOrCreateClient: (( transport: T, key: string, - ) => Promise; + ) => Promise) & { + [Symbol.asyncDispose]: 
() => Promise; + }; } // ============================================================================ diff --git a/apps/mesh/src/tools/connection/connection-tools.test.ts b/apps/mesh/src/tools/connection/connection-tools.test.ts index 1134f1c733..b29f120ba9 100644 --- a/apps/mesh/src/tools/connection/connection-tools.test.ts +++ b/apps/mesh/src/tools/connection/connection-tools.test.ts @@ -128,7 +128,9 @@ describe("Connection Tools", () => { isRunning: vi.fn().mockReturnValue(false), } as unknown as EventBus, createMCPProxy: vi.fn().mockResolvedValue({}), - getOrCreateClient: vi.fn().mockResolvedValue({}), + getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), { + [Symbol.asyncDispose]: async () => {}, + }), }; }); diff --git a/apps/mesh/src/tools/organization/organization-tools.test.ts b/apps/mesh/src/tools/organization/organization-tools.test.ts index f8fa049019..24bafc323e 100644 --- a/apps/mesh/src/tools/organization/organization-tools.test.ts +++ b/apps/mesh/src/tools/organization/organization-tools.test.ts @@ -229,7 +229,9 @@ const createMockContext = ( timestamp: new Date(), }, createMCPProxy: vi.fn().mockResolvedValue({}), - getOrCreateClient: vi.fn().mockResolvedValue({}), + getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), { + [Symbol.asyncDispose]: async () => {}, + }), }; }; From 8764dff1cc63022d52569f3cfb5b4ffdd192e8f2 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 09:14:16 -0300 Subject: [PATCH 10/19] revert: remove server.close() from proxy routes (breaks SSE connections) The try/finally with server.close() kills SSE streams because handleRequest() returns while the streaming response is still active. The server must stay alive for the duration of the SSE connection. 
Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/routes/dev-assets-mcp.ts | 6 +----- apps/mesh/src/api/routes/proxy.ts | 7 +------ apps/mesh/src/api/routes/self.ts | 6 +----- apps/mesh/src/api/routes/virtual-mcp.ts | 6 +----- 4 files changed, 4 insertions(+), 21 deletions(-) diff --git a/apps/mesh/src/api/routes/dev-assets-mcp.ts b/apps/mesh/src/api/routes/dev-assets-mcp.ts index edd700e821..b584b2102f 100644 --- a/apps/mesh/src/api/routes/dev-assets-mcp.ts +++ b/apps/mesh/src/api/routes/dev-assets-mcp.ts @@ -536,11 +536,7 @@ export async function handleDevAssetsMcpRequest( req.headers.get("Accept")?.includes("application/json") ?? false, }); await server.connect(transport); - try { - return await transport.handleRequest(req); - } finally { - await server.close(); - } + return transport.handleRequest(req); } /** diff --git a/apps/mesh/src/api/routes/proxy.ts b/apps/mesh/src/api/routes/proxy.ts index 0044134539..6ad8ab20c2 100644 --- a/apps/mesh/src/api/routes/proxy.ts +++ b/apps/mesh/src/api/routes/proxy.ts @@ -275,12 +275,7 @@ app.all("/:connectionId", async (c) => { // Connect server to transport await server.connect(transport); - // Handle request with cleanup — ensures server/transport are released on error - try { - return await transport.handleRequest(c.req.raw); - } finally { - await server.close(); - } + return await transport.handleRequest(c.req.raw); } catch (error) { // Check if this is an auth error - if so, return appropriate 401 // Note: This only applies to HTTP connections diff --git a/apps/mesh/src/api/routes/self.ts b/apps/mesh/src/api/routes/self.ts index 671843f593..3e9b344f3d 100644 --- a/apps/mesh/src/api/routes/self.ts +++ b/apps/mesh/src/api/routes/self.ts @@ -29,11 +29,7 @@ app.all("/", async (c) => { c.req.raw.headers.get("Accept")?.includes("application/json") ?? 
false, }); await server.connect(transport); - try { - return await transport.handleRequest(c.req.raw); - } finally { - await server.close(); - } + return transport.handleRequest(c.req.raw); }); export default app; diff --git a/apps/mesh/src/api/routes/virtual-mcp.ts b/apps/mesh/src/api/routes/virtual-mcp.ts index ddc3dd602c..57685c15f9 100644 --- a/apps/mesh/src/api/routes/virtual-mcp.ts +++ b/apps/mesh/src/api/routes/virtual-mcp.ts @@ -141,11 +141,7 @@ export async function handleVirtualMcpRequest( // Connect server to transport await server.connect(transport); - try { - return await transport.handleRequest(c.req.raw); - } finally { - await server.close(); - } + return await transport.handleRequest(c.req.raw); } catch (error) { const err = error as Error; console.error("[virtual-mcp] Error handling virtual MCP request:", err); From fc717152817f1bcb62b8ce8718a7391e309d4bcb Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 10:51:16 -0300 Subject: [PATCH 11/19] fix: guard eventBus.stop() in shutdown to prevent skipping remaining cleanup Add .catch(() => {}) to match the pattern used by all other awaited calls in the shutdown sequence. Without this, a throw from eventBus.stop() would skip sseHub.stop() and natsProvider.drain(). 
Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/app.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 7ce3c5fb26..5db91c3ab8 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -887,7 +887,7 @@ export async function createApp(options: CreateAppOptions = {}) { runRegistry.dispose(); await cancelBroadcast.stop().catch(() => {}); streamBuffer.teardown(); - await eventBus.stop(); + await eventBus.stop().catch(() => {}); await sseHub.stop().catch(() => {}); await natsProvider?.drain().catch(() => {}); console.log("[Shutdown] Cleanup complete"); From c34e1cd796f3d5dcaf23e89adf0c80e771436c3c Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 10:51:47 -0300 Subject: [PATCH 12/19] fix: handle shutdown() rejection in signal handler to ensure clean exit Wrap shutdown() in try/catch so a rejection logs the error and process.exit(0) still runs, preventing unhandled rejections and process hangs during graceful shutdown. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/index.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/mesh/src/index.ts b/apps/mesh/src/index.ts index 6c0faca49e..adafd565d2 100644 --- a/apps/mesh/src/index.ts +++ b/apps/mesh/src/index.ts @@ -66,7 +66,11 @@ const handleSignal = async (signal: string) => { if (shuttingDown) return; shuttingDown = true; console.log(`\n[${signal}] Shutting down...`); - await shutdown(); + try { + await shutdown(); + } catch (err) { + console.error("[Shutdown] Error during cleanup:", err); + } process.exit(0); }; process.on("SIGTERM", () => handleSignal("SIGTERM")); process.on("SIGINT", () => handleSignal("SIGINT")); From 89d7ca5f92ece9b8146b7ededff92339b0a42cdc Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 10:56:00 -0300 Subject: [PATCH 13/19] fix: wrap eventBus.stop() in Promise.resolve for void | Promise<void> type eventBus.stop() returns void | Promise<void>, so .catch() is not available on the void branch.
Wrap in Promise.resolve() to normalize. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/app.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 5db91c3ab8..4576e8c140 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -887,7 +887,7 @@ export async function createApp(options: CreateAppOptions = {}) { runRegistry.dispose(); await cancelBroadcast.stop().catch(() => {}); streamBuffer.teardown(); - await eventBus.stop().catch(() => {}); + await Promise.resolve(eventBus.stop()).catch(() => {}); await sseHub.stop().catch(() => {}); await natsProvider?.drain().catch(() => {}); console.log("[Shutdown] Cleanup complete"); From e010c1f244fe46a2fe264551f231ce7eabd6f82e Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 11:11:23 -0300 Subject: [PATCH 14/19] fix: use abort signal for client pool disposal instead of finally block The finally block fires when `await next()` resolves, but SSE/streaming routes return a Response while the stream is still active. This kills MCP client connections mid-stream. Using the request abort signal defers disposal until the connection actually closes. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/app.ts | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 4576e8c140..d637d872e4 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -632,14 +632,17 @@ export async function createApp(options: CreateAppOptions = {}) { const meshCtx = await ContextFactory.create(c.req.raw, { timings }); c.set("meshContext", meshCtx); - try { - return await next(); - } finally { - // Dispose the per-request client pool to close MCP client connections. - // Without this, every request that uses getOrCreateClient() leaks - // connections, eventually exhausting file descriptors and memory. 
- await meshCtx.getOrCreateClient[Symbol.asyncDispose](); - } + // Dispose the per-request client pool when the request ends. + // We use the abort signal instead of a finally block because SSE/streaming + // routes return a Response while the stream is still active — a finally + // block would close MCP client connections mid-stream. + c.req.raw.signal.addEventListener("abort", () => { + meshCtx.getOrCreateClient[Symbol.asyncDispose]().catch((err) => + console.error("[ClientPool] Disposal error:", err), + ); + }); + + return next(); }); // Get all management tools (for OAuth consent UI) From a5e78fd13edecf0a0732e0353a207eaaed880d6d Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 11:11:50 -0300 Subject: [PATCH 15/19] fix: add 10s timeout to shutdown handler to prevent infinite hangs If any cleanup step (NATS drain, eventBus stop) hangs, the process would never exit. Add a hard timeout that force-exits with code 1. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/index.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/mesh/src/index.ts b/apps/mesh/src/index.ts index adafd565d2..7cdd26c70e 100644 --- a/apps/mesh/src/index.ts +++ b/apps/mesh/src/index.ts @@ -62,10 +62,16 @@ Bun.serve({ // Graceful shutdown on SIGTERM/SIGINT (container restart, deploy, Ctrl+C) let shuttingDown = false; +const SHUTDOWN_TIMEOUT_MS = 10_000; const handleSignal = async (signal: string) => { if (shuttingDown) return; shuttingDown = true; console.log(`\n[${signal}] Shutting down...`); + const forceExit = setTimeout(() => { + console.error("[Shutdown] Timed out after 10s, forcing exit"); + process.exit(1); + }, SHUTDOWN_TIMEOUT_MS); + forceExit.unref(); try { await shutdown(); } catch (err) { From 873ddda99c368ca8833051e7061c41b3a3238dda Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 11:12:16 -0300 Subject: [PATCH 16/19] fix(monitoring): log sampled warning when backpressure drops writes Without this, operators have zero 
visibility when monitoring logs are silently dropped. Uses sampled logging (1st drop + every 100th) to avoid flooding logs under sustained backpressure. Co-Authored-By: Claude Opus 4.6 --- .../mesh/src/mcp-clients/outbound/transports/monitoring.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts b/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts index 5345333802..131d159fa8 100644 --- a/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts +++ b/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts @@ -46,6 +46,7 @@ const MAX_INFLIGHT_DB_WRITES = 50; export class MonitoringTransport extends WrapperTransport { private inflightRequests = new Map(); private inflightDbWrites = 0; + private droppedDbWrites = 0; constructor( innerTransport: Transport, @@ -215,6 +216,12 @@ export class MonitoringTransport extends WrapperTransport { // Backpressure: drop monitoring logs when too many writes are in-flight. // This prevents unbounded memory growth when the database is slow. if (this.inflightDbWrites >= MAX_INFLIGHT_DB_WRITES) { + this.droppedDbWrites++; + if (this.droppedDbWrites === 1 || this.droppedDbWrites % 100 === 0) { + console.warn( + `[MonitoringTransport] Backpressure: dropped ${this.droppedDbWrites} monitoring log(s)`, + ); + } return; } From 95b685e08ce90ee58e0ddda7a3b1dbe6cd240ffa Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 11:13:00 -0300 Subject: [PATCH 17/19] fix: preserve default export as Hono app in api/index.ts The default export was changed from a Hono app to { app, shutdown }, which would break any consumer calling .fetch() on the import. Export shutdown as a named export instead. 
Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/index.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/mesh/src/api/index.ts b/apps/mesh/src/api/index.ts index bc086c95c4..d1cc6cadec 100644 --- a/apps/mesh/src/api/index.ts +++ b/apps/mesh/src/api/index.ts @@ -10,5 +10,6 @@ export { createApp, type CreateAppOptions } from "./app"; // Default app instance for production use // This runs createApp() immediately on module load import { createApp } from "./app"; -const result = await createApp(); -export default result; +const { app, shutdown } = await createApp(); +export { shutdown }; +export default app; From e4dc67269d7148de4efeaada5cd6b40b5ac02458 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 11:25:36 -0300 Subject: [PATCH 18/19] fix: combine abort signal + finally for client pool disposal Abort-signal-only disposal leaks on non-streaming requests where the signal may never fire (keep-alive connections). Now: - Finally block disposes for non-SSE responses (JSON, call-tool) - Abort signal disposes for SSE/streaming responses (MCP proxy) - Content-Type check skips finally for text/event-stream - Disposal is idempotent so both mechanisms can safely fire Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/app.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index d637d872e4..9c916f20e2 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -633,16 +633,26 @@ export async function createApp(options: CreateAppOptions = {}) { c.set("meshContext", meshCtx); // Dispose the per-request client pool when the request ends. - // We use the abort signal instead of a finally block because SSE/streaming - // routes return a Response while the stream is still active — a finally - // block would close MCP client connections mid-stream. 
+ // Two mechanisms handle the two response types: + // - Abort signal: fires when the client disconnects (handles SSE/streaming) + // - Finally block: fires when next() resolves (handles non-streaming JSON) + // Both are needed because SSE streams outlive next(), while non-streaming + // keep-alive connections may never fire the abort signal. + // Disposal is idempotent so both can safely fire. c.req.raw.signal.addEventListener("abort", () => { meshCtx.getOrCreateClient[Symbol.asyncDispose]().catch((err) => console.error("[ClientPool] Disposal error:", err), ); }); - return next(); + try { + return await next(); + } finally { + // Skip for SSE — the abort signal handles cleanup when the stream ends. + if (!c.res?.headers?.get("content-type")?.includes("text/event-stream")) { + await meshCtx.getOrCreateClient[Symbol.asyncDispose](); + } + } }); // Get all management tools (for OAuth consent UI) From d8fff8e1f47b18073e2f12028880e7109c6ee414 Mon Sep 17 00:00:00 2001 From: Guilherme Rodrigues Date: Fri, 6 Mar 2026 12:02:35 -0300 Subject: [PATCH 19/19] fix: add catch to finally-path client pool disposal Mirror the abort-path error handling so a disposal rejection doesn't become an unhandled rejection that fails the request. Co-Authored-By: Claude Opus 4.6 --- apps/mesh/src/api/app.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 9c916f20e2..9e872dd64f 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -650,7 +650,9 @@ export async function createApp(options: CreateAppOptions = {}) { } finally { // Skip for SSE — the abort signal handles cleanup when the stream ends. if (!c.res?.headers?.get("content-type")?.includes("text/event-stream")) { - await meshCtx.getOrCreateClient[Symbol.asyncDispose](); + await meshCtx.getOrCreateClient[Symbol.asyncDispose]().catch((err) => + console.error("[ClientPool] Disposal error:", err), + ); } } });