Skip to content

Commit

Permalink
Wait for GC workers to exit using join handles
Browse files Browse the repository at this point in the history
  • Loading branch information
wks committed Feb 5, 2024
1 parent 176ad47 commit b968d73
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 48 deletions.
21 changes: 19 additions & 2 deletions src/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::mmtk::MMTK;
use crate::util::opaque_pointer::*;
use crate::util::options::AffinityKind;
use crate::util::rust_util::array_from_fn;
use crate::vm::gc_thread::GCThreadJoinHandle;
use crate::vm::Collection;
use crate::vm::VMBinding;
use crate::Plan;
Expand Down Expand Up @@ -93,7 +94,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {

/// Ask all GC workers to exit for forking, and wait until all workers have exited.
pub fn stop_gc_threads_for_forking(self: &Arc<Self>) {
self.worker_group.prepare_surrender_buffer();
let join_handles = self.worker_group.prepare_surrender_buffer();

debug!("A mutator is requesting GC threads to stop for forking...");
self.worker_monitor.make_request(|requests| {
Expand All @@ -105,7 +106,23 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
}
});

self.worker_group.wait_until_worker_exited();
let pid = unsafe { libc::getpid() };

debug!("Waiting for GC workers to finish... PID: {pid}");

let mut joined_workers = 0;
let total_workers = join_handles.len();
debug_assert_eq!(total_workers, self.worker_group.worker_count());

// Join each GC worker using their join handles.
// The order doesn't matter because all worker threads must stop.
for join_handle in join_handles {
join_handle.join_native_thread();
joined_workers += 1;
debug!("{joined_workers} of {total_workers} GC workers joined. PID: {pid}",);
}

debug!("All workers have exited. PID: {pid}");

self.worker_monitor.on_all_workers_exited();
}
Expand Down
84 changes: 39 additions & 45 deletions src/scheduler/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crossbeam::queue::ArrayQueue;
#[cfg(feature = "count_live_bytes_in_gc")]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};

/// Represents the ID of a GC worker thread.
pub type ThreadId = usize;
Expand Down Expand Up @@ -197,10 +197,10 @@ impl<VM: VMBinding> GCWorker<VM> {
/// Each worker will keep polling and executing work packets in a loop.
pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK<VM>) {
probe!(mmtk, gcworker_run);
let pid = unsafe { libc::getpid() };
let tid = unsafe { libc::gettid() };
debug!(
"Worker started. PID: {}, tid: {}, ordinal: {}",
unsafe { libc::getpid() },
unsafe { libc::gettid() },
"Worker started. PID: {pid}, TID: {tid}, ordinal: {}",
self.ordinal
);
WORKER_ORDINAL.with(|x| x.store(self.ordinal, Ordering::SeqCst));
Expand Down Expand Up @@ -235,9 +235,7 @@ impl<VM: VMBinding> GCWorker<VM> {
work.do_work_with_stat(self, mmtk);
}
debug!(
"Worker exiting. PID: {}, tid: {}, ordinal: {}",
unsafe { libc::getpid() },
unsafe { libc::gettid() },
"Worker exiting. PID: {pid}, TID: {tid}, ordinal: {}",
self.ordinal
);
probe!(mmtk, gcworker_exit);
Expand All @@ -263,7 +261,10 @@ enum WorkerCreationState<VM: VMBinding> {
},
/// All `GCWorker` structs have been transferred to worker threads, and worker threads are
/// running.
Spawned,
Spawned {
/// Join handles for joining the GC worker threads.
join_handles: Vec<VM::VMGCThreadJoinHandle>,
},
}

/// A worker group to manage all the GC workers.
Expand All @@ -272,9 +273,6 @@ pub(crate) struct WorkerGroup<VM: VMBinding> {
pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
/// The stateful part
state: Mutex<WorkerCreationState<VM>>,
/// The condition of "all workers exited", i.e. the number of suspended workers is equal to the
/// number of workers.
cond_all_workers_exited: Condvar,
}

/// We have to persuade Rust that `WorkerGroup` is safe to share because it thinks one worker can
Expand Down Expand Up @@ -302,7 +300,6 @@ impl<VM: VMBinding> WorkerGroup<VM> {
state: Mutex::new(WorkerCreationState::NotCreated {
unspawned_local_work_queues,
}),
cond_all_workers_exited: Default::default(),
})
}

Expand Down Expand Up @@ -331,38 +328,54 @@ impl<VM: VMBinding> WorkerGroup<VM> {
}

*state = WorkerCreationState::Resting { suspended_workers };
debug!("GCWorker instances created.");
debug!(
"GCWorker instances created. Total: {}",
self.worker_count()
);
}

/// Spawn all the worker threads
pub fn spawn(&self, tls: VMThread) {
debug!("Spawning GC workers. PID: {}", unsafe { libc::getpid() });
let pid = unsafe { libc::getpid() };
debug!("Spawning GC workers. PID: {pid}");

let mut state = self.state.lock().unwrap();

// TODO: It's better if we can take ownership of the old state directly.
// See https://docs.rs/replace_with/latest/replace_with/
let WorkerCreationState::Resting { ref mut suspended_workers } = *state else {
panic!("GCWorker structs have not been created, yet.");
};

// Drain the queue. We transfer the ownership of each `GCWorker` instance to a GC thread.
for worker in suspended_workers.drain(..) {
VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
}
// We transfer the ownership of each `GCWorker` instance to a GC thread.
let join_handles = std::mem::take(suspended_workers)
.into_iter()
.map(|worker| {
VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker))
})
.collect();

*state = WorkerCreationState::Spawned;
*state = WorkerCreationState::Spawned { join_handles };

