Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 107 additions & 7 deletions apps/mesh/src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@
* - For PGlite: the PGlite instance (for lifecycle management)
*/

import { existsSync, mkdirSync, readFileSync, rmSync } from "fs";
import {
closeSync,
constants as fsConstants,
existsSync,
mkdirSync,
openSync,
readFileSync,
rmSync,
writeFileSync,
} from "fs";
import { type Dialect, Kysely, LogEvent, PostgresDialect } from "kysely";
import { PGlite } from "@electric-sql/pglite";
import { KyselyPGlite } from "kysely-pglite";
Expand Down Expand Up @@ -173,34 +182,119 @@ function clearStalePGliteLock(dataDir: string): void {
const raw = readFileSync(pidFile, "utf8").trim();
const pid = parseInt(raw.split("\n")[0] ?? "", 10);

// Negative or NaN PID is always stale
// Negative or NaN PID is always stale (PGlite WASM uses -42)
const isAlive =
pid > 0 &&
(() => {
try {
process.kill(pid, 0);
return true;
} catch (err) {
// EPERM means the process exists but we cannot signal it — treat as alive.
// Only ESRCH means no such process, i.e. genuinely stale.
return (err as NodeJS.ErrnoException).code === "EPERM";
}
})();

if (!isAlive) {
rmSync(pidFile);
console.warn(
`Removed stale PGlite lock file (PID ${pid} not running): ${pidFile}`,
);
}
} catch {
// If we can't read/parse the pid file, leave it alone
}
}

// ============================================================================
// Process-level PGlite Lock
// ============================================================================
// PGlite (WASM) writes postmaster.pid with PID -42, which is meaningless for
// cross-process exclusion. We maintain our own lock with a real PID so
// concurrent processes (e.g. two worktrees) detect each other instead of
// silently corrupting the database.

const MESH_LOCK_FILE = ".mesh.lock";
let meshLockCleanup: (() => void) | null = null;

function isProcessAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch (err) {
return (err as NodeJS.ErrnoException).code === "EPERM";
}
}

function acquirePGliteLock(dataDir: string): void {
// Skip lock in test environment and CI
if (env.NODE_ENV === "test" || process.env.CI) return;

// Place the lock file OUTSIDE the PGlite data directory to avoid
// interfering with PGlite's internal data directory structure.
const lockPath = dataDir + "." + MESH_LOCK_FILE;

if (existsSync(lockPath)) {
try {
const raw = readFileSync(lockPath, "utf8").trim();
const pid = parseInt(raw, 10);

if (pid > 0 && isProcessAlive(pid)) {
throw new Error(
`\n🔒 Another mesh process (PID ${pid}) is using the database at ${dataDir}\n` +
` Stop the other process first, or set DATA_DIR to use a separate database.\n` +
` Example: DATA_DIR=~/deco/other bun run dev\n`,
);
}
// Stale lock from crashed process — remove it
rmSync(lockPath);
} catch (err) {
if (
err instanceof Error &&
err.message.includes("Another mesh process")
) {
throw err;
}
try {
rmSync(lockPath);
} catch {}
}
}

// Write our real PID atomically (O_EXCL fails if file already exists)
try {
const fd = openSync(
lockPath,
fsConstants.O_WRONLY | fsConstants.O_CREAT | fsConstants.O_EXCL,
);
writeFileSync(fd, String(process.pid));
closeSync(fd);
} catch (err) {
if ((err as NodeJS.ErrnoException).code === "EEXIST") {
throw new Error(
`\n🔒 Another mesh process acquired the lock at ${dataDir}\n` +
` Stop the other process first, or set DATA_DIR to use a separate database.\n`,
);
}
throw err;
}

// Clean up on exit
const cleanup = () => {
try {
if (existsSync(lockPath)) {
const content = readFileSync(lockPath, "utf8").trim();
if (content === String(process.pid)) {
rmSync(lockPath);
}
}
} catch {}
};

meshLockCleanup = cleanup;
process.on("exit", cleanup);
}

function createPGliteInstance(dataDir: string): PGlite {
const resolvedDir = ensurePGliteDirectory(dataDir);
if (resolvedDir !== ":memory:") {
acquirePGliteLock(resolvedDir);
clearStalePGliteLock(resolvedDir);
}
return new PGlite(resolvedDir === ":memory:" ? undefined : resolvedDir);
Expand Down Expand Up @@ -356,6 +450,12 @@ export async function closeDatabase(database: MeshDatabase): Promise<void> {
// so subsequent getDb() calls create a fresh instance.
if (database === dbInstance) {
dbInstance = null;
// Release the PGlite lock and unregister the exit handler
if (meshLockCleanup) {
process.removeListener("exit", meshLockCleanup);
meshLockCleanup();
meshLockCleanup = null;
}
}
}

Expand Down
Loading