diff --git a/Cargo.toml b/Cargo.toml index c2cce49..6ef54f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ readme = "README.md" exclude = ["src/main.rs"] [dependencies] -crossbeam = { version = "0.8.4", features = ["crossbeam-deque"] } +crossbeam = "0.8.4" image = "0.24.9" img-parts = "0.3.0" thiserror = "1.0.58" diff --git a/src/bulk.rs b/src/bulk.rs index 20c5335..007c59b 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -1,6 +1,6 @@ use crate::{error, Compress, DEVICE, QUALITY}; use crossbeam::channel; -use crossbeam::deque::{Steal, Stealer, Worker}; +use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; /// Custom configuration for building a [`Parallel`]. @@ -8,7 +8,7 @@ use std::thread::{self, JoinHandle}; /// Use [`Parallel::from_vec`] instead. #[derive(Debug, Clone)] pub struct ParallelBuilder { - vec: Vec>, + vec: VecDeque<(usize, Vec)>, quality: u8, device_num: u8, } @@ -26,12 +26,10 @@ impl ParallelBuilder { pub fn build(self) -> Parallel { let (tx, rx) = channel::unbounded(); Parallel { - main_worker: Worker::new_fifo(), - vec: self.vec, to_thread: StuffThatNeedsToBeSent { + vec: self.vec, device_num: self.device_num, quality: self.quality, - stealers: Vec::with_capacity(usize::from(self.device_num)), }, transmitter: tx, receiver: rx, @@ -64,11 +62,12 @@ impl ParallelBuilder { } } } + #[derive(Debug)] pub struct StuffThatNeedsToBeSent { + vec: VecDeque<(usize, Vec)>, device_num: u8, quality: u8, - stealers: Vec>>, } impl StuffThatNeedsToBeSent { /// Compress images in parallel. @@ -77,51 +76,48 @@ impl StuffThatNeedsToBeSent { tx: channel::Sender, error::Error>>, ) -> Vec> { let mut handles = Vec::with_capacity(usize::from(self.device_num)); - let to_steal_from = Arc::new(Mutex::new(self.stealers)); + let counter = Arc::new(Mutex::new(0usize)); + let to_steal_from = Arc::new(Mutex::new(self.vec)); for _ in 0..self.device_num { let local_stealer = Arc::clone(&to_steal_from); + let local_counter = Arc::clone(&counter); let local_transmitter = tx.clone(); let handle = thread::spawn(move || { - let mut are_queues_empty = Vec::with_capacity(usize::from(self.device_num)); let mut payload = Vec::with_capacity(1); loop { { - let Some(stealer_guard) = local_stealer.try_lock().ok() else { + let Some(mut stealer_guard) = local_stealer.lock().ok() else { continue; }; - for stealer in stealer_guard.iter() { - let Steal::Success(jpeg_bytes) = stealer.steal() else { - continue; - }; - payload.push(jpeg_bytes); + if let Some(bytes) = stealer_guard.pop_front() { + payload.push(bytes); + } else { break; } - let _checks = stealer_guard - .iter() - .map(|stealer| { - if stealer.is_empty() { - are_queues_empty.push(true); - } else { - are_queues_empty.push(false); - } - }) - .collect::>(); // lock is no longer needed past this point } - if let Some(bytes) = payload.pop() { - let compress_result = Compress::new(bytes, self.quality).compress(); - match local_transmitter.send(compress_result) { - Err(e) => { - eprintln!("{e:#?}"); + if let Some(content) = payload.pop() { + let compress_result = Compress::new(content.1, self.quality).compress(); + loop { + { + let Some(mut counter_guard) = local_counter.lock().ok() else { + continue; + }; + if !(*counter_guard == content.0) { + continue; + } else { + *counter_guard = *counter_guard + 1; + } + match local_transmitter.send(compress_result) { + Err(e) => { + eprintln!("{e:#?}"); + } + Ok(_) => {} + } } - Ok(_) => {} + break; } } - // if all stealers are empty, exit the loop. - if are_queues_empty.iter().all(|val| val == &true) { - break; - } - are_queues_empty.clear(); payload.clear(); } }); @@ -133,8 +129,6 @@ impl StuffThatNeedsToBeSent { /// Parallelized compression task. #[derive(Debug)] pub struct Parallel { - main_worker: Worker>, - vec: Vec>, to_thread: StuffThatNeedsToBeSent, transmitter: channel::Sender, error::Error>>, receiver: channel::Receiver, error::Error>>, @@ -167,18 +161,16 @@ impl Parallel { /// ``` pub fn from_vec(vec: Vec>) -> ParallelBuilder { ParallelBuilder { - vec, + vec: vec + .into_iter() + .enumerate() + .map(|content| content) + .collect::)>>(), quality: QUALITY, device_num: DEVICE, } } - fn compress(mut self) -> Vec> { - for _ in 0..self.to_thread.device_num { - self.to_thread.stealers.push(self.main_worker.stealer()); - } - for bytes in self.vec { - self.main_worker.push(bytes); - } + fn compress(self) -> Vec> { let handles = self.to_thread.send_to_threads(self.transmitter); handles }