Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions crates/beamtalk-cli/src/commands/workspace/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,24 @@ use super::storage::{
workspace_exists, workspace_id_for, workspaces_base_dir,
};

/// Create a new workspace.
pub fn create_workspace(
project_path: &Path,
workspace_name: Option<&str>,
) -> Result<WorkspaceMetadata> {
let workspace_id = workspace_id_for(project_path, workspace_name)?;

// Acquire exclusive lock to prevent TOCTOU race on concurrent creation.
// The lock is released when `_lock` is dropped at end of scope.
let _lock = acquire_workspace_lock(&workspace_id)?;

/// Inner logic for workspace creation, called under an already-held lock.
///
/// Does NOT acquire the lock itself — callers are responsible for holding
/// `acquire_workspace_lock` before calling this function.
fn create_workspace_impl(workspace_id: &str, project_path: &Path) -> Result<WorkspaceMetadata> {
// Re-check under lock — another process may have created the workspace
// while we were waiting for the lock.
if workspace_exists(&workspace_id)? {
return get_workspace_metadata(&workspace_id);
if workspace_exists(workspace_id)? {
return get_workspace_metadata(workspace_id);
}

// Create workspace directory
let dir = workspace_dir(&workspace_id)?;
let dir = workspace_dir(workspace_id)?;
fs::create_dir_all(&dir).into_diagnostic()?;

// Generate and save cookie
let cookie = generate_cookie();
save_workspace_cookie(&workspace_id, &cookie)?;
save_workspace_cookie(workspace_id, &cookie)?;

// Create metadata
let now = SystemTime::now()
Expand All @@ -57,7 +51,7 @@ pub fn create_workspace(
.as_secs();

let metadata = WorkspaceMetadata {
workspace_id: workspace_id.clone(),
workspace_id: workspace_id.to_string(),
project_path: project_path.to_path_buf(),
created_at: now,
};
Expand All @@ -67,6 +61,20 @@ pub fn create_workspace(
Ok(metadata)
}

/// Create a new workspace.
///
/// Resolves the workspace ID for `project_path` (optionally overridden by
/// `workspace_name`), then performs creation under an exclusive lock so that
/// concurrent callers cannot race each other into creating the same workspace.
pub fn create_workspace(
    project_path: &Path,
    workspace_name: Option<&str>,
) -> Result<WorkspaceMetadata> {
    let id = workspace_id_for(project_path, workspace_name)?;

    // Exclusive lock guards against a TOCTOU race between concurrent
    // creators; the guard releases the lock when dropped at end of scope.
    let _guard = acquire_workspace_lock(&id)?;

    // All real work happens in the inner helper, which assumes the lock
    // is already held by its caller.
    create_workspace_impl(&id, project_path)
}

/// Get or start a workspace node for the current directory.
/// Returns (`NodeInfo`, bool) where bool indicates if a new node was started.
#[allow(clippy::too_many_arguments)] // delegates to start_detached_node with same params
Expand All @@ -86,11 +94,27 @@ pub fn get_or_start_workspace(
ssl_dist_optfile: Option<&Path>,
web_port: Option<u16>,
) -> Result<(NodeInfo, bool, String)> {
// Create workspace if it doesn't exist
let metadata = create_workspace(project_path, workspace_name)?;
let workspace_id = metadata.workspace_id.clone();
let workspace_id = workspace_id_for(project_path, workspace_name)?;

// Acquire exclusive lock covering the full check-is-running + start sequence.
//
// Without this lock, two concurrent callers (e.g. IDE extension + terminal both
// starting `beamtalk repl`) can both observe "not running" and both attempt
// `start_detached_node`, causing the second call to fail when EPMD rejects the
// duplicate node name.
//
// With the lock:
// - The first caller acquires it, creates the workspace, starts the node, releases the lock.
// - The second caller blocks until the lock is released, then acquires it, finds the
// node already running (written by the first caller), and returns it directly.
//
// The lock is released when `_lock` is dropped at end of scope (including error paths).
let _lock = acquire_workspace_lock(&workspace_id)?;

// Create workspace if it doesn't exist (under lock)
create_workspace_impl(&workspace_id, project_path)?;

// Check if node is already running
// Check if node is already running (under lock)
if let Some(node_info) = get_node_info(&workspace_id)? {
if is_node_running(&node_info) {
return Ok((node_info, false, workspace_id)); // Existing node
Expand All @@ -100,7 +124,7 @@ pub fn get_or_start_workspace(
cleanup_stale_node_info(&workspace_id)?;
}

// Start new detached node
// Start new detached node (under lock — released after node_info is written)
let node_info = start_detached_node(
&workspace_id,
port,
Expand Down
97 changes: 97 additions & 0 deletions crates/beamtalk-cli/src/commands/workspace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,103 @@ mod tests {
assert!(is_node_running(&info3), "restarted node should be running");
}

/// Regression test for BT-970: two concurrent `get_or_start_workspace` calls
/// targeting the same workspace ID must never both attempt to start a node.
/// The workspace lock has to span the whole check-is-running + start sequence
/// so whichever caller loses the race observes the node already started by
/// the winner instead of trying to start a duplicate.
#[test]
#[cfg(unix)]
#[ignore = "integration test — requires Erlang/OTP runtime"]
#[serial(workspace_integration)]
fn test_concurrent_get_or_start_workspace_integration() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    let tw = TestWorkspace::new("integ_concurrent");
    let project_path = std::env::current_dir().unwrap();

    let (runtime, workspace, jsx, compiler, stdlib, extra_paths) = beam_dirs_for_tests();

    // Rendezvous point: both workers enter get_or_start_workspace at the
    // same moment, maximising the chance of hitting the race window.
    let start_gate = Arc::new(Barrier::new(2));

    let mut workers = Vec::with_capacity(2);
    for _ in 0..2 {
        let gate = Arc::clone(&start_gate);
        let project_path = project_path.clone();
        let tw_id = tw.id.clone();
        let runtime = runtime.clone();
        let workspace = workspace.clone();
        let jsx = jsx.clone();
        let compiler = compiler.clone();
        let stdlib = stdlib.clone();
        let extra_paths = extra_paths.clone();
        workers.push(thread::spawn(move || {
            gate.wait();
            get_or_start_workspace(
                &project_path,
                Some(&tw_id),
                0,
                &runtime,
                &workspace,
                &jsx,
                &compiler,
                &stdlib,
                &extra_paths,
                false,
                Some(60),
                None,
                None, // ssl_dist_optfile
                None, // web_port
            )
        }));
    }

    let results: Vec<_> = workers.into_iter().map(|w| w.join().unwrap()).collect();

    // Both calls must succeed
    for result in &results {
        assert!(
            result.is_ok(),
            "get_or_start_workspace should succeed, got: {:?}",
            result.as_ref().err()
        );
    }

    let infos: Vec<_> = results.into_iter().map(|r| r.unwrap()).collect();

    // Safety net: ensure the node is killed even if an assertion below panics.
    let _guard = NodeGuard {
        pid: infos[0].0.pid,
    };

    // Exactly one caller must have started a new node; the other joined it.
    let started_count = infos.iter().filter(|(_, started, _)| *started).count();
    assert_eq!(
        started_count, 1,
        "exactly one caller should have started the node; got started_count={started_count}"
    );

    // Both callers must come back with the same node identity (PID + port)
    // under the same workspace ID.
    let (info0, _, id0) = &infos[0];
    let (info1, _, id1) = &infos[1];
    assert_eq!(id0, id1, "both callers must return the same workspace ID");
    assert_eq!(
        info0.pid, info1.pid,
        "both callers must return the same node PID"
    );
    assert_eq!(
        info0.port, info1.port,
        "both callers must return the same node port"
    );
    assert!(
        is_node_running(info0),
        "node should be running after concurrent start"
    );
}

/// Regression test for BT-967: stale port file from a previous aborted startup
/// causes `start_detached_node` to connect to the wrong BEAM node, producing
/// auth failures in `wait_for_tcp_ready` for 30 seconds before timing out.
Expand Down