diff --git a/apps/mesh/src/database/index.ts b/apps/mesh/src/database/index.ts index 05cbaaf42b..cb84d54233 100644 --- a/apps/mesh/src/database/index.ts +++ b/apps/mesh/src/database/index.ts @@ -118,20 +118,47 @@ const defaultPoolOptions = { // Keep connections alive to avoid reconnection latency across regions keepAlive: true, keepAliveInitialDelayMillis: 10000, - // Allow connections to stay idle longer (5 min instead of default 10s) - // This reduces reconnection overhead for cross-region databases - idleTimeoutMillis: 300000, + // FIX: Reduced from 300000 (5min) to 30000 (30s). + // In Kubernetes, pods are ephemeral — holding idle connections for 5min + // causes burst "Connection reset by peer" on RDS when pods are terminated. + // 30s releases idle connections proactively before pod shutdown. + idleTimeoutMillis: 30000, // Increase connection timeout for high-latency networks (30s) connectionTimeoutMillis: 30000, // Allow the process to exit even with idle connections allowExitOnIdle: true, + // PROTECTION: Kill queries that run too long (prevents runaway queries) + // Configurable via DATABASE_PG_STATEMENT_TIMEOUT (default 30s) + statement_timeout: + parseInt(process.env.DATABASE_PG_STATEMENT_TIMEOUT ?? "", 10) || 30000, + // PROTECTION: Kill idle transactions (prevents connection hoarding) + // Transactions sitting open lock resources and prevent autovacuum + idle_in_transaction_session_timeout: + parseInt(process.env.DATABASE_PG_IDLE_TX_TIMEOUT ?? "", 10) || 10000, }; + function createPostgresDatabase(config: DatabaseConfig): PostgresDatabase { + const maxConnections = + config.options?.maxConnections ?? + (process.env.DATABASE_PG_MAX_CONNECTIONS + ? parseInt(process.env.DATABASE_PG_MAX_CONNECTIONS, 10) + : 5); // FIX: Reduced from 10. With multiple K8s pods, total = pods × max. + // Tune via DATABASE_PG_MAX_CONNECTIONS env var without code changes. + const pool = new Pool({ connectionString: config.connectionString, - max: config.options?.maxConnections || 10, + max: maxConnections, ssl: process.env.DATABASE_PG_SSL === "true" ? true : false, ...defaultPoolOptions, + // Set default session parameters for all connections + // These apply via "SET" on connection initialization + idle_in_transaction_session_timeout: + parseInt(process.env.DATABASE_PG_IDLE_TX_TIMEOUT ?? "", 10) || 10000, + }); + + // FIX: Handle async pool errors to prevent silent process crashes + pool.on("error", (err) => { + console.error("[db] Unexpected pool client error:", err); }); const dialect = new PostgresDialect({ pool }); @@ -288,24 +315,43 @@ export function getDatabaseUrl(): string { /** * Create a Kysely dialect for the given database URL * This allows you to create a dialect without creating the full MeshDatabase + * + * FIX: Now a singleton. Previously, every call created a new Pool that was + * never closed, leaking connections on every invocation. */ +let dialectInstance: Dialect | null = null; + export function getDbDialect(databaseUrl?: string): Dialect { - const config = parseDatabaseUrl(databaseUrl); + if (!dialectInstance) { + const config = parseDatabaseUrl(databaseUrl ?? getDatabaseUrl()); - if (config.type === "postgres") { - return new PostgresDialect({ - pool: new Pool({ + if (config.type === "postgres") { + const maxConnections = process.env.DATABASE_PG_MAX_CONNECTIONS + ? parseInt(process.env.DATABASE_PG_MAX_CONNECTIONS, 10) + : 5; + + const pool = new Pool({ connectionString: config.connectionString, - max: config.options?.maxConnections || 10, + max: maxConnections, ssl: process.env.DATABASE_PG_SSL === "true" ? true : false, ...defaultPoolOptions, - }), - }); + idle_in_transaction_session_timeout: + parseInt(process.env.DATABASE_PG_IDLE_TX_TIMEOUT ?? "", 10) || 10000, + }); + + pool.on("error", (err) => { + console.error("[db] Unexpected dialect pool client error:", err); + }); + + dialectInstance = new PostgresDialect({ pool }); + } else { + let dbPath = extractSqlitePath(config.connectionString); + dbPath = ensureSqliteDirectory(dbPath); + dialectInstance = new BunWorkerDialect({ url: dbPath || ":memory:" }); + } } - let dbPath = extractSqlitePath(config.connectionString); - dbPath = ensureSqliteDirectory(dbPath); - return new BunWorkerDialect({ url: dbPath || ":memory:" }); + return dialectInstance; } /**