From 7f1969535c4b83674cafd5a95a2721b6bdf4f274 Mon Sep 17 00:00:00 2001 From: hkohko Date: Thu, 18 Apr 2024 08:40:37 +0700 Subject: [PATCH 1/9] try doing simple arc mutex vector for multithreading --- src/bulk.rs | 72 ++++++++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index 23fda9e..1a2ad32 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -63,12 +63,12 @@ impl ParallelBuilder { pub fn build(self) -> Parallel { let (tx, rx) = channel::unbounded(); Parallel { - main_worker: Worker::new_fifo(), - vec: self.vec, + // main_worker: Worker::new_fifo(), to_thread: StuffThatNeedsToBeSent { + vec: self.vec, device_num: self.device_num, quality: self.quality, - stealers: Vec::with_capacity(usize::from(self.device_num)), + // stealers: Vec::with_capacity(usize::from(self.device_num)), }, transmitter: tx, receiver: rx, @@ -77,9 +77,10 @@ impl ParallelBuilder { } #[derive(Debug)] pub struct StuffThatNeedsToBeSent { + vec: Vec>, device_num: u8, quality: u8, - stealers: Vec>>, + // stealers: Vec>>, } impl StuffThatNeedsToBeSent { /// Compress images in parallel. @@ -88,35 +89,39 @@ impl StuffThatNeedsToBeSent { tx: channel::Sender, error::Error>>, ) -> Vec> { let mut handles = Vec::with_capacity(usize::from(self.device_num)); - let to_steal_from = Arc::new(Mutex::new(self.stealers)); + // let to_steal_from = Arc::new(Mutex::new(self.stealers)); + let to_steal_from = Arc::new(Mutex::new(self.vec)); for _ in 0..self.device_num { let local_stealer = Arc::clone(&to_steal_from); let local_transmitter = tx.clone(); let handle = thread::spawn(move || { - let mut are_queues_empty = Vec::with_capacity(usize::from(self.device_num)); + // let mut are_queues_empty = Vec::with_capacity(usize::from(self.device_num)); let mut payload = Vec::with_capacity(1); loop { { - let Some(stealer_guard) = local_stealer.try_lock().ok() else { + let Some(mut stealer_guard) = local_stealer.try_lock().ok() else { continue; }; - for stealer in stealer_guard.iter() { - let Steal::Success(direntry) = stealer.steal() else { - continue; - }; - payload.push(direntry); + // for stealer in stealer_guard.iter() { + // let Steal::Success(direntry) = stealer.steal() else { + // continue; + // }; + if let Some(bytes) = stealer_guard.pop() { + payload.push(bytes); + } else { break; } - let _checks = stealer_guard - .iter() - .map(|stealer| { - if stealer.is_empty() { - are_queues_empty.push(true); - } else { - are_queues_empty.push(false); - } - }) - .collect::>(); + // let _checks = stealer_guard + // .iter() + // .map(|stealer| { + // if stealer.is_empty() { + // are_queues_empty.push(true); + // } else { + // are_queues_empty.push(false); + // } + // }) + // .collect::>(); + // lock is no longer needed past this point } if let Some(bytes) = payload.pop() { @@ -129,10 +134,10 @@ impl StuffThatNeedsToBeSent { } } // if all stealers are empty, exit the loop. - if are_queues_empty.iter().all(|val| val == &true) { - break; - } - are_queues_empty.clear(); + // if are_queues_empty.iter().all(|val| val == &true) { + // break; + // } + // are_queues_empty.clear(); payload.clear(); } }); @@ -144,8 +149,7 @@ impl StuffThatNeedsToBeSent { /// Parallelized compression task. #[derive(Debug)] pub struct Parallel { - main_worker: Worker>, - vec: Vec>, + // main_worker: Worker>, to_thread: StuffThatNeedsToBeSent, transmitter: channel::Sender, error::Error>>, receiver: channel::Receiver, error::Error>>, @@ -184,12 +188,12 @@ impl Parallel { } } fn compress(mut self) -> Vec> { - for _ in 0..self.to_thread.device_num { - self.to_thread.stealers.push(self.main_worker.stealer()); - } - for bytes in self.vec { - self.main_worker.push(bytes); - } + // for _ in 0..self.to_thread.device_num { + // self.to_thread.stealers.push(self.main_worker.stealer()); + // } + // for bytes in self.vec { + // self.main_worker.push(bytes); + // } let handles = self.to_thread.send_to_threads(self.transmitter); handles } From 939a095e077505d7dd5d1195a8a6993fffb5ade0 Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 17:12:39 +0700 Subject: [PATCH 2/9] fix: syncing transmission of data --- src/bulk.rs | 47 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index 1a2ad32..112d5d5 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -1,6 +1,7 @@ use crate::{error, Compress, DEVICE, QUALITY}; use crossbeam::channel; use crossbeam::deque::{Steal, Stealer, Worker}; +use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; /// Custom configuration for building a [`Parallel`]. @@ -8,7 +9,7 @@ use std::thread::{self, JoinHandle}; /// Use [`Parallel::from_vec`] instead. #[derive(Debug, Clone)] pub struct ParallelBuilder { - vec: Vec>, + vec: VecDeque<(usize, Vec)>, quality: u8, device_num: u8, } @@ -77,7 +78,7 @@ impl ParallelBuilder { } #[derive(Debug)] pub struct StuffThatNeedsToBeSent { - vec: Vec>, + vec: VecDeque<(usize, Vec)>, device_num: u8, quality: u8, // stealers: Vec>>, @@ -89,10 +90,12 @@ impl StuffThatNeedsToBeSent { tx: channel::Sender, error::Error>>, ) -> Vec> { let mut handles = Vec::with_capacity(usize::from(self.device_num)); + let counter = Arc::new(Mutex::new(0usize)); // let to_steal_from = Arc::new(Mutex::new(self.stealers)); let to_steal_from = Arc::new(Mutex::new(self.vec)); for _ in 0..self.device_num { let local_stealer = Arc::clone(&to_steal_from); + let local_counter = Arc::clone(&counter); let local_transmitter = tx.clone(); let handle = thread::spawn(move || { // let mut are_queues_empty = Vec::with_capacity(usize::from(self.device_num)); @@ -106,7 +109,7 @@ impl StuffThatNeedsToBeSent { // let Steal::Success(direntry) = stealer.steal() else { // continue; // }; - if let Some(bytes) = stealer_guard.pop() { + if let Some(bytes) = stealer_guard.pop_front() { payload.push(bytes); } else { break; @@ -124,13 +127,31 @@ impl StuffThatNeedsToBeSent { // lock is no longer needed past this point } - if let Some(bytes) = payload.pop() { - let compress_result = Compress::new(bytes, self.quality).compress(); - match local_transmitter.send(compress_result) { - Err(e) => { - eprintln!("{e:#?}"); + if let Some(content) = payload.pop() { + let compress_result = Compress::new(content.1, self.quality).compress(); + loop { + let Some(mut counter_guard) = local_counter.try_lock().ok() else { + // println!("contending for lock"); + continue; + }; + if !(*counter_guard == content.0) { + // println!("{}", content.0); + // println!("stuck in this check"); + drop(counter_guard); + continue; + } else { + *counter_guard = *counter_guard + 1; + // println!("{}", counter_guard); + drop(counter_guard); + match local_transmitter.send(compress_result) { + Err(e) => { + eprintln!("{e:#?}"); + } + Ok(_) => {} + } + // println!("here"); + break; } - Ok(_) => {} } } // if all stealers are empty, exit the loop. @@ -182,12 +203,16 @@ impl Parallel { /// ``` pub fn from_vec(vec: Vec>) -> ParallelBuilder { ParallelBuilder { - vec, + vec: vec + .into_iter() + .enumerate() + .map(|content| content) + .collect::)>>(), quality: QUALITY, device_num: DEVICE, } } - fn compress(mut self) -> Vec> { + fn compress(self) -> Vec> { // for _ in 0..self.to_thread.device_num { // self.to_thread.stealers.push(self.main_worker.stealer()); // } From fc59d21fcddd13377f2c6837dfd91a9a57554973 Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 17:48:20 +0700 Subject: [PATCH 3/9] (wip) fix: scoping lock --- src/bulk.rs | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index 112d5d5..8865895 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -130,28 +130,28 @@ impl StuffThatNeedsToBeSent { if let Some(content) = payload.pop() { let compress_result = Compress::new(content.1, self.quality).compress(); loop { - let Some(mut counter_guard) = local_counter.try_lock().ok() else { - // println!("contending for lock"); - continue; - }; - if !(*counter_guard == content.0) { - // println!("{}", content.0); - // println!("stuck in this check"); - drop(counter_guard); - continue; - } else { - *counter_guard = *counter_guard + 1; - // println!("{}", counter_guard); - drop(counter_guard); - match local_transmitter.send(compress_result) { - Err(e) => { - eprintln!("{e:#?}"); - } - Ok(_) => {} + { + let Some(mut counter_guard) = local_counter.try_lock().ok() else { + // println!("contending for lock"); + continue; + }; + if !(*counter_guard == content.0) { + // println!("{}", content.0); + // println!("stuck in this check"); + // drop(counter_guard); + continue; + } else { + *counter_guard = *counter_guard + 1; } - // println!("here"); - break; } + match local_transmitter.send(compress_result) { + Err(e) => { + eprintln!("{e:#?}"); + } + Ok(_) => {} + } + // println!("here"); + break; } } // if all stealers are empty, exit the loop. From 4ed44d5515efff25f97ed03ae2189ab45c01b6a6 Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 19:27:14 +0700 Subject: [PATCH 4/9] removed: comments/commented code --- src/bulk.rs | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index 8865895..57d3581 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -105,26 +105,11 @@ impl StuffThatNeedsToBeSent { let Some(mut stealer_guard) = local_stealer.try_lock().ok() else { continue; }; - // for stealer in stealer_guard.iter() { - // let Steal::Success(direntry) = stealer.steal() else { - // continue; - // }; if let Some(bytes) = stealer_guard.pop_front() { payload.push(bytes); } else { break; } - // let _checks = stealer_guard - // .iter() - // .map(|stealer| { - // if stealer.is_empty() { - // are_queues_empty.push(true); - // } else { - // are_queues_empty.push(false); - // } - // }) - // .collect::>(); - // lock is no longer needed past this point } if let Some(content) = payload.pop() { @@ -132,13 +117,9 @@ impl StuffThatNeedsToBeSent { loop { { let Some(mut counter_guard) = local_counter.try_lock().ok() else { - // println!("contending for lock"); continue; }; if !(*counter_guard == content.0) { - // println!("{}", content.0); - // println!("stuck in this check"); - // drop(counter_guard); continue; } else { *counter_guard = *counter_guard + 1; @@ -150,15 +131,9 @@ impl StuffThatNeedsToBeSent { } Ok(_) => {} } - // println!("here"); break; } } - // if all stealers are empty, exit the loop. - // if are_queues_empty.iter().all(|val| val == &true) { - // break; - // } - // are_queues_empty.clear(); payload.clear(); } }); From 868544243075f329b7f58cbef528ffc74c4bf393 Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 19:30:14 +0700 Subject: [PATCH 5/9] fix: hold mutex until the data is sent --- src/bulk.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index 57d3581..9a8e644 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -124,12 +124,12 @@ impl StuffThatNeedsToBeSent { } else { *counter_guard = *counter_guard + 1; } - } - match local_transmitter.send(compress_result) { - Err(e) => { - eprintln!("{e:#?}"); + match local_transmitter.send(compress_result) { + Err(e) => { + eprintln!("{e:#?}"); + } + Ok(_) => {} } - Ok(_) => {} } break; } From d3e94bb3a18c074cdc3e1c68b88e04bf5a9b3459 Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 20:05:26 +0700 Subject: [PATCH 6/9] removed: more comments/commented code --- src/bulk.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index 9a8e644..eb34905 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -1,6 +1,5 @@ use crate::{error, Compress, DEVICE, QUALITY}; use crossbeam::channel; -use crossbeam::deque::{Steal, Stealer, Worker}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; @@ -64,12 +63,10 @@ impl ParallelBuilder { pub fn build(self) -> Parallel { let (tx, rx) = channel::unbounded(); Parallel { - // main_worker: Worker::new_fifo(), to_thread: StuffThatNeedsToBeSent { vec: self.vec, device_num: self.device_num, quality: self.quality, - // stealers: Vec::with_capacity(usize::from(self.device_num)), }, transmitter: tx, receiver: rx, @@ -81,7 +78,6 @@ pub struct StuffThatNeedsToBeSent { vec: VecDeque<(usize, Vec)>, device_num: u8, quality: u8, - // stealers: Vec>>, } impl StuffThatNeedsToBeSent { /// Compress images in parallel. @@ -98,7 +94,6 @@ impl StuffThatNeedsToBeSent { let local_counter = Arc::clone(&counter); let local_transmitter = tx.clone(); let handle = thread::spawn(move || { - // let mut are_queues_empty = Vec::with_capacity(usize::from(self.device_num)); let mut payload = Vec::with_capacity(1); loop { { @@ -145,7 +140,6 @@ impl StuffThatNeedsToBeSent { /// Parallelized compression task. #[derive(Debug)] pub struct Parallel { - // main_worker: Worker>, to_thread: StuffThatNeedsToBeSent, transmitter: channel::Sender, error::Error>>, receiver: channel::Receiver, error::Error>>, @@ -188,12 +182,6 @@ impl Parallel { } } fn compress(self) -> Vec> { - // for _ in 0..self.to_thread.device_num { - // self.to_thread.stealers.push(self.main_worker.stealer()); - // } - // for bytes in self.vec { - // self.main_worker.push(bytes); - // } let handles = self.to_thread.send_to_threads(self.transmitter); handles } From 1b9d0f49b99bbf6b9aea3f3e48be9a978f5a08a8 Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 20:09:49 +0700 Subject: [PATCH 7/9] changed to blocking locks --- src/bulk.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index eb34905..bca7199 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -87,7 +87,6 @@ impl StuffThatNeedsToBeSent { ) -> Vec> { let mut handles = Vec::with_capacity(usize::from(self.device_num)); let counter = Arc::new(Mutex::new(0usize)); - // let to_steal_from = Arc::new(Mutex::new(self.stealers)); let to_steal_from = Arc::new(Mutex::new(self.vec)); for _ in 0..self.device_num { let local_stealer = Arc::clone(&to_steal_from); @@ -97,7 +96,7 @@ impl StuffThatNeedsToBeSent { let mut payload = Vec::with_capacity(1); loop { { - let Some(mut stealer_guard) = local_stealer.try_lock().ok() else { + let Some(mut stealer_guard) = local_stealer.lock().ok() else { continue; }; if let Some(bytes) = stealer_guard.pop_front() { @@ -111,7 +110,7 @@ impl StuffThatNeedsToBeSent { let compress_result = Compress::new(content.1, self.quality).compress(); loop { { - let Some(mut counter_guard) = local_counter.try_lock().ok() else { + let Some(mut counter_guard) = local_counter.lock().ok() else { continue; }; if !(*counter_guard == content.0) { From eb0ec3b6bc9481053156da5b810604bbaad6662a Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 20:17:49 +0700 Subject: [PATCH 8/9] removed crossbeam feature: deque --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c2cce49..6ef54f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ readme = "README.md" exclude = ["src/main.rs"] [dependencies] -crossbeam = { version = "0.8.4", features = ["crossbeam-deque"] } +crossbeam = "0.8.4" image = "0.24.9" img-parts = "0.3.0" thiserror = "1.0.58" From cd03f732e6e4ee7b4de3ef9962897b06bc97fd56 Mon Sep 17 00:00:00 2001 From: hkohko Date: Sat, 20 Apr 2024 23:05:18 +0700 Subject: [PATCH 9/9] resolve merge conflict --- src/bulk.rs | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/src/bulk.rs b/src/bulk.rs index 7ac4442..007c59b 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -26,12 +26,10 @@ impl ParallelBuilder { pub fn build(self) -> Parallel { let (tx, rx) = channel::unbounded(); Parallel { - main_worker: Worker::new_fifo(), - vec: self.vec, to_thread: StuffThatNeedsToBeSent { + vec: self.vec, device_num: self.device_num, quality: self.quality, - stealers: Vec::with_capacity(usize::from(self.device_num)), }, transmitter: tx, receiver: rx, @@ -64,30 +62,6 @@ impl ParallelBuilder { } } } -impl ParallelBuilder { - /// Builds a new [`Parallel`] with default or specified configuration. - /// # Example - /// This is the minimal requirements for using this method: - /// ``` - /// # fn main() { - /// # use jippigy::Parallel; - /// let mut vector_of_bytes: Vec> = Vec::new(); - /// let _build = Parallel::from_vec(vector_of_bytes).build(); - /// # } - /// ``` - pub fn build(self) -> Parallel { - let (tx, rx) = channel::unbounded(); - Parallel { - to_thread: StuffThatNeedsToBeSent { - vec: self.vec, - device_num: self.device_num, - quality: self.quality, - }, - transmitter: tx, - receiver: rx, - } - } -} #[derive(Debug)] pub struct StuffThatNeedsToBeSent {