From d5889712454979f1f9ac4174b9f725135401b560 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 30 Oct 2023 12:05:54 -0400 Subject: [PATCH] walk: Use unbounded channels We originally switched to bounded channels for backpressure to fix #918. However, bounded channels have a significant initialization overhead as they pre-allocate a fixed-size buffer for the messages. This implementation uses a different backpressure strategy: each thread gets a limited-size pool of WorkerResults. When the size limit is hit, the sender thread has to wait for the receiver thread to handle a result from that pool and recycle it. Inspired by [snmalloc], results are recycled by sending the boxed result over a channel back to the thread that allocated it. By allocating and freeing each WorkerResult from the same thread, allocator contention is reduced dramatically. And since we now pass results by pointer instead of by value, message passing overhead is reduced as well. Fixes #1408. [snmalloc]: https://github.com/microsoft/snmalloc --- src/exec/job.rs | 31 +++++++------- src/walk.rs | 107 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 107 insertions(+), 31 deletions(-) diff --git a/src/exec/job.rs b/src/exec/job.rs index af603cc63..379eaaf21 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -3,10 +3,9 @@ use std::sync::Mutex; use crossbeam_channel::Receiver; use crate::config::Config; -use crate::dir_entry::DirEntry; use crate::error::print_error; use crate::exit_codes::{merge_exitcodes, ExitCode}; -use crate::walk::WorkerResult; +use crate::walk::{WorkerMsg, WorkerResult}; use super::CommandSet; @@ -14,7 +13,7 @@ use super::CommandSet; /// generate a command with the supplied command template. The generated command will then /// be executed, and this process will continue until the receiver's sender has closed. pub fn job( - rx: Receiver, + rx: Receiver, cmd: &CommandSet, out_perm: &Mutex<()>, config: &Config, @@ -26,7 +25,8 @@ pub fn job( loop { // Obtain the next result from the receiver, else if the channel // has closed, exit from the loop - let dir_entry: DirEntry = match rx.recv() { + let result = rx.recv().map(WorkerMsg::take); + let dir_entry = match result { Ok(WorkerResult::Entry(dir_entry)) => dir_entry, Ok(WorkerResult::Error(err)) => { if config.show_filesystem_errors { @@ -49,18 +49,19 @@ pub fn job( merge_exitcodes(results) } -pub fn batch(rx: Receiver, cmd: &CommandSet, config: &Config) -> ExitCode { - let paths = rx - .into_iter() - .filter_map(|worker_result| match worker_result { - WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)), - WorkerResult::Error(err) => { - if config.show_filesystem_errors { - print_error(err.to_string()); +pub fn batch(rx: Receiver, cmd: &CommandSet, config: &Config) -> ExitCode { + let paths = + rx.into_iter() + .map(WorkerMsg::take) + .filter_map(|worker_result| match worker_result { + WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)), + WorkerResult::Error(err) => { + if config.show_filesystem_errors { + print_error(err.to_string()); + } + None } - None - } - }); + }); cmd.execute_batch(paths, config.batch_size, config.path_separator.as_deref()) } diff --git a/src/walk.rs b/src/walk.rs index 691c5d0f0..35c8e68d6 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -9,7 +9,7 @@ use std::thread; use std::time::{Duration, Instant}; use anyhow::{anyhow, Result}; -use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use etcetera::BaseStrategy; use ignore::overrides::{Override, OverrideBuilder}; use ignore::{self, WalkBuilder, WalkParallel, WalkState}; @@ -43,6 +43,77 @@ pub enum WorkerResult { Error(ignore::Error), } +/// Storage for a WorkerResult. +type ResultBox = Box>; + +/// A WorkerResult that recycles itself. +pub struct WorkerMsg { + inner: Option, + tx: Sender, +} + +impl WorkerMsg { + /// Create a new message. + fn new(inner: ResultBox, tx: Sender) -> Self { + Self { + inner: Some(inner), + tx, + } + } + + /// Extract the result from this message. + pub fn take(mut self) -> WorkerResult { + self.inner.as_mut().unwrap().take().unwrap() + } +} + +impl Drop for WorkerMsg { + fn drop(&mut self) { + let _ = self.tx.send(self.inner.take().unwrap()); + } +} + +/// A pool of WorkerResults that can be recycled. +struct ResultPool { + size: usize, + tx: Sender, + rx: Receiver, +} + +/// Capacity was chosen empircally to perform similarly to an unbounded channel +const RESULT_POOL_CAPACITY: usize = 0x4000; + +impl ResultPool { + /// Create an empty pool. + fn new() -> Self { + let (tx, rx) = unbounded(); + + Self { size: 0, tx, rx } + } + + /// Allocate or recycle a WorkerResult from the pool. + fn get(&mut self, result: WorkerResult) -> WorkerMsg { + let inner = if self.size < RESULT_POOL_CAPACITY { + match self.rx.try_recv() { + Ok(mut inner) => { + *inner = Some(result); + inner + } + Err(_) => { + self.size += 1; + Box::new(Some(result)) + } + } + } else { + let mut inner = self.rx.recv().unwrap(); + *inner = Some(result); + inner + }; + + WorkerMsg::new(inner, self.tx.clone()) + } +} + /// Maximum size of the output buffer before flushing results to the console const MAX_BUFFER_LENGTH: usize = 1000; /// Default duration until output buffering switches to streaming. @@ -56,8 +127,8 @@ struct ReceiverBuffer<'a, W> { quit_flag: &'a AtomicBool, /// The ^C notifier. interrupt_flag: &'a AtomicBool, - /// Receiver for worker results. - rx: Receiver, + /// Receiver for worker messages. + rx: Receiver, /// Standard output. stdout: W, /// The current buffer mode. @@ -72,7 +143,7 @@ struct ReceiverBuffer<'a, W> { impl<'a, W: Write> ReceiverBuffer<'a, W> { /// Create a new receiver buffer. - fn new(state: &'a WorkerState, rx: Receiver, stdout: W) -> Self { + fn new(state: &'a WorkerState, rx: Receiver, stdout: W) -> Self { let config = &state.config; let quit_flag = state.quit_flag.as_ref(); let interrupt_flag = state.interrupt_flag.as_ref(); @@ -104,7 +175,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> { /// Receive the next worker result. fn recv(&self) -> Result { - match self.mode { + let result = match self.mode { ReceiverMode::Buffering => { // Wait at most until we should switch to streaming self.rx.recv_deadline(self.deadline) @@ -113,7 +184,8 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> { // Wait however long it takes for a result Ok(self.rx.recv()?) } - } + }; + result.map(WorkerMsg::take) } /// Wait for a result or state change. @@ -319,7 +391,7 @@ impl WorkerState { /// Run the receiver work, either on this thread or a pool of background /// threads (for --exec). - fn receive(&self, rx: Receiver) -> ExitCode { + fn receive(&self, rx: Receiver) -> ExitCode { let config = &self.config; // This will be set to `Some` if the `--exec` argument was supplied. @@ -355,12 +427,13 @@ impl WorkerState { } /// Spawn the sender threads. - fn spawn_senders(&self, walker: WalkParallel, tx: Sender) { + fn spawn_senders(&self, walker: WalkParallel, tx: Sender) { walker.run(|| { let patterns = &self.patterns; let config = &self.config; let quit_flag = self.quit_flag.as_ref(); let tx = tx.clone(); + let mut pool = ResultPool::new(); Box::new(move |entry| { if quit_flag.load(Ordering::Relaxed) { @@ -387,20 +460,22 @@ impl WorkerState { DirEntry::broken_symlink(path) } _ => { - return match tx.send(WorkerResult::Error(ignore::Error::WithPath { + let result = pool.get(WorkerResult::Error(ignore::Error::WithPath { path, err: inner_err, - })) { + })); + return match tx.send(result) { Ok(_) => WalkState::Continue, Err(_) => WalkState::Quit, - } + }; } }, Err(err) => { - return match tx.send(WorkerResult::Error(err)) { + let result = pool.get(WorkerResult::Error(err)); + return match tx.send(result) { Ok(_) => WalkState::Continue, Err(_) => WalkState::Quit, - } + }; } }; @@ -509,7 +584,8 @@ impl WorkerState { } } - let send_result = tx.send(WorkerResult::Entry(entry)); + let result = pool.get(WorkerResult::Entry(entry)); + let send_result = tx.send(result); if send_result.is_err() { return WalkState::Quit; @@ -545,8 +621,7 @@ impl WorkerState { .unwrap(); } - // Channel capacity was chosen empircally to perform similarly to an unbounded channel - let (tx, rx) = bounded(0x4000 * config.threads); + let (tx, rx) = unbounded(); let exit_code = thread::scope(|scope| { // Spawn the receiver thread(s)