Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PyMutex wrappers #4523

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions newsfragments/4523.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Added a Rust wrapper for `PyMutex`, available on Python 3.13 and newer.
16 changes: 14 additions & 2 deletions pyo3-ffi/src/cpython/lock.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
use std::marker::PhantomPinned;
use std::sync::atomic::AtomicU8;

#[repr(transparent)]
#[derive(Debug)]
pub struct PyMutex {
pub(crate) _bits: AtomicU8,
pub(crate) _pin: PhantomPinned,
}

impl PyMutex {
pub const fn new() -> PyMutex {
PyMutex {
_bits: AtomicU8::new(0),
}
}
}

impl Default for PyMutex {
fn default() -> Self {
Self::new()
}
}

extern "C" {
Expand Down
9 changes: 2 additions & 7 deletions pyo3-ffi/src/object.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::pyport::{Py_hash_t, Py_ssize_t};
#[cfg(Py_GIL_DISABLED)]
use crate::PyMutex;
#[cfg(Py_GIL_DISABLED)]
use std::marker::PhantomPinned;
use std::mem;
use std::os::raw::{c_char, c_int, c_uint, c_ulong, c_void};
use std::ptr;
#[cfg(Py_GIL_DISABLED)]
use std::sync::atomic::{AtomicIsize, AtomicU32, AtomicU8, Ordering::Relaxed};
use std::sync::atomic::{AtomicIsize, AtomicU32, Ordering::Relaxed};

#[cfg(Py_LIMITED_API)]
opaque_struct!(PyTypeObject);
Expand Down Expand Up @@ -39,10 +37,7 @@ pub const PyObject_HEAD_INIT: PyObject = PyObject {
#[cfg(Py_GIL_DISABLED)]
_padding: 0,
#[cfg(Py_GIL_DISABLED)]
ob_mutex: PyMutex {
_bits: AtomicU8::new(0),
_pin: PhantomPinned,
},
ob_mutex: PyMutex::new(),
#[cfg(Py_GIL_DISABLED)]
ob_gc_bits: 0,
#[cfg(Py_GIL_DISABLED)]
Expand Down
4 changes: 4 additions & 0 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub use self::list::{PyList, PyListMethods};
pub use self::mapping::{PyMapping, PyMappingMethods};
pub use self::memoryview::PyMemoryView;
pub use self::module::{PyModule, PyModuleMethods};
#[cfg(all(not(Py_LIMITED_API), Py_3_13))]
pub use self::mutex::{PyMutex, PyMutexGuard};
pub use self::none::PyNone;
pub use self::notimplemented::PyNotImplemented;
#[allow(deprecated)]
Expand Down Expand Up @@ -251,6 +253,8 @@ pub(crate) mod list;
pub(crate) mod mapping;
mod memoryview;
pub(crate) mod module;
#[cfg(all(not(Py_LIMITED_API), Py_3_13))]
mod mutex;
mod none;
mod notimplemented;
mod num;
Expand Down
157 changes: 157 additions & 0 deletions src/types/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};

/// Wrapper for [`PyMutex`](https://docs.python.org/3/c-api/init.html#c.PyMutex), exposing an RAII guard interface.
///
/// Comapred with `std::sync::Mutex` or `parking_lot::Mutex`, this is a very

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comapred -> Compared

/// stripped-down locking primitive that only supports blocking lock and unlock
/// operations.
///
/// `PyMutex` is hooked into CPython's garbage collector and the GIL in GIL-enabled
/// builds. If a thread is blocked on aquiring the mutex and holds the GIL or would
/// prevent Python from entering garbage collection, then Python will release the
/// thread state, allowing garbage collection or other threads blocked by the GIL to
/// proceed. This means it is impossible for PyMutex to deadlock with the GIL.
pub struct PyMutex<T: ?Sized> {
mutex: UnsafeCell<crate::ffi::PyMutex>,
data: UnsafeCell<T>,
}

/// RAII guard to handle releasing a PyMutex lock.
///
/// The lock is released when `PyMutexGuard` is dropped.
pub struct PyMutexGuard<'a, T> {
inner: &'a PyMutex<T>,
}

