diff --git a/src/exec/job.rs b/src/exec/job.rs index af603cc63..379eaaf21 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -3,10 +3,9 @@ use std::sync::Mutex; use crossbeam_channel::Receiver; use crate::config::Config; -use crate::dir_entry::DirEntry; use crate::error::print_error; use crate::exit_codes::{merge_exitcodes, ExitCode}; -use crate::walk::WorkerResult; +use crate::walk::{WorkerMsg, WorkerResult}; use super::CommandSet; @@ -14,7 +13,7 @@ use super::CommandSet; /// generate a command with the supplied command template. The generated command will then /// be executed, and this process will continue until the receiver's sender has closed. pub fn job( - rx: Receiver<WorkerResult>, + rx: Receiver<WorkerMsg>, cmd: &CommandSet, out_perm: &Mutex<()>, config: &Config, @@ -26,7 +25,8 @@ pub fn job( loop { // Obtain the next result from the receiver, else if the channel // has closed, exit from the loop - let dir_entry: DirEntry = match rx.recv() { + let result = rx.recv().map(WorkerMsg::take); + let dir_entry = match result { Ok(WorkerResult::Entry(dir_entry)) => dir_entry, Ok(WorkerResult::Error(err)) => { if config.show_filesystem_errors { @@ -49,18 +49,19 @@ pub fn job( merge_exitcodes(results) } -pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode { - let paths = rx - .into_iter() - .filter_map(|worker_result| match worker_result { - WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)), - WorkerResult::Error(err) => { - if config.show_filesystem_errors { - print_error(err.to_string()); +pub fn batch(rx: Receiver<WorkerMsg>, cmd: &CommandSet, config: &Config) -> ExitCode { + let paths = + rx.into_iter() + .map(WorkerMsg::take) + .filter_map(|worker_result| match worker_result { + WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)), + WorkerResult::Error(err) => { + if config.show_filesystem_errors { + print_error(err.to_string()); + } + None } - None - } - }); + }); cmd.execute_batch(paths, config.batch_size, 
config.path_separator.as_deref()) } diff --git a/src/walk.rs b/src/walk.rs index 64eef2bf5..258bafb59 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -9,7 +9,7 @@ use std::thread; use std::time::{Duration, Instant}; use anyhow::{anyhow, Result}; -use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use etcetera::BaseStrategy; use ignore::overrides::{Override, OverrideBuilder}; use ignore::{self, WalkBuilder, WalkParallel, WalkState}; @@ -43,6 +43,77 @@ pub enum WorkerResult { Error(ignore::Error), } +/// Storage for a WorkerResult. +type ResultBox = Box<Option<WorkerResult>>; + +/// A WorkerResult that recycles itself. +pub struct WorkerMsg { + inner: Option<ResultBox>, + tx: Sender<ResultBox>, +} + +impl WorkerMsg { + /// Create a new message. + fn new(inner: ResultBox, tx: Sender<ResultBox>) -> Self { + Self { + inner: Some(inner), + tx, + } + } + + /// Extract the result from this message. + pub fn take(mut self) -> WorkerResult { + self.inner.as_mut().unwrap().take().unwrap() + } +} + +impl Drop for WorkerMsg { + fn drop(&mut self) { + let _ = self.tx.send(self.inner.take().unwrap()); + } +} + +/// A pool of WorkerResults that can be recycled. +struct ResultPool { + size: usize, + tx: Sender<ResultBox>, + rx: Receiver<ResultBox>, +} + +/// Capacity was chosen empirically to perform similarly to an unbounded channel +const RESULT_POOL_CAPACITY: usize = 0x4000; + +impl ResultPool { + /// Create an empty pool. + fn new() -> Self { + let (tx, rx) = unbounded(); + + Self { size: 0, tx, rx } + } + + /// Allocate or recycle a WorkerResult from the pool. 
+ fn get(&mut self, result: WorkerResult) -> WorkerMsg { + let inner = if self.size < RESULT_POOL_CAPACITY { + match self.rx.try_recv() { + Ok(mut inner) => { + *inner = Some(result); + inner + } + Err(_) => { + self.size += 1; + Box::new(Some(result)) + } + } + } else { + let mut inner = self.rx.recv().unwrap(); + *inner = Some(result); + inner + }; + + WorkerMsg::new(inner, self.tx.clone()) + } +} + /// Maximum size of the output buffer before flushing results to the console const MAX_BUFFER_LENGTH: usize = 1000; /// Default duration until output buffering switches to streaming. @@ -56,8 +127,8 @@ struct ReceiverBuffer<'a, W> { quit_flag: &'a AtomicBool, /// The ^C notifier. interrupt_flag: &'a AtomicBool, - /// Receiver for worker results. - rx: Receiver<WorkerResult>, + /// Receiver for worker messages. + rx: Receiver<WorkerMsg>, /// Standard output. stdout: W, /// The current buffer mode. @@ -72,7 +143,7 @@ struct ReceiverBuffer<'a, W> { impl<'a, W: Write> ReceiverBuffer<'a, W> { /// Create a new receiver buffer. - fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self { + fn new(state: &'a WorkerState, rx: Receiver<WorkerMsg>, stdout: W) -> Self { let config = &state.config; let quit_flag = state.quit_flag.as_ref(); let interrupt_flag = state.interrupt_flag.as_ref(); @@ -104,7 +175,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> { /// Receive the next worker result. fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> { - match self.mode { + let result = match self.mode { ReceiverMode::Buffering => { // Wait at most until we should switch to streaming self.rx.recv_deadline(self.deadline) @@ -113,7 +184,8 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> { // Wait however long it takes for a result Ok(self.rx.recv()?) } - } + }; + result.map(WorkerMsg::take) } /// Wait for a result or state change. @@ -319,7 +391,7 @@ impl WorkerState { /// Run the receiver work, either on this thread or a pool of background /// threads (for --exec). 
- fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode { + fn receive(&self, rx: Receiver<WorkerMsg>) -> ExitCode { let config = &self.config; // This will be set to `Some` if the `--exec` argument was supplied. @@ -355,12 +427,13 @@ impl WorkerState { } /// Spawn the sender threads. - fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) { + fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerMsg>) { walker.run(|| { let patterns = &self.patterns; let config = &self.config; let quit_flag = self.quit_flag.as_ref(); let tx = tx.clone(); + let mut pool = ResultPool::new(); Box::new(move |entry| { if quit_flag.load(Ordering::Relaxed) { @@ -387,20 +460,22 @@ impl WorkerState { DirEntry::broken_symlink(path) } _ => { - return match tx.send(WorkerResult::Error(ignore::Error::WithPath { + let result = pool.get(WorkerResult::Error(ignore::Error::WithPath { path, err: inner_err, - })) { + })); + return match tx.send(result) { Ok(_) => WalkState::Continue, Err(_) => WalkState::Quit, - } + }; } }, Err(err) => { - return match tx.send(WorkerResult::Error(err)) { + let result = pool.get(WorkerResult::Error(err)); + return match tx.send(result) { Ok(_) => WalkState::Continue, Err(_) => WalkState::Quit, - } + }; } }; @@ -509,7 +584,8 @@ impl WorkerState { } } - let send_result = tx.send(WorkerResult::Entry(entry)); + let result = pool.get(WorkerResult::Entry(entry)); + let send_result = tx.send(result); if send_result.is_err() { return WalkState::Quit; @@ -545,8 +621,7 @@ impl WorkerState { .unwrap(); } - // Channel capacity was chosen empircally to perform similarly to an unbounded channel - let (tx, rx) = bounded(0x4000 * config.threads); + let (tx, rx) = unbounded(); // Spawn the sender threads. self.spawn_senders(walker, tx);