debug!(
"Spawned {} worker threads. PID: {}",
"Spawned {} worker threads. PID: {pid}",
self.worker_count(),
unsafe { libc::getpid() }
);
}

/// Prepare the buffer for workers to surrender their `GCWorker` structs.
pub fn prepare_surrender_buffer(&self) {
pub fn prepare_surrender_buffer(&self) -> Vec<VM::VMGCThreadJoinHandle> {
let mut state = self.state.lock().unwrap();
*state = WorkerCreationState::Resting {
suspended_workers: Vec::with_capacity(self.worker_count()),
}
let old_state = std::mem::replace(
&mut *state,
WorkerCreationState::Resting {
suspended_workers: Vec::with_capacity(self.worker_count()),
},
);
let WorkerCreationState::Spawned { join_handles } = old_state else {
panic!("prepare_surrender_buffer called when GC worker threads have not been spawned.");
};
join_handles
}

/// Surrender the `GCWorker` struct when a GC worker exits.
Expand All @@ -373,31 +386,12 @@ impl<VM: VMBinding> WorkerGroup<VM> {
};
let ordinal = worker.ordinal;
suspended_workers.push(worker);
trace!(
debug!(
"Worker {} surrendered. ({}/{})",
ordinal,
suspended_workers.len(),
self.worker_count()
);
if suspended_workers.len() == self.worker_count() {
debug!("All {} workers surrendered.", self.worker_count());
self.cond_all_workers_exited.notify_all();
}
}

/// Wait until all workers exited.
pub fn wait_until_worker_exited(&self) {
let guard = self.state.lock().unwrap();
let _guard = self.cond_all_workers_exited.wait_while(guard, |state| {
let WorkerCreationState::Resting { ref suspended_workers } = *state else {
panic!("GCWorker structs have not been created, yet.");
};
suspended_workers.len() != self.worker_count()
});

debug!("All workers have exited. PID: {}", unsafe {
libc::getpid()
});
}

/// Get the number of workers in the group
Expand Down
3 changes: 3 additions & 0 deletions src/scheduler/worker_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ impl WorkerMonitor {
/// Called when all workers have exited.
///
/// Resets the current goal to `None`.
pub fn on_all_workers_exited(&self) {
// `try_lock` does not wait for the lock; the `unwrap` panics if it could not be acquired.
// NOTE(review): presumably nothing else can hold `sync` here because every worker has
// already exited -- confirm against the callers.
let mut sync = self.sync.try_lock().unwrap();

// Consider the current goal, i.e. `StopForFork`, as finished.
// When the GC threads are re-spawned, they should see no goals.
sync.goals.current = None;
}
}
5 changes: 4 additions & 1 deletion src/vm/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ pub trait Collection<VM: VMBinding> {
/// The spawned thread shall call `memory_manager::start_worker`.
/// Currently `Worker` is the only kind of thread which mmtk-core will create.
/// In either case, the `Box` inside should be passed back to the called function.
fn spawn_gc_thread(tls: VMThread, ctx: GCThreadContext<VM>);
///
/// It returns a join handle for joining the thread. This is currently used for preparing for
/// forking.
fn spawn_gc_thread(tls: VMThread, ctx: GCThreadContext<VM>) -> VM::VMGCThreadJoinHandle;

/// Inform the VM of an out-of-memory error. The binding should hook into the VM's error
/// routine for OOM. Note that there are two different categories of OOM:
Expand Down
21 changes: 21 additions & 0 deletions src/vm/gc_thread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! This module defines types for supporting GC threads.
/// A handle for joining a GC thread, i.e. waiting for the GC thread to exit.
///
/// This type is used for supporting forking. [`crate::MMTK::prepare_to_fork`] will block until
/// all GC workers have exited so that the VM can safely call the `fork()` system call. If a VM
/// never calls `fork()`, or calls `fork()` only for calling `exec()`, the VM can use
/// [`UnimplementedGCThreadJoinHandle`] instead.
pub trait GCThreadJoinHandle: Send {
    /// Block until the underlying native thread of the GC thread has exited.
    ///
    /// This consumes the handle, so a given handle can be joined at most once.
    fn join_native_thread(self);
}

/// A placeholder [`GCThreadJoinHandle`] for VMs that never need to join GC threads.
///
/// It is public so that VM bindings can name it as their join handle type.
///
/// # Panics
///
/// Calling [`GCThreadJoinHandle::join_native_thread`] on this type always panics.
#[derive(Debug, Default)]
pub struct UnimplementedGCThreadJoinHandle;

impl GCThreadJoinHandle for UnimplementedGCThreadJoinHandle {
    fn join_native_thread(self) {
        unimplemented!("UnimplementedGCThreadJoinHandle does not support joining GC threads")
    }
}
4 changes: 4 additions & 0 deletions src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod active_plan;
mod collection;
/// Allows MMTk to access edges in a VM-defined way.
pub mod edge_shape;
pub mod gc_thread;
pub(crate) mod object_model;
mod reference_glue;
mod scanning;
Expand Down Expand Up @@ -64,6 +65,9 @@ where
/// The type of heap memory slice in this VM.
type VMMemorySlice: edge_shape::MemorySlice<Edge = Self::VMEdge>;

/// The type of join handle for GC threads.
type VMGCThreadJoinHandle: gc_thread::GCThreadJoinHandle;

/// A value to fill in alignment gaps. This value can be used for debugging.
const ALIGNMENT_VALUE: usize = 0xdead_beef;
/// Allowed minimal alignment in bytes.
Expand Down

0 comments on commit b968d73

Please sign in to comment.