From 4d9dd3712de118f853b6a38cec1b1ef277ea6638 Mon Sep 17 00:00:00 2001 From: Ross Sullivan Date: Sun, 4 Jan 2026 20:09:11 +0900 Subject: [PATCH 1/2] refactor: Use trait for flock blocking messages --- src/cargo/util/context/mod.rs | 10 ++++++- src/cargo/util/flock.rs | 50 ++++++++++++++++++++--------------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/cargo/util/context/mod.rs b/src/cargo/util/context/mod.rs index d6d9a9e9f05..69106335b53 100644 --- a/src/cargo/util/context/mod.rs +++ b/src/cargo/util/context/mod.rs @@ -86,11 +86,12 @@ use crate::sources::CRATES_IO_REGISTRY; use crate::util::OnceExt as _; use crate::util::cache_lock::{CacheLock, CacheLockMode, CacheLocker}; use crate::util::errors::CargoResult; +use crate::util::flock::ReportBlocking; use crate::util::network::http::configure_http_handle; use crate::util::network::http::http_handle; use crate::util::restricted_names::is_glob_pattern; use crate::util::{CanonicalUrl, closest_msg, internal}; -use crate::util::{Filesystem, IntoUrl, IntoUrlWithBase, Rustc}; +use crate::util::{Filesystem, IntoUrl, IntoUrlWithBase, Rustc, style}; use annotate_snippets::Level; use anyhow::{Context as _, anyhow, bail, format_err}; @@ -2123,6 +2124,13 @@ impl GlobalContext { } } +impl ReportBlocking for &GlobalContext { + fn blocking(&self, msg: &str) -> CargoResult<()> { + self.shell() + .status_with_color("Blocking", &msg, &style::NOTE) + } +} + pub fn homedir(cwd: &Path) -> Option { ::home::cargo_home_with_cwd(cwd).ok() } diff --git a/src/cargo/util/flock.rs b/src/cargo/util/flock.rs index 5e9d3549040..400e9cc068e 100644 --- a/src/cargo/util/flock.rs +++ b/src/cargo/util/flock.rs @@ -13,9 +13,7 @@ use std::io; use std::io::{Read, Seek, SeekFrom, Write}; use std::path::{Display, Path, PathBuf}; -use crate::util::GlobalContext; use crate::util::errors::CargoResult; -use crate::util::style; use anyhow::Context as _; use cargo_util::paths; @@ -223,14 +221,14 @@ impl Filesystem { /// This function will create a file at `path` if it doesn't already exist /// (including intermediate directories), and then it will acquire an /// exclusive lock on `path`. If the process must block waiting for the - /// lock, the `msg` is printed to [`GlobalContext`]. + /// lock, the `msg` is shown to the user via [`ReportBlocking`]. /// /// The returned file can be accessed to look at the path and also has /// read/write access to the underlying file. pub fn open_rw_exclusive_create

( &self, path: P, - gctx: &GlobalContext, + report_blocking: impl ReportBlocking, msg: &str, ) -> CargoResult where @@ -239,7 +237,7 @@ impl Filesystem { let mut opts = OpenOptions::new(); opts.read(true).write(true).create(true); let (path, f) = self.open(path.as_ref(), &opts, true)?; - acquire(gctx, msg, &path, &|| f.try_lock(), &|| f.lock())?; + acquire(report_blocking, msg, &path, &|| f.try_lock(), &|| f.lock())?; Ok(FileLock { f: Some(f), path }) } @@ -265,7 +263,7 @@ impl Filesystem { /// /// This function will fail if `path` doesn't already exist, but if it does /// then it will acquire a shared lock on `path`. If the process must block - /// waiting for the lock, the `msg` is printed to [`GlobalContext`]. + /// waiting for the lock, the `msg` is shown to the user via [`ReportBlocking`]. /// /// The returned file can be accessed to look at the path and also has read /// access to the underlying file. Any writes to the file will return an @@ -273,16 +271,20 @@ impl Filesystem { pub fn open_ro_shared

( &self, path: P, - gctx: &GlobalContext, + report_blocking: impl ReportBlocking, msg: &str, ) -> CargoResult where P: AsRef, { let (path, f) = self.open(path.as_ref(), &OpenOptions::new().read(true), false)?; - acquire(gctx, msg, &path, &|| f.try_lock_shared(), &|| { - f.lock_shared() - })?; + acquire( + report_blocking, + msg, + &path, + &|| f.try_lock_shared(), + &|| f.lock_shared(), + )?; Ok(FileLock { f: Some(f), path }) } @@ -294,15 +296,19 @@ impl Filesystem { pub fn open_ro_shared_create>( &self, path: P, - gctx: &GlobalContext, + report_blocking: impl ReportBlocking, msg: &str, ) -> CargoResult { let mut opts = OpenOptions::new(); opts.read(true).write(true).create(true); let (path, f) = self.open(path.as_ref(), &opts, true)?; - acquire(gctx, msg, &path, &|| f.try_lock_shared(), &|| { - f.lock_shared() - })?; + acquire( + report_blocking, + msg, + &path, + &|| f.try_lock_shared(), + &|| f.lock_shared(), + )?; Ok(FileLock { f: Some(f), path }) } @@ -400,27 +406,23 @@ fn try_acquire(path: &Path, lock_try: &dyn Fn() -> Result<(), TryLockError>) -> /// This function will acquire the lock on a `path`, printing out a nice message /// to the console if we have to wait for it. It will first attempt to use `try` /// to acquire a lock on the crate, and in the case of contention it will emit a -/// status message based on `msg` to [`GlobalContext`]'s shell, and then use `block` to +/// status message based on `msg` to [`ReportBlocking`], and then use `block` to /// block waiting to acquire a lock. /// /// Returns an error if the lock could not be acquired or if any error other /// than a contention error happens. -fn acquire( - gctx: &GlobalContext, +pub fn acquire( + report_blocking: impl ReportBlocking, msg: &str, path: &Path, lock_try: &dyn Fn() -> Result<(), TryLockError>, lock_block: &dyn Fn() -> io::Result<()>, ) -> CargoResult<()> { - // Ensure `shell` is not already in use, - // regardless of whether we hit contention or not - gctx.debug_assert_shell_not_borrowed(); if try_acquire(path, lock_try)? { return Ok(()); } let msg = format!("waiting for file lock on {}", msg); - gctx.shell() - .status_with_color("Blocking", &msg, &style::NOTE)?; + report_blocking.blocking(&msg)?; lock_block().with_context(|| format!("failed to lock file: {}", path.display()))?; Ok(()) @@ -469,3 +471,7 @@ fn error_unsupported(err: &std::io::Error) -> bool { _ => err.kind() == std::io::ErrorKind::Unsupported, } } + +pub trait ReportBlocking { + fn blocking(&self, msg: &str) -> CargoResult<()>; +} From 256ab644b1ea87666f980ccce13c924d8ddd94e4 Mon Sep 17 00:00:00 2001 From: Ross Sullivan Date: Sun, 4 Jan 2026 20:32:02 +0900 Subject: [PATCH 2/2] feat: Added better blocking messages for fine grain locking --- .../core/compiler/job_queue/job_state.rs | 14 ++++- src/cargo/core/compiler/job_queue/mod.rs | 23 +++++++- src/cargo/core/compiler/locking.rs | 55 ++++++++++++++----- 3 files changed, 74 insertions(+), 18 deletions(-) diff --git a/src/cargo/core/compiler/job_queue/job_state.rs b/src/cargo/core/compiler/job_queue/job_state.rs index 06a672b611d..69f02b8c6d1 100644 --- a/src/cargo/core/compiler/job_queue/job_state.rs +++ b/src/cargo/core/compiler/job_queue/job_state.rs @@ -8,6 +8,7 @@ use crate::core::compiler::future_incompat::FutureBreakageItem; use crate::core::compiler::locking::LockKey; use crate::core::compiler::timings::SectionTiming; use crate::util::Queue; +use crate::util::flock::ReportBlocking; use crate::{CargoResult, core::compiler::locking::LockManager}; use super::{Artifact, DiagDedupe, Job, JobId, Message}; @@ -148,11 +149,11 @@ impl<'a, 'gctx> JobState<'a, 'gctx> { } pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> { - self.lock_manager.lock(lock) + self.lock_manager.lock(lock, self) } pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> { - self.lock_manager.downgrade_to_shared(lock) + self.lock_manager.downgrade_to_shared(lock, self) } pub fn on_section_timing_emitted(&self, section: SectionTiming) { @@ -213,3 +214,12 @@ impl<'a, 'gctx> JobState<'a, 'gctx> { .push(Message::FutureIncompatReport(self.id, report)); } } + +impl ReportBlocking for &JobState<'_, '_> { + fn blocking(&self, msg: &str) -> CargoResult<()> { + self.messages.push_bounded(Message::Blocking { + msg: msg.to_string(), + }); + Ok(()) + } +} diff --git a/src/cargo/core/compiler/job_queue/mod.rs b/src/cargo/core/compiler/job_queue/mod.rs index fdaceac904a..3b9ef408a42 100644 --- a/src/cargo/core/compiler/job_queue/mod.rs +++ b/src/cargo/core/compiler/job_queue/mod.rs @@ -144,7 +144,7 @@ use crate::util::context::WarningHandling; use crate::util::diagnostic_server::{self, DiagnosticPrinter}; use crate::util::errors::AlreadyPrintedError; use crate::util::machine_message::{self, Message as _}; -use crate::util::{self, internal}; +use crate::util::{self, internal, style}; use crate::util::{DependencyQueue, GlobalContext, Progress, ProgressStyle, Queue}; /// This structure is backed by the `DependencyQueue` type and manages the @@ -320,6 +320,20 @@ impl<'gctx> DiagDedupe<'gctx> { shell.err().write_all(b"\n")?; Ok(true) } + + /// Emits a flock blocking message + /// + /// Returns `true` if the message was emitted, or `false` if it was + /// suppressed for being a duplicate. + fn emit_blocking(&self, msg: &str) -> CargoResult { + let h = util::hash_u64(msg); + if !self.seen.borrow_mut().insert(h) { + return Ok(false); + } + let mut shell = self.gctx.shell(); + shell.status_with_color("Blocking", &msg, &style::NOTE)?; + Ok(true) + } } /// Possible artifacts that can be produced by compilations, used as edge values @@ -371,6 +385,10 @@ enum Message { warning: String, }, + Blocking { + msg: String, + }, + FixDiagnostic(diagnostic_server::Message), Token(io::Result), Finish(JobId, Artifact, CargoResult<()>), @@ -643,6 +661,9 @@ impl<'gctx> DrainState<'gctx> { let fixable = false; self.bump_warning_count(id, lint, emitted, fixable); } + Message::Blocking { msg } => { + self.diag_dedupe.emit_blocking(&msg)?; + } Message::WarningCount { id, lint, diff --git a/src/cargo/core/compiler/locking.rs b/src/cargo/core/compiler/locking.rs index 155a4dc364e..2f2a1911711 100644 --- a/src/cargo/core/compiler/locking.rs +++ b/src/cargo/core/compiler/locking.rs @@ -3,7 +3,11 @@ use crate::{ CargoResult, core::compiler::{BuildRunner, Unit}, - util::{FileLock, Filesystem}, + util::{ + FileLock, Filesystem, + flock::{self, ReportBlocking}, + interning::InternedString, + }, }; use anyhow::bail; use std::{ @@ -46,23 +50,26 @@ impl LockManager { lock.file().lock_shared()?; } else { let fs = Filesystem::new(key.0.clone()); - let lock_msg = format!( - "{} ({})", - unit.pkg.name(), - build_runner.files().unit_hash(unit) - ); + let lock_msg = key.msg(); let lock = fs.open_ro_shared_create(&key.0, build_runner.bcx.gctx, &lock_msg)?; locks.insert(key.clone(), lock); } Ok(key) } - - #[instrument(skip(self))] - pub fn lock(&self, key: &LockKey) -> CargoResult<()> { + #[instrument(skip(self, report_blocking))] + pub fn lock(&self, key: &LockKey, report_blocking: impl ReportBlocking) -> CargoResult<()> { let mut locks = self.locks.lock().unwrap(); if let Some(lock) = locks.get_mut(&key) { - lock.file().lock()?; + let file = lock.file(); + + flock::acquire( + report_blocking, + &key.msg(), + &key.0, + &|| file.try_lock(), + &|| file.lock(), + )?; } else { bail!("lock was not found in lock manager: {key}"); } @@ -71,13 +78,24 @@ impl LockManager { } /// Upgrades an existing exclusive lock into a shared lock. - #[instrument(skip(self))] - pub fn downgrade_to_shared(&self, key: &LockKey) -> CargoResult<()> { + #[instrument(skip(self, report_blocking))] + pub fn downgrade_to_shared( + &self, + key: &LockKey, + report_blocking: impl ReportBlocking, + ) -> CargoResult<()> { let mut locks = self.locks.lock().unwrap(); let Some(lock) = locks.get_mut(key) else { bail!("lock was not found in lock manager: {key}"); }; - lock.file().lock_shared()?; + let file = lock.file(); + flock::acquire( + report_blocking, + &key.msg(), + &key.0, + &|| file.try_lock_shared(), + &|| file.lock_shared(), + )?; Ok(()) } @@ -93,11 +111,18 @@ impl LockManager { } #[derive(Debug, Clone, Hash, Eq, PartialEq)] -pub struct LockKey(PathBuf); +pub struct LockKey(PathBuf, InternedString, String); impl LockKey { fn from_unit(build_runner: &BuildRunner<'_, '_>, unit: &Unit) -> Self { - Self(build_runner.files().build_unit_lock(unit)) + let name = unit.pkg.name(); + let hash = build_runner.files().unit_hash(unit); + let path = build_runner.files().build_unit_lock(unit); + Self(path, name, hash) + } + + fn msg(&self) -> String { + format!("{} ({})", self.1, self.2) } }