
Commit

Merge pull request #41 from rfdzan/sync_thread_output
Sync thread output
rfdzan authored Apr 20, 2024
2 parents bff7d9a + cd03f73 commit 7b317e1
Showing 2 changed files with 38 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -11,7 +11,7 @@ readme = "README.md"
exclude = ["src/main.rs"]

[dependencies]
crossbeam = { version = "0.8.4", features = ["crossbeam-deque"] }
crossbeam = "0.8.4"
image = "0.24.9"
img-parts = "0.3.0"
thiserror = "1.0.58"
82 changes: 37 additions & 45 deletions src/bulk.rs
@@ -1,14 +1,14 @@
use crate::{error, Compress, DEVICE, QUALITY};
use crossbeam::channel;
use crossbeam::deque::{Steal, Stealer, Worker};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
/// Custom configuration for building a [`Parallel`].
/// This struct is not meant to be used directly.
/// Use [`Parallel::from_vec`] instead.
#[derive(Debug, Clone)]
pub struct ParallelBuilder {
vec: Vec<Vec<u8>>,
vec: VecDeque<(usize, Vec<u8>)>,
quality: u8,
device_num: u8,
}
@@ -26,12 +26,10 @@ impl ParallelBuilder {
pub fn build(self) -> Parallel {
let (tx, rx) = channel::unbounded();
Parallel {
main_worker: Worker::new_fifo(),
vec: self.vec,
to_thread: StuffThatNeedsToBeSent {
vec: self.vec,
device_num: self.device_num,
quality: self.quality,
stealers: Vec::with_capacity(usize::from(self.device_num)),
},
transmitter: tx,
receiver: rx,
@@ -64,11 +62,12 @@
}
}
}

#[derive(Debug)]
pub struct StuffThatNeedsToBeSent {
vec: VecDeque<(usize, Vec<u8>)>,
device_num: u8,
quality: u8,
stealers: Vec<Stealer<Vec<u8>>>,
}
impl StuffThatNeedsToBeSent {
/// Compress images in parallel.
@@ -77,51 +76,48 @@ impl StuffThatNeedsToBeSent {
tx: channel::Sender<Result<Vec<u8>, error::Error>>,
) -> Vec<thread::JoinHandle<()>> {
let mut handles = Vec::with_capacity(usize::from(self.device_num));
let to_steal_from = Arc::new(Mutex::new(self.stealers));
let counter = Arc::new(Mutex::new(0usize));
let to_steal_from = Arc::new(Mutex::new(self.vec));
for _ in 0..self.device_num {
let local_stealer = Arc::clone(&to_steal_from);
let local_counter = Arc::clone(&counter);
let local_transmitter = tx.clone();
let handle = thread::spawn(move || {
let mut are_queues_empty = Vec::with_capacity(usize::from(self.device_num));
let mut payload = Vec::with_capacity(1);
loop {
{
let Some(stealer_guard) = local_stealer.try_lock().ok() else {
let Some(mut stealer_guard) = local_stealer.lock().ok() else {
continue;
};
for stealer in stealer_guard.iter() {
let Steal::Success(jpeg_bytes) = stealer.steal() else {
continue;
};
payload.push(jpeg_bytes);
if let Some(bytes) = stealer_guard.pop_front() {
payload.push(bytes);
} else {
break;
}
let _checks = stealer_guard
.iter()
.map(|stealer| {
if stealer.is_empty() {
are_queues_empty.push(true);
} else {
are_queues_empty.push(false);
}
})
.collect::<Vec<_>>();
// lock is no longer needed past this point
}
if let Some(bytes) = payload.pop() {
let compress_result = Compress::new(bytes, self.quality).compress();
match local_transmitter.send(compress_result) {
Err(e) => {
eprintln!("{e:#?}");
if let Some(content) = payload.pop() {
let compress_result = Compress::new(content.1, self.quality).compress();
loop {
{
let Some(mut counter_guard) = local_counter.lock().ok() else {
continue;
};
if !(*counter_guard == content.0) {
continue;
} else {
*counter_guard = *counter_guard + 1;
}
match local_transmitter.send(compress_result) {
Err(e) => {
eprintln!("{e:#?}");
}
Ok(_) => {}
}
}
Ok(_) => {}
break;
}
}
// if all stealers are empty, exit the loop.
if are_queues_empty.iter().all(|val| val == &true) {
break;
}
are_queues_empty.clear();
payload.clear();
}
});
@@ -133,8 +129,6 @@ impl StuffThatNeedsToBeSent {
/// Parallelized compression task.
#[derive(Debug)]
pub struct Parallel {
main_worker: Worker<Vec<u8>>,
vec: Vec<Vec<u8>>,
to_thread: StuffThatNeedsToBeSent,
transmitter: channel::Sender<Result<Vec<u8>, error::Error>>,
receiver: channel::Receiver<Result<Vec<u8>, error::Error>>,
@@ -167,18 +161,16 @@ impl Parallel {
/// ```
pub fn from_vec(vec: Vec<Vec<u8>>) -> ParallelBuilder {
ParallelBuilder {
vec,
vec: vec
.into_iter()
.enumerate()
.map(|content| content)
.collect::<VecDeque<(usize, Vec<u8>)>>(),
quality: QUALITY,
device_num: DEVICE,
}
}
fn compress(mut self) -> Vec<JoinHandle<()>> {
for _ in 0..self.to_thread.device_num {
self.to_thread.stealers.push(self.main_worker.stealer());
}
for bytes in self.vec {
self.main_worker.push(bytes);
}
fn compress(self) -> Vec<JoinHandle<()>> {
let handles = self.to_thread.send_to_threads(self.transmitter);
handles
}
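
For context, a minimal, self-contained sketch of the ordering scheme this commit switches to: worker threads pop indexed jobs from a shared VecDeque behind a Mutex and only send a result once a shared turn counter reaches that job's index, so results leave in submission order. The payload type, the doubling step, the thread count, and std's mpsc channel are simplifications for illustration, not the crate's actual API.

// Sketch only: simplified stand-in for the VecDeque + counter approach in this diff.
use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // Indexed jobs; the index is what keeps the output ordered.
    let jobs: VecDeque<(usize, u32)> = (0u32..8).enumerate().collect();
    let queue = Arc::new(Mutex::new(jobs));
    let counter = Arc::new(Mutex::new(0usize));
    let (tx, rx) = mpsc::channel::<u32>();

    let mut handles = Vec::new();
    for _ in 0..4 {
        let queue = Arc::clone(&queue);
        let counter = Arc::clone(&counter);
        let tx = tx.clone();
        handles.push(thread::spawn(move || loop {
            // Take the next job, or exit once the shared queue is drained.
            let job = queue.lock().unwrap().pop_front();
            let Some((idx, value)) = job else { break };
            let result = value * 2; // stand-in for the real compression step
            // Spin until it is this job's turn, then bump the counter and send.
            loop {
                let mut turn = counter.lock().unwrap();
                if *turn == idx {
                    *turn += 1;
                    tx.send(result).unwrap();
                    break;
                }
            }
        }));
    }
    drop(tx);
    for handle in handles {
        handle.join().unwrap();
    }
    // Results arrive in input order despite the parallel workers.
    let ordered: Vec<u32> = rx.into_iter().collect();
    assert_eq!(ordered, vec![0, 2, 4, 6, 8, 10, 12, 14]);
}

Because jobs are popped front-to-back, the in-flight job with the smallest index always matches the counter, so some thread can always make progress and the spin loop cannot deadlock.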