diff --git a/apps/mesh/src/api/access-control.integration.test.ts b/apps/mesh/src/api/access-control.integration.test.ts index ef5c42ad7b..89cdfab8eb 100644 --- a/apps/mesh/src/api/access-control.integration.test.ts +++ b/apps/mesh/src/api/access-control.integration.test.ts @@ -101,7 +101,7 @@ interface MCPRequest { describe("Access Control Integration Tests", () => { let database: MeshDatabase; - let app: Awaited>; + let app: Awaited>["app"]; let testUsers: Map; let testOrganizations: Map; let testConnections: Map; @@ -121,7 +121,7 @@ describe("Access Control Integration Tests", () => { await createTestSchema(database.db); // Create app instance with test database and mock event bus - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); // Initialize test data maps testUsers = new Map(); diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 4a2affe8fd..9e872dd64f 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -201,12 +201,14 @@ export async function createApp(options: CreateAppOptions = {}) { if (options.eventBus) { eventBus = options.eventBus; - sseHub.start().catch((error) => { + try { + await sseHub.start(); + } catch (error) { console.error( "[SSEHub] Error starting broadcast (custom eventBus):", error, ); - }); + } } else { // Create notify function that uses the context factory // This is called by the worker to deliver events to subscribers @@ -593,20 +595,21 @@ export async function createApp(options: CreateAppOptions = {}) { // Start the event bus worker (async - resets stuck deliveries from previous crashes) // Then run plugin startup hooks (e.g., recover stuck workflow executions) - Promise.resolve(eventBus.start()) - .then(() => { - console.log("[EventBus] Worker started"); - // db is typed as `any` to avoid Kysely version mismatch issues between packages - return runPluginStartupHooks({ - db: database.db as any, - publish: async (organizationId, event) => { - await eventBus.publish(organizationId, "", event); - }, - }); - }) - .catch((error) => { - console.error("[EventBus] Error during startup:", error); + // Await critical startup to avoid degraded state where the event bus + // or plugins aren't ready when the first request arrives. + try { + await eventBus.start(); + console.log("[EventBus] Worker started"); + // db is typed as `any` to avoid Kysely version mismatch issues between packages + await runPluginStartupHooks({ + db: database.db as any, + publish: async (organizationId, event) => { + await eventBus.publish(organizationId, "", event); + }, }); + } catch (error) { + console.error("[EventBus] Error during startup:", error); + } // Inject MeshContext into requests // Skip auth routes, static files, health check, and metrics - they don't need MeshContext @@ -629,7 +632,29 @@ export async function createApp(options: CreateAppOptions = {}) { const meshCtx = await ContextFactory.create(c.req.raw, { timings }); c.set("meshContext", meshCtx); - return next(); + // Dispose the per-request client pool when the request ends. + // Two mechanisms handle the two response types: + // - Abort signal: fires when the client disconnects (handles SSE/streaming) + // - Finally block: fires when next() resolves (handles non-streaming JSON) + // Both are needed because SSE streams outlive next(), while non-streaming + // keep-alive connections may never fire the abort signal. + // Disposal is idempotent so both can safely fire. + c.req.raw.signal.addEventListener("abort", () => { + meshCtx.getOrCreateClient[Symbol.asyncDispose]().catch((err) => + console.error("[ClientPool] Disposal error:", err), + ); + }); + + try { + return await next(); + } finally { + // Skip for SSE — the abort signal handles cleanup when the stream ends. + if (!c.res?.headers?.get("content-type")?.includes("text/event-stream")) { + await meshCtx.getOrCreateClient[Symbol.asyncDispose]().catch((err) => + console.error("[ClientPool] Disposal error:", err), + ); + } + } }); // Get all management tools (for OAuth consent UI) @@ -870,5 +895,18 @@ export async function createApp(options: CreateAppOptions = {}) { ); }); - return app; + // Graceful shutdown: stop active runs, event bus, SSE hub, NATS + const shutdown = async () => { + console.log("[Shutdown] Graceful shutdown initiated"); + runRegistry.stopAll(threadStorage); + runRegistry.dispose(); + await cancelBroadcast.stop().catch(() => {}); + streamBuffer.teardown(); + await Promise.resolve(eventBus.stop()).catch(() => {}); + await sseHub.stop().catch(() => {}); + await natsProvider?.drain().catch(() => {}); + console.log("[Shutdown] Cleanup complete"); + }; + + return { app, shutdown }; } diff --git a/apps/mesh/src/api/index.test.ts b/apps/mesh/src/api/index.test.ts index fe12353cfa..d86fd7b5c5 100644 --- a/apps/mesh/src/api/index.test.ts +++ b/apps/mesh/src/api/index.test.ts @@ -54,12 +54,12 @@ function createMockEventBus(): EventBus { describe("Hono App", () => { let database: MeshDatabase; - let app: Awaited>; + let app: Awaited>["app"]; beforeEach(async () => { database = createDatabase(":memory:"); await createTestSchema(database.db); - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); }); afterEach(async () => { diff --git a/apps/mesh/src/api/index.ts b/apps/mesh/src/api/index.ts index 6e37ae5b77..d1cc6cadec 100644 --- a/apps/mesh/src/api/index.ts +++ b/apps/mesh/src/api/index.ts @@ -10,4 +10,6 @@ export { createApp, type CreateAppOptions } from "./app"; // Default app instance for production use // This runs createApp() immediately on module load import { createApp } from "./app"; -export default await createApp(); +const { app, shutdown } = await createApp(); +export { shutdown }; +export default app; diff --git a/apps/mesh/src/api/integration.test.ts b/apps/mesh/src/api/integration.test.ts index e9f0ea6e3b..f8bd24c374 100644 --- a/apps/mesh/src/api/integration.test.ts +++ b/apps/mesh/src/api/integration.test.ts @@ -44,13 +44,13 @@ describe("MCP Integration", () => { let client: Client | null = null; let originalFetch: typeof global.fetch; let database: MeshDatabase; - let app: Awaited>; + let app: Awaited>["app"]; beforeEach(async () => { // Create test database and app database = createDatabase(":memory:"); await createTestSchema(database.db); - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); // Store original fetch originalFetch = global.fetch; diff --git a/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts b/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts index 9c6355521c..875d0980c2 100644 --- a/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts +++ b/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts @@ -180,8 +180,11 @@ export class NatsStreamBuffer implements StreamBuffer { } })(); + const MAX_CONSECUTIVE_MALFORMED = 100; + return new ReadableStream({ async pull(controller) { + let consecutiveMalformed = 0; while (true) { let timer: ReturnType | undefined; const result = await Promise.race([ @@ -211,8 +214,18 @@ export class NatsStreamBuffer implements StreamBuffer { controller.enqueue(data.p); return; } + consecutiveMalformed++; } catch { - // skip malformed, continue to next message + consecutiveMalformed++; + } + // Prevent CPU-burning busy-loop on streams with many malformed messages + if (consecutiveMalformed >= MAX_CONSECUTIVE_MALFORMED) { + console.warn( + `[Decopilot] Closing replay stream after ${MAX_CONSECUTIVE_MALFORMED} consecutive malformed messages`, + ); + sub.unsubscribe(); + controller.close(); + return; } } }, diff --git a/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts b/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts index 7015c3f5c2..fa4d29af5f 100644 --- a/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts +++ b/apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts @@ -91,7 +91,7 @@ function createMockEventBus(): EventBus { } let database: MeshDatabase; -let app: Awaited>; +let app: Awaited>["app"]; const connectionMap = new Map(); describe("MCP OAuth Proxy E2E", () => { @@ -101,7 +101,7 @@ describe("MCP OAuth Proxy E2E", () => { database = createDatabase(":memory:"); await createTestSchema(database.db); - app = await createApp({ database, eventBus: createMockEventBus() }); + ({ app } = await createApp({ database, eventBus: createMockEventBus() })); const orgId = "org_test"; diff --git a/apps/mesh/src/api/routes/proxy.ts b/apps/mesh/src/api/routes/proxy.ts index 497c45fc83..6ad8ab20c2 100644 --- a/apps/mesh/src/api/routes/proxy.ts +++ b/apps/mesh/src/api/routes/proxy.ts @@ -275,7 +275,6 @@ app.all("/:connectionId", async (c) => { // Connect server to transport await server.connect(transport); - // Handle request and cleanup return await transport.handleRequest(c.req.raw); } catch (error) { // Check if this is an auth error - if so, return appropriate 401 diff --git a/apps/mesh/src/core/define-tool.test.ts b/apps/mesh/src/core/define-tool.test.ts index 0ce57f6f3b..23b042c9d6 100644 --- a/apps/mesh/src/core/define-tool.test.ts +++ b/apps/mesh/src/core/define-tool.test.ts @@ -114,7 +114,9 @@ const createMockContext = (): MeshContext => ({ isRunning: vi.fn().mockReturnValue(false), } as unknown as EventBus, createMCPProxy: vi.fn().mockResolvedValue({}), - getOrCreateClient: vi.fn().mockResolvedValue({}), + getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), { + [Symbol.asyncDispose]: async () => {}, + }), }); describe("defineTool", () => { diff --git a/apps/mesh/src/core/mesh-context.test.ts b/apps/mesh/src/core/mesh-context.test.ts index d155191588..2b1edd70d7 100644 --- a/apps/mesh/src/core/mesh-context.test.ts +++ b/apps/mesh/src/core/mesh-context.test.ts @@ -56,7 +56,9 @@ const createMockContext = (overrides?: Partial): MeshContext => ({ }, eventBus: null as unknown as EventBus, createMCPProxy: async () => ({}) as never, - getOrCreateClient: async () => ({}) as never, + getOrCreateClient: Object.assign(async () => ({}) as never, { + [Symbol.asyncDispose]: async () => {}, + }), ...overrides, }); diff --git a/apps/mesh/src/core/mesh-context.ts b/apps/mesh/src/core/mesh-context.ts index 255cbc259b..3c4de744b7 100644 --- a/apps/mesh/src/core/mesh-context.ts +++ b/apps/mesh/src/core/mesh-context.ts @@ -327,10 +327,12 @@ export interface MeshContext { ) => ReturnType; // Client pool for STDIO connection reuse (LRU cache) - getOrCreateClient: ( + getOrCreateClient: (( transport: T, key: string, - ) => Promise; + ) => Promise) & { + [Symbol.asyncDispose]: () => Promise; + }; } // ============================================================================ diff --git a/apps/mesh/src/database/index.ts b/apps/mesh/src/database/index.ts index 05cbaaf42b..d47b2afba3 100644 --- a/apps/mesh/src/database/index.ts +++ b/apps/mesh/src/database/index.ts @@ -125,6 +125,9 @@ const defaultPoolOptions = { connectionTimeoutMillis: 30000, // Allow the process to exit even with idle connections allowExitOnIdle: true, + // Prevent runaway queries from exhausting PG connections and crashing the process. + // Monitoring JSONB queries on text columns can take 170s+ without this guard. + statement_timeout: 30000, // 30s max per query }; function createPostgresDatabase(config: DatabaseConfig): PostgresDatabase { const pool = new Pool({ diff --git a/apps/mesh/src/event-bus/worker.ts b/apps/mesh/src/event-bus/worker.ts index ff1506f430..4cd5e890ed 100644 --- a/apps/mesh/src/event-bus/worker.ts +++ b/apps/mesh/src/event-bus/worker.ts @@ -180,6 +180,11 @@ export class EventBusWorker { do { this.pendingNotify = false; await this.processEvents(); + // Yield briefly between coalescing iterations to prevent a CPU-burning + // busy-loop when notifications arrive faster than processing completes. + if (this.pendingNotify) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } } while (this.pendingNotify); } catch (error) { console.error("[EventBus] Error processing events:", error); diff --git a/apps/mesh/src/index.ts b/apps/mesh/src/index.ts index f39781b9f8..7cdd26c70e 100644 --- a/apps/mesh/src/index.ts +++ b/apps/mesh/src/index.ts @@ -38,7 +38,7 @@ const handleAssets = createAssetHandler({ }); // Create the Hono app -const app = await createApp(); +const { app, shutdown } = await createApp(); console.log(""); console.log(`${green}✓${reset} ${bold}Ready${reset}`); @@ -60,6 +60,28 @@ Bun.serve({ development: process.env.NODE_ENV !== "production", }); +// Graceful shutdown on SIGTERM/SIGINT (container restart, deploy, Ctrl+C) +let shuttingDown = false; +const SHUTDOWN_TIMEOUT_MS = 10_000; +const handleSignal = async (signal: string) => { + if (shuttingDown) return; + shuttingDown = true; + console.log(`\n[${signal}] Shutting down...`); + const forceExit = setTimeout(() => { + console.error("[Shutdown] Timed out after 10s, forcing exit"); + process.exit(1); + }, SHUTDOWN_TIMEOUT_MS); + forceExit.unref(); + try { + await shutdown(); + } catch (err) { + console.error("[Shutdown] Error during cleanup:", err); + } + process.exit(0); +}; +process.on("SIGTERM", () => handleSignal("SIGTERM")); +process.on("SIGINT", () => handleSignal("SIGINT")); + // Internal debug server (only enabled via ENABLE_DEBUG_SERVER=true) if (enableDebugServer) { startDebugServer({ port: debugPort }); diff --git a/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts b/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts index 5be5e1e6ee..131d159fa8 100644 --- a/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts +++ b/apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts @@ -39,8 +39,14 @@ interface InflightRequest { span?: Span; } +// Max concurrent fire-and-forget DB writes. Beyond this limit, logs are +// dropped to prevent unbounded memory growth when the database is slow. +const MAX_INFLIGHT_DB_WRITES = 50; + export class MonitoringTransport extends WrapperTransport { private inflightRequests = new Map(); + private inflightDbWrites = 0; + private droppedDbWrites = 0; constructor( innerTransport: Transport, @@ -207,6 +213,18 @@ export class MonitoringTransport extends WrapperTransport { return; } + // Backpressure: drop monitoring logs when too many writes are in-flight. + // This prevents unbounded memory growth when the database is slow. + if (this.inflightDbWrites >= MAX_INFLIGHT_DB_WRITES) { + this.droppedDbWrites++; + if (this.droppedDbWrites === 1 || this.droppedDbWrites % 100 === 0) { + console.warn( + `[MonitoringTransport] Backpressure: dropped ${this.droppedDbWrites} monitoring log(s)`, + ); + } + return; + } + // Get organization ID from context const organizationId = ctx.organization?.id; if (!organizationId) { @@ -224,6 +242,7 @@ export class MonitoringTransport extends WrapperTransport { const properties = mergeProperties(ctx.metadata.properties, metaProps); // Log to database + this.inflightDbWrites++; try { await ctx.storage.monitoring.log({ organizationId, @@ -245,6 +264,8 @@ export class MonitoringTransport extends WrapperTransport { } catch (error) { // Don't throw - logging failures shouldn't break tool execution console.error("[MonitoringTransport] Failed to log to database:", error); + } finally { + this.inflightDbWrites--; } } diff --git a/apps/mesh/src/tools/connection/connection-tools.test.ts b/apps/mesh/src/tools/connection/connection-tools.test.ts index 1134f1c733..b29f120ba9 100644 --- a/apps/mesh/src/tools/connection/connection-tools.test.ts +++ b/apps/mesh/src/tools/connection/connection-tools.test.ts @@ -128,7 +128,9 @@ describe("Connection Tools", () => { isRunning: vi.fn().mockReturnValue(false), } as unknown as EventBus, createMCPProxy: vi.fn().mockResolvedValue({}), - getOrCreateClient: vi.fn().mockResolvedValue({}), + getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), { + [Symbol.asyncDispose]: async () => {}, + }), }; }); diff --git a/apps/mesh/src/tools/monitoring-dashboard/query.ts b/apps/mesh/src/tools/monitoring-dashboard/query.ts index 3aade7a5f8..9a4a7d8411 100644 --- a/apps/mesh/src/tools/monitoring-dashboard/query.ts +++ b/apps/mesh/src/tools/monitoring-dashboard/query.ts @@ -90,9 +90,11 @@ export const MONITORING_DASHBOARD_QUERY = defineTool({ ? new Date(input.timeRange.endDate) : now; - // Execute each widget's aggregation - const results: WidgetQueryResult[] = await Promise.all( - dashboard.widgets.map(async (widget: DashboardWidget) => { + // Execute widgets sequentially to avoid N concurrent expensive JSONB queries + // that can exhaust PG connections and crash the process + const results: WidgetQueryResult[] = []; + for (const widget of dashboard.widgets as DashboardWidget[]) { + const widgetResult = await (async () => { // Merge dashboard-level filters with widget-level filters // Dashboard-level propertyFilters are exact-match only; runtime ones // support all operators (eq, contains, exists, in). @@ -155,8 +157,9 @@ export const MONITORING_DASHBOARD_QUERY = defineTool({ timeseries: undefined, }; } - }), - ); + })(); + results.push(widgetResult); + } return { dashboardId: input.dashboardId, diff --git a/apps/mesh/src/tools/organization/organization-tools.test.ts b/apps/mesh/src/tools/organization/organization-tools.test.ts index f8fa049019..24bafc323e 100644 --- a/apps/mesh/src/tools/organization/organization-tools.test.ts +++ b/apps/mesh/src/tools/organization/organization-tools.test.ts @@ -229,7 +229,9 @@ const createMockContext = ( timestamp: new Date(), }, createMCPProxy: vi.fn().mockResolvedValue({}), - getOrCreateClient: vi.fn().mockResolvedValue({}), + getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), { + [Symbol.asyncDispose]: async () => {}, + }), }; };