diff --git a/src/lib.rs b/src/lib.rs index 3d590c1..ea27b89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::thread::JoinHandle; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; pub struct Scheduler { running: Arc, @@ -158,6 +158,10 @@ impl JobStore { fn signal(&self) { self.signal.notify_one() } + + fn next_fire(&self) -> Option { + Some(Duration::from_millis(1)) + } } impl Default for JobStore { diff --git a/src/threading/mod.rs b/src/threading/mod.rs index 86e6deb..069ba5a 100644 --- a/src/threading/mod.rs +++ b/src/threading/mod.rs @@ -1,16 +1,85 @@ mod thread_pool; +use crate::JobStore; use std::num::NonZeroUsize; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::Acquire; +use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; +use std::time::Duration; use thread_pool::WorkerPool; +const DEFAULT_WAIT_NO_WORK: Duration = Duration::from_secs(1); + pub(super) struct SchedulerThread { - workers: WorkerPool, + halted: Arc, + workers: Arc>, + store: Arc, + handle: JoinHandle<()>, } impl SchedulerThread { - pub fn new(pool_size: NonZeroUsize) -> Self { + pub fn new(pool_size: NonZeroUsize, store: Arc) -> Self { + let halted = Arc::new(AtomicBool::default()); + let workers = Arc::new(WorkerPool::new(pool_size)); + let handle = { + let workers = workers.clone(); + let halted = halted.clone(); + let store = store.clone(); + + thread::spawn(move || { + while !halted.load(Acquire) { + let available_workers = workers.available_workers(); + // todo delete this! + if cfg!(test) { + println!("We have {} workers available", available_workers); + } + let next_fire = store.next_fire().unwrap_or(DEFAULT_WAIT_NO_WORK); + thread::sleep(next_fire); + } + }) + }; + + // todo delete this! + if cfg!(test) { + workers.submit(()).unwrap(); + thread::sleep(Duration::from_millis(1)); + workers.submit(()).unwrap(); + thread::sleep(Duration::from_millis(1)); + workers.submit(()).unwrap(); + } + Self { - workers: WorkerPool::new(pool_size), + halted, + workers, + store, + handle, } } + + pub fn shutdown(self) { + self.halted.store(true, std::sync::atomic::Ordering::Release); + self.handle.join().expect("Scheduler thread panicked"); + Arc::try_unwrap(self.workers) + .expect("worker pool is still being used!") + .shutdown(); + } +} + +#[cfg(test)] +mod tests { + use crate::threading::SchedulerThread; + use crate::JobStore; + use std::num::NonZeroUsize; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + + #[test] + fn api() { + let t = SchedulerThread::new(NonZeroUsize::new(3).unwrap(), Arc::new(JobStore::new())); + thread::sleep(Duration::from_millis(3)); + t.shutdown(); + } } diff --git a/src/threading/thread_pool.rs b/src/threading/thread_pool.rs index 69ef4c5..5569c78 100644 --- a/src/threading/thread_pool.rs +++ b/src/threading/thread_pool.rs @@ -4,19 +4,22 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::thread::JoinHandle; +use std::time::Duration; -pub(super) struct WorkerPool { +#[derive(Debug)] +pub(super) struct WorkerPool { running: Arc, - workers: Vec<(Arc, JoinHandle<()>)>, + workers: Vec<(Arc>, JoinHandle<()>)>, } -struct Worker { +#[derive(Debug)] +struct Worker { busy: AtomicBool, - task: Mutex>, + task: Mutex>, cvar: Condvar, } -impl WorkerPool { +impl WorkerPool { pub fn new(size: NonZeroUsize) -> Self { let size = size.get(); @@ -25,10 +28,10 @@ impl WorkerPool { for _ in 0..size { let running = running.clone(); - let worker: Arc = Arc::default(); + let worker: Arc> = Arc::default(); let w = worker.clone(); let jh = thread::spawn(move || { - while running.load(Ordering::SeqCst) { + while running.load(Ordering::Acquire) { worker.do_work(); } }); @@ -38,8 +41,24 @@ impl WorkerPool { Self { running, workers } } + pub fn submit(&self, task: T) -> Result<(), T> { + match self + .workers + .iter() + .find(|(w, _)| !w.busy.load(Ordering::Acquire)) + .map(|(w, _)| w) + { + Some(w) => w.assign(task), + None => Err(task), + } + } + + pub fn available_workers(&self) -> usize { + self.workers.iter().filter(|w| !w.0.busy()).count() + } + pub fn shutdown(mut self) { - self.running.store(false, Ordering::SeqCst); + self.running.store(false, Ordering::Release); for (worker, handle) in self.workers.drain(..) { worker.wake_up(); handle.join().expect("Worker thread panicked"); @@ -47,15 +66,22 @@ impl WorkerPool { } } -impl Drop for WorkerPool { +impl Drop for WorkerPool { fn drop(&mut self) { if !self.workers.is_empty() { - eprintln!("WorkerPool hasn't been shutdown prior to being Dropped!"); + if cfg!(test) { + assert!( + self.workers.is_empty(), + "WorkerPool hasn't been shutdown prior to being Dropped!" + ); + } else { + eprintln!("WorkerPool hasn't been shutdown properly!"); + } } } } -impl Worker { +impl Worker { fn new() -> Self { Self { busy: Default::default(), // todo non atomic? if only accessed from within the scheduler's lock @@ -64,9 +90,9 @@ impl Worker { } } - fn assign(&self, work: ()) -> Result<(), ()> { + fn assign(&self, work: T) -> Result<(), T> { if self.busy.load(Ordering::Acquire) { - return Err(()); + return Err(work); } let mut task = self.task.lock().unwrap(); match *task { @@ -76,7 +102,7 @@ impl Worker { self.cvar.notify_one(); Ok(()) }, - Some(_) => Err(()), + Some(_) => Err(work), } } @@ -99,7 +125,14 @@ impl Worker { task = self.cvar.wait(task).unwrap(); } task.take().expect("task must be available"); - // todo do the work + // todo delete this! + if cfg!(test) { + println!("Doing work!"); + thread::sleep(Duration::from_millis(2)); + // todo do the work + println!("Done!"); + } + self.busy.store(false, Ordering::Release); } @@ -111,7 +144,7 @@ impl Worker { } } -impl Default for Worker { +impl Default for Worker { fn default() -> Self { Self::new() } @@ -124,10 +157,23 @@ mod tests { #[test] fn test_thread_pool() { - let pool = WorkerPool::new(NonZeroUsize::new(1).unwrap()); + let pool = WorkerPool::<()>::new(NonZeroUsize::new(1).unwrap()); pool.shutdown(); } + #[test] + fn available_workers() { + let pool = WorkerPool::<()>::new(NonZeroUsize::new(2).unwrap()); + assert_eq!(pool.available_workers(), 2); + pool.shutdown(); + } + + #[test] + #[should_panic(expected = "WorkerPool hasn't been shutdown prior to being Dropped!")] + fn test_thread_pool_dropped_panics() { + let _pool = WorkerPool::<()>::new(NonZeroUsize::new(1).unwrap()); + } + #[test] fn worker_api() { let worker = Worker::new();