From b968d7350a11c048c72a965912943a0f5235dd35 Mon Sep 17 00:00:00 2001
From: Kunshan Wang
Date: Mon, 5 Feb 2024 21:44:06 +0800
Subject: [PATCH] Wait for GC workers to exit using join handles

---
 src/scheduler/scheduler.rs      | 21 ++++++++-
 src/scheduler/worker.rs         | 84 +++++++++++++++------------------
 src/scheduler/worker_monitor.rs |  3 ++
 src/vm/collection.rs            |  5 +-
 src/vm/gc_thread.rs             | 21 +++++++++
 src/vm/mod.rs                   |  4 ++
 6 files changed, 90 insertions(+), 48 deletions(-)
 create mode 100644 src/vm/gc_thread.rs

diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs
index 960fca80e8..4301ffc6b7 100644
--- a/src/scheduler/scheduler.rs
+++ b/src/scheduler/scheduler.rs
@@ -12,6 +12,7 @@ use crate::mmtk::MMTK;
 use crate::util::opaque_pointer::*;
 use crate::util::options::AffinityKind;
 use crate::util::rust_util::array_from_fn;
+use crate::vm::gc_thread::GCThreadJoinHandle;
 use crate::vm::Collection;
 use crate::vm::VMBinding;
 use crate::Plan;
@@ -93,7 +94,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
 
     /// Ask all GC workers to exit for forking, and wait until all workers exited.
     pub fn stop_gc_threads_for_forking(self: &Arc<Self>) {
-        self.worker_group.prepare_surrender_buffer();
+        let join_handles = self.worker_group.prepare_surrender_buffer();
 
         debug!("A mutator is requesting GC threads to stop for forking...");
         self.worker_monitor.make_request(|requests| {
@@ -105,7 +106,23 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             }
         });
 
-        self.worker_group.wait_until_worker_exited();
+        let pid = unsafe { libc::getpid() };
+
+        debug!("Waiting for GC workers to finish... PID: {pid}");
+
+        let mut joined_workers = 0;
+        let total_workers = join_handles.len();
+        debug_assert_eq!(total_workers, self.worker_group.worker_count());
+
+        // Join each GC worker using their join handles.
+        // The order doesn't matter because all worker threads must stop.
+        for join_handle in join_handles {
+            join_handle.join_native_thread();
+            joined_workers += 1;
+            debug!("{joined_workers} of {total_workers} GC workers joined. PID: {pid}",);
+        }
+
+        debug!("All workers have exited. PID: {pid}");
 
         self.worker_monitor.on_all_workers_exited();
     }
diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs
index 5d19d003ac..3260358150 100644
--- a/src/scheduler/worker.rs
+++ b/src/scheduler/worker.rs
@@ -12,7 +12,7 @@ use crossbeam::queue::ArrayQueue;
 #[cfg(feature = "count_live_bytes_in_gc")]
 use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering;
-use std::sync::{Arc, Condvar, Mutex};
+use std::sync::{Arc, Mutex};
 
 /// Represents the ID of a GC worker thread.
 pub type ThreadId = usize;
@@ -197,10 +197,10 @@ impl<VM: VMBinding> GCWorker<VM> {
     /// Each worker will keep polling and executing work packets in a loop.
     pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK<VM>) {
         probe!(mmtk, gcworker_run);
+        let pid = unsafe { libc::getpid() };
+        let tid = unsafe { libc::gettid() };
         debug!(
-            "Worker started. PID: {}, tid: {}, ordinal: {}",
-            unsafe { libc::getpid() },
-            unsafe { libc::gettid() },
+            "Worker started. PID: {pid}, TID: {tid}, ordinal: {}",
             self.ordinal
         );
         WORKER_ORDINAL.with(|x| x.store(self.ordinal, Ordering::SeqCst));
@@ -235,9 +235,7 @@ impl<VM: VMBinding> GCWorker<VM> {
             work.do_work_with_stat(self, mmtk);
         }
         debug!(
-            "Worker exiting. PID: {}, tid: {}, ordinal: {}",
-            unsafe { libc::getpid() },
-            unsafe { libc::gettid() },
+            "Worker exiting. PID: {pid}, TID: {tid}, ordinal: {}",
             self.ordinal
         );
         probe!(mmtk, gcworker_exit);
@@ -263,7 +261,10 @@ enum WorkerCreationState<VM: VMBinding> {
     },
     /// All `GCWorker` structs have been transferred to worker threads, and worker threads are
     /// running.
-    Spawned,
+    Spawned {
+        /// Join handles for joining the GC worker threads.
+        join_handles: Vec<VM::VMGCThreadJoinHandle>,
+    },
 }
 
 /// A worker group to manage all the GC workers.
@@ -272,9 +273,6 @@ pub(crate) struct WorkerGroup<VM: VMBinding> {
     pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
     /// The stateful part
     state: Mutex<WorkerCreationState<VM>>,
-    /// The condition of "all workers exited", i.e. the number of suspended workers is equal to the
-    /// number of workers.
-    cond_all_workers_exited: Condvar,
 }
 
 /// We have to persuade Rust that `WorkerGroup` is safe to share because it thinks one worker can
@@ -302,7 +300,6 @@ impl<VM: VMBinding> WorkerGroup<VM> {
             state: Mutex::new(WorkerCreationState::NotCreated {
                 unspawned_local_work_queues,
             }),
-            cond_all_workers_exited: Default::default(),
         })
     }
 
@@ -331,38 +328,54 @@ impl<VM: VMBinding> WorkerGroup<VM> {
         }
 
         *state = WorkerCreationState::Resting { suspended_workers };
-        debug!("GCWorker instances created.");
+        debug!(
+            "GCWorker instances created. Total: {}",
+            self.worker_count()
+        );
     }
 
     /// Spawn all the worker threads
     pub fn spawn(&self, tls: VMThread) {
-        debug!("Spawning GC workers. PID: {}", unsafe { libc::getpid() });
+        let pid = unsafe { libc::getpid() };
+        debug!("Spawning GC workers. PID: {pid}");
+
         let mut state = self.state.lock().unwrap();
 
+        // TODO: It's better if we can take ownership of the old state directly.
+        // See https://docs.rs/replace_with/latest/replace_with/
         let WorkerCreationState::Resting { ref mut suspended_workers } = *state else {
             panic!("GCWorker structs have not been created, yet.");
         };
 
-        // Drain the queue. We transfer the ownership of each `GCWorker` instance to a GC thread.
-        for worker in suspended_workers.drain(..) {
-            VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
-        }
+        // We transfer the ownership of each `GCWorker` instance to a GC thread.
+        let join_handles = std::mem::take(suspended_workers)
+            .into_iter()
+            .map(|worker| {
+                VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker))
+            })
+            .collect();
 
-        *state = WorkerCreationState::Spawned;
+        *state = WorkerCreationState::Spawned { join_handles };
 
         debug!(
-            "Spawned {} worker threads. PID: {}",
+            "Spawned {} worker threads. PID: {pid}",
             self.worker_count(),
-            unsafe { libc::getpid() }
         );
     }
 
     /// Prepare the buffer for workers to surrender their `GCWorker` structs.
-    pub fn prepare_surrender_buffer(&self) {
+    pub fn prepare_surrender_buffer(&self) -> Vec<VM::VMGCThreadJoinHandle> {
         let mut state = self.state.lock().unwrap();
-        *state = WorkerCreationState::Resting {
-            suspended_workers: Vec::with_capacity(self.worker_count()),
-        }
+        let old_state = std::mem::replace(
+            &mut *state,
+            WorkerCreationState::Resting {
+                suspended_workers: Vec::with_capacity(self.worker_count()),
+            },
+        );
+        let WorkerCreationState::Spawned { join_handles } = old_state else {
+            panic!("prepare_surrender_buffer called when GC worker threads have not been spawned.");
+        };
+        join_handles
     }
 
     /// Surrender the `GCWorker` struct when a GC worker exits.
@@ -373,31 +386,12 @@ impl<VM: VMBinding> WorkerGroup<VM> {
         };
         let ordinal = worker.ordinal;
         suspended_workers.push(worker);
-        trace!(
+        debug!(
             "Worker {} surrendered. ({}/{})",
             ordinal,
             suspended_workers.len(),
             self.worker_count()
         );
-        if suspended_workers.len() == self.worker_count() {
-            debug!("All {} workers surrendered.", self.worker_count());
-            self.cond_all_workers_exited.notify_all();
-        }
-    }
-
-    /// Wait until all workers exited.
-    pub fn wait_until_worker_exited(&self) {
-        let guard = self.state.lock().unwrap();
-        let _guard = self.cond_all_workers_exited.wait_while(guard, |state| {
-            let WorkerCreationState::Resting { ref suspended_workers } = *state else {
-                panic!("GCWorker structs have not been created, yet.");
-            };
-            suspended_workers.len() != self.worker_count()
-        });
-
-        debug!("All workers have exited. PID: {}", unsafe {
-            libc::getpid()
-        });
     }
 
     /// Get the number of workers in the group
diff --git a/src/scheduler/worker_monitor.rs b/src/scheduler/worker_monitor.rs
index cba34c6928..228da89b93 100644
--- a/src/scheduler/worker_monitor.rs
+++ b/src/scheduler/worker_monitor.rs
@@ -242,6 +242,9 @@ impl WorkerMonitor {
     /// Called when all workers have exited.
     pub fn on_all_workers_exited(&self) {
         let mut sync = self.sync.try_lock().unwrap();
+
+        // Consider the current goal, i.e. `StopForFork`, as finished.
+        // When the GC threads are re-spawned, they should see no goals.
         sync.goals.current = None;
     }
 }
diff --git a/src/vm/collection.rs b/src/vm/collection.rs
index b3b172ded0..ec0f0c8020 100644
--- a/src/vm/collection.rs
+++ b/src/vm/collection.rs
@@ -55,7 +55,10 @@ pub trait Collection<VM: VMBinding> {
     /// The spawned thread shall call `memory_manager::start_worker`.
     /// Currently `Worker` is the only kind of thread which mmtk-core will create.
     /// In either case, the `Box` inside should be passed back to the called function.
-    fn spawn_gc_thread(tls: VMThread, ctx: GCThreadContext<VM>);
+    ///
+    /// It returns a join handle for joining the thread. This is currently used when preparing
+    /// for forking.
+    fn spawn_gc_thread(tls: VMThread, ctx: GCThreadContext<VM>) -> VM::VMGCThreadJoinHandle;
 
     /// Inform the VM of an out-of-memory error. The binding should hook into the VM's error
     /// routine for OOM. Note that there are two different categories of OOM:
diff --git a/src/vm/gc_thread.rs b/src/vm/gc_thread.rs
new file mode 100644
index 0000000000..ae111857f5
--- /dev/null
+++ b/src/vm/gc_thread.rs
@@ -0,0 +1,21 @@
+//! This module defines types for supporting GC threads.
+
+/// This type is used for joining GC threads, i.e. waiting for GC threads to exit.
+///
+/// It exists to support forking. [`crate::MMTK::prepare_to_fork`] will block until
+/// all GC workers have exited so that the VM can safely call the `fork()` system call. If a VM
+/// never calls `fork()`, or calls `fork()` only for calling `exec()`, the VM can use
+/// [`UnimplementedGCThreadJoinHandle`] instead.
+pub trait GCThreadJoinHandle: Send {
+    /// Block until the underlying native thread of the GC thread has exited.
+    fn join_native_thread(self);
+}
+
+/// An unimplemented [`GCThreadJoinHandle`]. Calling `join_native_thread` will panic.
+pub struct UnimplementedGCThreadJoinHandle;
+
+impl GCThreadJoinHandle for UnimplementedGCThreadJoinHandle {
+    fn join_native_thread(self) {
+        unimplemented!()
+    }
+}
diff --git a/src/vm/mod.rs b/src/vm/mod.rs
index e9b3003664..cc1ff40d9e 100644
--- a/src/vm/mod.rs
+++ b/src/vm/mod.rs
@@ -19,6 +19,7 @@ mod active_plan;
 mod collection;
 /// Allows MMTk to access edges in a VM-defined way.
 pub mod edge_shape;
+pub mod gc_thread;
 pub(crate) mod object_model;
 mod reference_glue;
 mod scanning;
@@ -64,6 +65,9 @@ where
     /// The type of heap memory slice in this VM.
     type VMMemorySlice: edge_shape::MemorySlice;
 
+    /// The type of join handle for GC threads.
+    type VMGCThreadJoinHandle: gc_thread::GCThreadJoinHandle;
+
     /// A value to fill in alignment gaps. This value can be used for debugging.
     const ALIGNMENT_VALUE: usize = 0xdead_beef;
     /// Allowed minimal alignment in bytes.
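
Note (not part of the patch): a binding whose `Collection::spawn_gc_thread` creates GC worker threads with `std::thread` can satisfy the new `VMGCThreadJoinHandle` requirement with a thin wrapper around `std::thread::JoinHandle<()>`. A minimal sketch, assuming the binding depends on this revision of the `mmtk` crate; the type name `MyGCThreadJoinHandle` is illustrative:

    use std::thread::JoinHandle;

    use mmtk::vm::gc_thread::GCThreadJoinHandle;

    /// Hypothetical binding-side join handle wrapping the worker's native thread handle.
    pub struct MyGCThreadJoinHandle(pub JoinHandle<()>);

    impl GCThreadJoinHandle for MyGCThreadJoinHandle {
        fn join_native_thread(self) {
            // Blocks until the GC worker's native thread returns, i.e. until the
            // worker loop exits in response to stop_gc_threads_for_forking().
            self.0.join().expect("GC worker thread panicked");
        }
    }

The binding would set `type VMGCThreadJoinHandle = MyGCThreadJoinHandle;` in its `VMBinding` impl, and its `spawn_gc_thread` would `std::thread::spawn` a closure that calls `memory_manager::start_worker` (as the trait documentation already requires) and return the wrapped handle. A binding that never forks, or forks only to call `exec()`, can use the provided `UnimplementedGCThreadJoinHandle` instead.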