Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b28caf5
fix(monitoring): add statement_timeout and sequential widget queries …
vibegui Mar 6, 2026
0cc51c4
fix(api): dispose per-request client pool to prevent connection leak
vibegui Mar 6, 2026
e83f911
fix(proxy): close server/transport on error paths to prevent resource…
vibegui Mar 6, 2026
7034279
fix(server): add graceful shutdown on SIGTERM/SIGINT
vibegui Mar 6, 2026
d6214b6
fix(startup): await event bus and SSE hub initialization
vibegui Mar 6, 2026
6b582a1
fix(event-bus): add yield in coalescing loop to prevent CPU exhaustion
vibegui Mar 6, 2026
0b4bfa1
fix(decopilot): close NATS replay stream after consecutive malformed …
vibegui Mar 6, 2026
05896ad
fix(monitoring): add backpressure to fire-and-forget DB writes
vibegui Mar 6, 2026
8debddb
fix(types): update MeshContext type to include asyncDispose on client…
vibegui Mar 6, 2026
8764dff
revert: remove server.close() from proxy routes (breaks SSE connections)
vibegui Mar 6, 2026
fc71715
fix: guard eventBus.stop() in shutdown to prevent skipping remaining …
vibegui Mar 6, 2026
c34e1cd
fix: handle shutdown() rejection in signal handler to ensure clean exit
vibegui Mar 6, 2026
89d7ca5
fix: wrap eventBus.stop() in Promise.resolve for void | Promise<void>…
vibegui Mar 6, 2026
e010c1f
fix: use abort signal for client pool disposal instead of finally block
vibegui Mar 6, 2026
a5e78fd
fix: add 10s timeout to shutdown handler to prevent infinite hangs
vibegui Mar 6, 2026
873ddda
fix(monitoring): log sampled warning when backpressure drops writes
vibegui Mar 6, 2026
95b685e
fix: preserve default export as Hono app in api/index.ts
vibegui Mar 6, 2026
e4dc672
fix: combine abort signal + finally for client pool disposal
vibegui Mar 6, 2026
d8fff8e
fix: add catch to finally-path client pool disposal
vibegui Mar 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/mesh/src/api/access-control.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ interface MCPRequest {

describe("Access Control Integration Tests", () => {
let database: MeshDatabase;
let app: Awaited<ReturnType<typeof createApp>>;
let app: Awaited<ReturnType<typeof createApp>>["app"];
let testUsers: Map<string, TestUser>;
let testOrganizations: Map<string, TestOrganization>;
let testConnections: Map<string, TestConnection>;
Expand All @@ -121,7 +121,7 @@ describe("Access Control Integration Tests", () => {
await createTestSchema(database.db);

// Create app instance with test database and mock event bus
app = await createApp({ database, eventBus: createMockEventBus() });
({ app } = await createApp({ database, eventBus: createMockEventBus() }));

// Initialize test data maps
testUsers = new Map();
Expand Down
72 changes: 55 additions & 17 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,14 @@ export async function createApp(options: CreateAppOptions = {}) {

if (options.eventBus) {
eventBus = options.eventBus;
sseHub.start().catch((error) => {
try {
await sseHub.start();
} catch (error) {
console.error(
"[SSEHub] Error starting broadcast (custom eventBus):",
error,
);
});
}
} else {
// Create notify function that uses the context factory
// This is called by the worker to deliver events to subscribers
Expand Down Expand Up @@ -593,20 +595,21 @@ export async function createApp(options: CreateAppOptions = {}) {

// Start the event bus worker (async - resets stuck deliveries from previous crashes)
// Then run plugin startup hooks (e.g., recover stuck workflow executions)
Promise.resolve(eventBus.start())
.then(() => {
console.log("[EventBus] Worker started");
// db is typed as `any` to avoid Kysely version mismatch issues between packages
return runPluginStartupHooks({
db: database.db as any,
publish: async (organizationId, event) => {
await eventBus.publish(organizationId, "", event);
},
});
})
.catch((error) => {
console.error("[EventBus] Error during startup:", error);
// Await critical startup to avoid degraded state where the event bus
// or plugins aren't ready when the first request arrives.
try {
await eventBus.start();
console.log("[EventBus] Worker started");
// db is typed as `any` to avoid Kysely version mismatch issues between packages
await runPluginStartupHooks({
db: database.db as any,
publish: async (organizationId, event) => {
await eventBus.publish(organizationId, "", event);
},
});
} catch (error) {
console.error("[EventBus] Error during startup:", error);
}

// Inject MeshContext into requests
// Skip auth routes, static files, health check, and metrics - they don't need MeshContext
Expand All @@ -629,7 +632,29 @@ export async function createApp(options: CreateAppOptions = {}) {
const meshCtx = await ContextFactory.create(c.req.raw, { timings });
c.set("meshContext", meshCtx);

return next();
// Dispose the per-request client pool when the request ends.
// Two mechanisms handle the two response types:
// - Abort signal: fires when the client disconnects (handles SSE/streaming)
// - Finally block: fires when next() resolves (handles non-streaming JSON)
// Both are needed because SSE streams outlive next(), while non-streaming
// keep-alive connections may never fire the abort signal.
// Disposal is idempotent so both can safely fire.
c.req.raw.signal.addEventListener("abort", () => {
meshCtx.getOrCreateClient[Symbol.asyncDispose]().catch((err) =>
console.error("[ClientPool] Disposal error:", err),
);
});

try {
return await next();
} finally {
// Skip for SSE — the abort signal handles cleanup when the stream ends.
if (!c.res?.headers?.get("content-type")?.includes("text/event-stream")) {
await meshCtx.getOrCreateClient[Symbol.asyncDispose]().catch((err) =>
console.error("[ClientPool] Disposal error:", err),
);
}
}
});

// Get all management tools (for OAuth consent UI)
Expand Down Expand Up @@ -870,5 +895,18 @@ export async function createApp(options: CreateAppOptions = {}) {
);
});

return app;
// Graceful shutdown: stop active runs, event bus, SSE hub, NATS
const shutdown = async () => {
console.log("[Shutdown] Graceful shutdown initiated");
runRegistry.stopAll(threadStorage);
runRegistry.dispose();
await cancelBroadcast.stop().catch(() => {});
streamBuffer.teardown();
await Promise.resolve(eventBus.stop()).catch(() => {});
await sseHub.stop().catch(() => {});
await natsProvider?.drain().catch(() => {});
console.log("[Shutdown] Cleanup complete");
};

return { app, shutdown };
}
4 changes: 2 additions & 2 deletions apps/mesh/src/api/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ function createMockEventBus(): EventBus {

describe("Hono App", () => {
let database: MeshDatabase;
let app: Awaited<ReturnType<typeof createApp>>;
let app: Awaited<ReturnType<typeof createApp>>["app"];

beforeEach(async () => {
database = createDatabase(":memory:");
await createTestSchema(database.db);
app = await createApp({ database, eventBus: createMockEventBus() });
({ app } = await createApp({ database, eventBus: createMockEventBus() }));
});

afterEach(async () => {
Expand Down
4 changes: 3 additions & 1 deletion apps/mesh/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ export { createApp, type CreateAppOptions } from "./app";
// Default app instance for production use
// This runs createApp() immediately on module load
import { createApp } from "./app";
export default await createApp();
const { app, shutdown } = await createApp();
export { shutdown };
export default app;
4 changes: 2 additions & 2 deletions apps/mesh/src/api/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ describe("MCP Integration", () => {
let client: Client | null = null;
let originalFetch: typeof global.fetch;
let database: MeshDatabase;
let app: Awaited<ReturnType<typeof createApp>>;
let app: Awaited<ReturnType<typeof createApp>>["app"];

beforeEach(async () => {
// Create test database and app
database = createDatabase(":memory:");
await createTestSchema(database.db);
app = await createApp({ database, eventBus: createMockEventBus() });
({ app } = await createApp({ database, eventBus: createMockEventBus() }));

// Store original fetch
originalFetch = global.fetch;
Expand Down
15 changes: 14 additions & 1 deletion apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ export class NatsStreamBuffer implements StreamBuffer {
}
})();

const MAX_CONSECUTIVE_MALFORMED = 100;

return new ReadableStream({
async pull(controller) {
let consecutiveMalformed = 0;
while (true) {
let timer: ReturnType<typeof setTimeout> | undefined;
const result = await Promise.race([
Expand Down Expand Up @@ -211,8 +214,18 @@ export class NatsStreamBuffer implements StreamBuffer {
controller.enqueue(data.p);
return;
}
consecutiveMalformed++;
} catch {
// skip malformed, continue to next message
consecutiveMalformed++;
}
// Prevent CPU-burning busy-loop on streams with many malformed messages
if (consecutiveMalformed >= MAX_CONSECUTIVE_MALFORMED) {
console.warn(
`[Decopilot] Closing replay stream after ${MAX_CONSECUTIVE_MALFORMED} consecutive malformed messages`,
);
sub.unsubscribe();
controller.close();
return;
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions apps/mesh/src/api/routes/oauth-proxy.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ function createMockEventBus(): EventBus {
}

let database: MeshDatabase;
let app: Awaited<ReturnType<typeof createApp>>;
let app: Awaited<ReturnType<typeof createApp>>["app"];
const connectionMap = new Map<string, string>();

describe("MCP OAuth Proxy E2E", () => {
Expand All @@ -101,7 +101,7 @@ describe("MCP OAuth Proxy E2E", () => {

database = createDatabase(":memory:");
await createTestSchema(database.db);
app = await createApp({ database, eventBus: createMockEventBus() });
({ app } = await createApp({ database, eventBus: createMockEventBus() }));

const orgId = "org_test";

Expand Down
1 change: 0 additions & 1 deletion apps/mesh/src/api/routes/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ app.all("/:connectionId", async (c) => {
// Connect server to transport
await server.connect(transport);

// Handle request and cleanup
return await transport.handleRequest(c.req.raw);
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Per-request MCP server is no longer closed after handling the proxy request, causing a resource leak on this hot path.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/mesh/src/api/routes/proxy.ts, line 278:

<comment>Per-request MCP server is no longer closed after handling the proxy request, causing a resource leak on this hot path.</comment>

<file context>
@@ -275,12 +275,7 @@ app.all("/:connectionId", async (c) => {
-      } finally {
-        await server.close();
-      }
+      return await transport.handleRequest(c.req.raw);
     } catch (error) {
       // Check if this is an auth error - if so, return appropriate 401
</file context>
Suggested change
return await transport.handleRequest(c.req.raw);
try {
return await transport.handleRequest(c.req.raw);
} finally {
await server.close();
}
Fix with Cubic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deliberate for SSE connections, correct @tlgimenes ?

} catch (error) {
// Check if this is an auth error - if so, return appropriate 401
Expand Down
4 changes: 3 additions & 1 deletion apps/mesh/src/core/define-tool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ const createMockContext = (): MeshContext => ({
isRunning: vi.fn().mockReturnValue(false),
} as unknown as EventBus,
createMCPProxy: vi.fn().mockResolvedValue({}),
getOrCreateClient: vi.fn().mockResolvedValue({}),
getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), {
[Symbol.asyncDispose]: async () => {},
}),
});

describe("defineTool", () => {
Expand Down
4 changes: 3 additions & 1 deletion apps/mesh/src/core/mesh-context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ const createMockContext = (overrides?: Partial<MeshContext>): MeshContext => ({
},
eventBus: null as unknown as EventBus,
createMCPProxy: async () => ({}) as never,
getOrCreateClient: async () => ({}) as never,
getOrCreateClient: Object.assign(async () => ({}) as never, {
[Symbol.asyncDispose]: async () => {},
}),
...overrides,
});

Expand Down
6 changes: 4 additions & 2 deletions apps/mesh/src/core/mesh-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,12 @@ export interface MeshContext {
) => ReturnType<typeof createMCPProxy>;

// Client pool for STDIO connection reuse (LRU cache)
getOrCreateClient: <T extends Transport>(
getOrCreateClient: (<T extends Transport>(
transport: T,
key: string,
) => Promise<Client>;
) => Promise<Client>) & {
[Symbol.asyncDispose]: () => Promise<void>;
};
}

// ============================================================================
Expand Down
3 changes: 3 additions & 0 deletions apps/mesh/src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ const defaultPoolOptions = {
connectionTimeoutMillis: 30000,
// Allow the process to exit even with idle connections
allowExitOnIdle: true,
// Prevent runaway queries from exhausting PG connections and crashing the process.
// Monitoring JSONB queries on text columns can take 170s+ without this guard.
statement_timeout: 30000, // 30s max per query
};
function createPostgresDatabase(config: DatabaseConfig): PostgresDatabase {
const pool = new Pool({
Expand Down
5 changes: 5 additions & 0 deletions apps/mesh/src/event-bus/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ export class EventBusWorker {
do {
this.pendingNotify = false;
await this.processEvents();
// Yield briefly between coalescing iterations to prevent a CPU-burning
// busy-loop when notifications arrive faster than processing completes.
if (this.pendingNotify) {
await new Promise((resolve) => setTimeout(resolve, 50));
}
} while (this.pendingNotify);
} catch (error) {
console.error("[EventBus] Error processing events:", error);
Expand Down
24 changes: 23 additions & 1 deletion apps/mesh/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const handleAssets = createAssetHandler({
});

// Create the Hono app
const app = await createApp();
const { app, shutdown } = await createApp();

console.log("");
console.log(`${green}✓${reset} ${bold}Ready${reset}`);
Expand All @@ -60,6 +60,28 @@ Bun.serve({
development: process.env.NODE_ENV !== "production",
});

// Graceful shutdown on SIGTERM/SIGINT (container restart, deploy, Ctrl+C)
let shuttingDown = false;
const SHUTDOWN_TIMEOUT_MS = 10_000;
const handleSignal = async (signal: string) => {
if (shuttingDown) return;
shuttingDown = true;
console.log(`\n[${signal}] Shutting down...`);
const forceExit = setTimeout(() => {
console.error("[Shutdown] Timed out after 10s, forcing exit");
process.exit(1);
}, SHUTDOWN_TIMEOUT_MS);
forceExit.unref();
try {
await shutdown();
} catch (err) {
console.error("[Shutdown] Error during cleanup:", err);
}
process.exit(0);
};
process.on("SIGTERM", () => handleSignal("SIGTERM"));
process.on("SIGINT", () => handleSignal("SIGINT"));

// Internal debug server (only enabled via ENABLE_DEBUG_SERVER=true)
if (enableDebugServer) {
startDebugServer({ port: debugPort });
Expand Down
21 changes: 21 additions & 0 deletions apps/mesh/src/mcp-clients/outbound/transports/monitoring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ interface InflightRequest {
span?: Span;
}

// Max concurrent fire-and-forget DB writes. Beyond this limit, logs are
// dropped to prevent unbounded memory growth when the database is slow.
const MAX_INFLIGHT_DB_WRITES = 50;

export class MonitoringTransport extends WrapperTransport {
private inflightRequests = new Map<string | number, InflightRequest>();
private inflightDbWrites = 0;
private droppedDbWrites = 0;

constructor(
innerTransport: Transport,
Expand Down Expand Up @@ -207,6 +213,18 @@ export class MonitoringTransport extends WrapperTransport {
return;
}

// Backpressure: drop monitoring logs when too many writes are in-flight.
// This prevents unbounded memory growth when the database is slow.
if (this.inflightDbWrites >= MAX_INFLIGHT_DB_WRITES) {
this.droppedDbWrites++;
if (this.droppedDbWrites === 1 || this.droppedDbWrites % 100 === 0) {
console.warn(
`[MonitoringTransport] Backpressure: dropped ${this.droppedDbWrites} monitoring log(s)`,
);
}
return;
}

// Get organization ID from context
const organizationId = ctx.organization?.id;
if (!organizationId) {
Expand All @@ -224,6 +242,7 @@ export class MonitoringTransport extends WrapperTransport {
const properties = mergeProperties(ctx.metadata.properties, metaProps);

// Log to database
this.inflightDbWrites++;
try {
await ctx.storage.monitoring.log({
organizationId,
Expand All @@ -245,6 +264,8 @@ export class MonitoringTransport extends WrapperTransport {
} catch (error) {
// Don't throw - logging failures shouldn't break tool execution
console.error("[MonitoringTransport] Failed to log to database:", error);
} finally {
this.inflightDbWrites--;
}
}

Expand Down
4 changes: 3 additions & 1 deletion apps/mesh/src/tools/connection/connection-tools.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ describe("Connection Tools", () => {
isRunning: vi.fn().mockReturnValue(false),
} as unknown as EventBus,
createMCPProxy: vi.fn().mockResolvedValue({}),
getOrCreateClient: vi.fn().mockResolvedValue({}),
getOrCreateClient: Object.assign(vi.fn().mockResolvedValue({}), {
[Symbol.asyncDispose]: async () => {},
}),
};
});

Expand Down
Loading