Skip to content

Commit

Permalink
Wiring scheduler thread in
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Snaps <alex@wcgw.dev>
  • Loading branch information
alexsnaps committed Dec 19, 2024
1 parent 12367aa commit cb27679
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 21 deletions.
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

pub struct Scheduler {
running: Arc<AtomicBool>,
Expand Down Expand Up @@ -158,6 +158,10 @@ impl JobStore {
fn signal(&self) {
self.signal.notify_one()
}

fn next_fire(&self) -> Option<Duration> {
Some(Duration::from_millis(1))
}
}

impl Default for JobStore {
Expand Down
75 changes: 72 additions & 3 deletions src/threading/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,85 @@
mod thread_pool;

use crate::JobStore;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Acquire;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use thread_pool::WorkerPool;

const DEFAULT_WAIT_NO_WORK: Duration = Duration::from_secs(1);

pub(super) struct SchedulerThread {
workers: WorkerPool,
halted: Arc<AtomicBool>,
workers: Arc<WorkerPool<()>>,
store: Arc<JobStore>,
handle: JoinHandle<()>,
}

impl SchedulerThread {
pub fn new(pool_size: NonZeroUsize) -> Self {
pub fn new(pool_size: NonZeroUsize, store: Arc<JobStore>) -> Self {
let halted = Arc::new(AtomicBool::default());
let workers = Arc::new(WorkerPool::new(pool_size));
let handle = {
let workers = workers.clone();
let halted = halted.clone();
let store = store.clone();

thread::spawn(move || {
while !halted.load(Acquire) {
let available_workers = workers.available_workers();
// todo delete this!
if cfg!(test) {
println!("We have {} workers available", available_workers);
}
let next_fire = store.next_fire().unwrap_or(DEFAULT_WAIT_NO_WORK);
thread::sleep(next_fire);
}
})
};

// todo delete this!
if cfg!(test) {
workers.submit(()).unwrap();
thread::sleep(Duration::from_millis(1));
workers.submit(()).unwrap();
thread::sleep(Duration::from_millis(1));
workers.submit(()).unwrap();
}

Self {
workers: WorkerPool::new(pool_size),
halted,
workers,
store,
handle,
}
}

pub fn shutdown(self) {
self.halted.store(true, std::sync::atomic::Ordering::Release);
self.handle.join().expect("Scheduler thread panicked");
Arc::try_unwrap(self.workers)
.expect("worker pool is still being used!")
.shutdown();
}
}

#[cfg(test)]
mod tests {
use crate::threading::SchedulerThread;
use crate::JobStore;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[test]
fn api() {
let t = SchedulerThread::new(NonZeroUsize::new(3).unwrap(), Arc::new(JobStore::new()));
thread::sleep(Duration::from_millis(3));
t.shutdown();
}
}
80 changes: 63 additions & 17 deletions src/threading/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;

pub(super) struct WorkerPool {
#[derive(Debug)]
pub(super) struct WorkerPool<T> {
running: Arc<AtomicBool>,
workers: Vec<(Arc<Worker>, JoinHandle<()>)>,
workers: Vec<(Arc<Worker<T>>, JoinHandle<()>)>,
}

struct Worker {
#[derive(Debug)]
struct Worker<T> {
busy: AtomicBool,
task: Mutex<Option<()>>,
task: Mutex<Option<T>>,
cvar: Condvar,
}

impl WorkerPool {
impl<T: Send + 'static> WorkerPool<T> {
pub fn new(size: NonZeroUsize) -> Self {
let size = size.get();

Expand All @@ -25,10 +28,10 @@ impl WorkerPool {

for _ in 0..size {
let running = running.clone();
let worker: Arc<Worker> = Arc::default();
let worker: Arc<Worker<T>> = Arc::default();
let w = worker.clone();
let jh = thread::spawn(move || {
while running.load(Ordering::SeqCst) {
while running.load(Ordering::Acquire) {
worker.do_work();
}
});
Expand All @@ -38,24 +41,47 @@ impl WorkerPool {
Self { running, workers }
}

pub fn submit(&self, task: T) -> Result<(), T> {
match self
.workers
.iter()
.find(|(w, _)| !w.busy.load(Ordering::Acquire))
.map(|(w, _)| w)
{
Some(w) => w.assign(task),
None => Err(task),
}
}

pub fn available_workers(&self) -> usize {
self.workers.iter().filter(|w| !w.0.busy()).count()
}

pub fn shutdown(mut self) {
self.running.store(false, Ordering::SeqCst);
self.running.store(false, Ordering::Release);
for (worker, handle) in self.workers.drain(..) {
worker.wake_up();
handle.join().expect("Worker thread panicked");
}
}
}

impl Drop for WorkerPool {
impl<T> Drop for WorkerPool<T> {
fn drop(&mut self) {
if !self.workers.is_empty() {
eprintln!("WorkerPool hasn't been shutdown prior to being Dropped!");
if cfg!(test) {
assert!(
self.workers.is_empty(),
"WorkerPool hasn't been shutdown prior to being Dropped!"
);
} else {
eprintln!("WorkerPool hasn't been shutdown properly!");
}
}
}
}

impl Worker {
impl<T> Worker<T> {
fn new() -> Self {
Self {
busy: Default::default(), // todo non atomic? if only accessed from within the scheduler's lock
Expand All @@ -64,9 +90,9 @@ impl Worker {
}
}

fn assign(&self, work: ()) -> Result<(), ()> {
fn assign(&self, work: T) -> Result<(), T> {
if self.busy.load(Ordering::Acquire) {
return Err(());
return Err(work);
}
let mut task = self.task.lock().unwrap();
match *task {
Expand All @@ -76,7 +102,7 @@ impl Worker {
self.cvar.notify_one();
Ok(())
},
Some(_) => Err(()),
Some(_) => Err(work),
}
}

Expand All @@ -99,7 +125,14 @@ impl Worker {
task = self.cvar.wait(task).unwrap();
}
task.take().expect("task must be available");
// todo do the work
// todo delete this!
if cfg!(test) {
println!("Doing work!");
thread::sleep(Duration::from_millis(2));
// todo do the work
println!("Done!");
}

self.busy.store(false, Ordering::Release);
}

Expand All @@ -111,7 +144,7 @@ impl Worker {
}
}

impl Default for Worker {
impl<T> Default for Worker<T> {
fn default() -> Self {
Self::new()
}
Expand All @@ -124,10 +157,23 @@ mod tests {

#[test]
fn test_thread_pool() {
let pool = WorkerPool::new(NonZeroUsize::new(1).unwrap());
let pool = WorkerPool::<()>::new(NonZeroUsize::new(1).unwrap());
pool.shutdown();
}

#[test]
fn available_workers() {
let pool = WorkerPool::<()>::new(NonZeroUsize::new(2).unwrap());
assert_eq!(pool.available_workers(), 2);
pool.shutdown();
}

#[test]
#[should_panic(expected = "WorkerPool hasn't been shutdown prior to being Dropped!")]
fn test_thread_pool_dropped_panics() {
let _pool = WorkerPool::<()>::new(NonZeroUsize::new(1).unwrap());
}

#[test]
fn worker_api() {
let worker = Worker::new();
Expand Down

0 comments on commit cb27679

Please sign in to comment.