impl<T> PyMutex<T> {
/// Acquire the mutex, blocking the current thread until it is able to do so.
pub fn lock(&self) -> PyMutexGuard<'_, T> {
unsafe { crate::ffi::PyMutex_Lock(UnsafeCell::raw_get(&self.mutex)) };
PyMutexGuard { inner: self }
}

/// Create a new mutex in an unlocked state ready for use.
pub const fn new(value: T) -> Self {
Self {
mutex: UnsafeCell::new(crate::ffi::PyMutex::new()),
data: UnsafeCell::new(value),
}
}
}

// safety: PyMutex serializes access
unsafe impl<T: Send> Sync for PyMutex<T> {}

impl<'a, T> Drop for PyMutexGuard<'a, T> {
fn drop(&mut self) {
unsafe { crate::ffi::PyMutex_Unlock(UnsafeCell::raw_get(&self.inner.mutex)) };
}
}

impl<'a, T> Deref for PyMutexGuard<'a, T> {
ngoldbaum marked this conversation as resolved.
Show resolved Hide resolved
type Target = T;

fn deref(&self) -> &T {
// safety: cannot be null pointer because PyMutex::new always
// creates a valid PyMutex pointer
unsafe { &*self.inner.data.get() }
}
}

impl<'a, T> DerefMut for PyMutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
// safety: cannot be null pointer because PyMutex::new always
// creates a valid PyMutex pointer
unsafe { &mut *self.inner.data.get() }
}
}

#[cfg(test)]
mod tests {
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::sync_channel,
};

use super::*;
use crate::types::{PyAnyMethods, PyDict, PyDictMethods, PyNone};
use crate::Py;
use crate::Python;

#[test]
fn test_pymutex() {
let mutex = Python::with_gil(|py| -> PyMutex<Py<PyDict>> {
let d = PyDict::new(py);
PyMutex::new(d.unbind())
});

Python::with_gil(|py| {
let mutex = py.allow_threads(|| -> PyMutex<Py<PyDict>> {
std::thread::spawn(|| {
let dict_guard = mutex.lock();
Python::with_gil(|py| {
let dict = dict_guard.bind(py);
dict.set_item(PyNone::get(py), PyNone::get(py)).unwrap();
});
drop(dict_guard);
mutex
})
.join()
.unwrap()
});

let dict_guard = mutex.lock();
let d = dict_guard.bind(py);

assert!(d
.get_item(PyNone::get(py))
.unwrap()
.unwrap()
.eq(PyNone::get(py))
.unwrap());
});
}

#[test]
fn test_pymutex_blocks() {
let mutex = PyMutex::new(());
let first_thread_locked_once = AtomicBool::new(false);
let second_thread_locked_once = AtomicBool::new(false);
let finished = AtomicBool::new(false);
let (sender, receiver) = sync_channel::<bool>(0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a std::sync::Barrier


std::thread::scope(|s| {
s.spawn(|| {
let guard = mutex.lock();
first_thread_locked_once.store(true, Ordering::SeqCst);
while !finished.load(Ordering::SeqCst) {
if second_thread_locked_once.load(Ordering::SeqCst) {
// Wait a little to guard against the unlikely event that
// the other thread isn't blocked on acquiring the mutex yet.
// If PyMutex had a try_lock implementation this would be
// unnecessary
std::thread::sleep(std::time::Duration::from_millis(10));
// block (and hold the mutex) until the receiver actually receives something
sender.send(true).unwrap();
finished.store(true, Ordering::SeqCst);
}
}
drop(guard);
});

s.spawn(|| {
while !first_thread_locked_once.load(Ordering::SeqCst) {
std::hint::spin_loop();
}
second_thread_locked_once.store(true, Ordering::SeqCst);
let guard = mutex.lock();
assert!(finished.load(Ordering::SeqCst));
drop(guard);
});

// threads are blocked until we receive
receiver.recv().unwrap();
});
}
}
Loading