diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs index dfc5d99e4c..9cf301b78d 100644 --- a/src/concurrency/thread.rs +++ b/src/concurrency/thread.rs @@ -152,6 +152,8 @@ pub enum BlockReason { InitOnce(InitOnceId), /// Blocked on epoll. Epoll, + /// Blocked on eventfd. + Eventfd, } /// The state of a thread. diff --git a/src/shims/unix/linux/eventfd.rs b/src/shims/unix/linux/eventfd.rs index 35bc933885..63b7d37b13 100644 --- a/src/shims/unix/linux/eventfd.rs +++ b/src/shims/unix/linux/eventfd.rs @@ -4,7 +4,7 @@ use std::io; use std::io::ErrorKind; use crate::concurrency::VClock; -use crate::shims::unix::fd::FileDescriptionRef; +use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef}; use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _}; use crate::shims::unix::*; use crate::*; @@ -26,6 +26,10 @@ struct Event { counter: Cell, is_nonblock: bool, clock: RefCell, + /// A list of thread ids blocked on eventfd::read. + blocked_read_tid: RefCell>, + /// A list of thread ids blocked on eventfd::write. + blocked_write_tid: RefCell>, } impl FileDescription for Event { @@ -72,31 +76,8 @@ impl FileDescription for Event { // eventfd read at the size of u64. let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty); - // Block when counter == 0. - let counter = self.counter.get(); - if counter == 0 { - if self.is_nonblock { - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } - - throw_unsup_format!("eventfd: blocking is unsupported"); - } else { - // Synchronize with all prior `write` calls to this FD. - ecx.acquire_clock(&self.clock.borrow()); - - // Give old counter value to userspace, and set counter value to 0. - ecx.write_int(counter, &buf_place)?; - self.counter.set(0); - - // When any of the event happened, we check and update the status of all supported event - // types for current file description. - ecx.check_and_update_readiness(self_ref)?; - - // Tell userspace how many bytes we wrote. - ecx.write_int(buf_place.layout.size.bytes(), dest)?; - } - - interp_ok(()) + let weak_eventfd = self_ref.downgrade(); + eventfd_read(buf_place, dest, weak_eventfd, ecx) } /// A write call adds the 8-byte integer value supplied in @@ -127,7 +108,7 @@ impl FileDescription for Event { return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest); } - // Read the user supplied value from the pointer. + // Read the user-supplied value from the pointer. let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty); let num = ecx.read_scalar(&buf_place)?.to_u64()?; @@ -137,27 +118,8 @@ impl FileDescription for Event { } // If the addition does not let the counter to exceed the maximum value, update the counter. // Else, block. - match self.counter.get().checked_add(num) { - Some(new_count @ 0..=MAX_COUNTER) => { - // Future `read` calls will synchronize with this write, so update the FD clock. - ecx.release_clock(|clock| { - self.clock.borrow_mut().join(clock); - }); - self.counter.set(new_count); - } - None | Some(u64::MAX) => - if self.is_nonblock { - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } else { - throw_unsup_format!("eventfd: blocking is unsupported"); - }, - }; - // When any of the event happened, we check and update the status of all supported event - // types for current file description. - ecx.check_and_update_readiness(self_ref)?; - - // Return how many bytes we read. - ecx.write_int(buf_place.layout.size.bytes(), dest) + let weak_eventfd = self_ref.downgrade(); + eventfd_write(num, buf_place, dest, weak_eventfd, ecx) } } @@ -217,8 +179,151 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { counter: Cell::new(val.into()), is_nonblock, clock: RefCell::new(VClock::default()), + blocked_read_tid: RefCell::new(Vec::new()), + blocked_write_tid: RefCell::new(Vec::new()), }); interp_ok(Scalar::from_i32(fd_value)) } } + +/// Block thread if the value addition will exceed u64::MAX -1, +/// else just add the user-supplied value to current counter. +fn eventfd_write<'tcx>( + num: u64, + buf_place: MPlaceTy<'tcx>, + dest: &MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + ecx: &mut MiriInterpCx<'tcx>, +) -> InterpResult<'tcx> { + let Some(eventfd_ref) = weak_eventfd.upgrade() else { + throw_unsup_format!("eventfd FD got closed while blocking.") + }; + + // Since we pass the weak file description ref, it is guaranteed to be + // an eventfd file description. + let eventfd = eventfd_ref.downcast::().unwrap(); + + match eventfd.counter.get().checked_add(num) { + Some(new_count @ 0..=MAX_COUNTER) => { + // Future `read` calls will synchronize with this write, so update the FD clock. + ecx.release_clock(|clock| { + eventfd.clock.borrow_mut().join(clock); + }); + + // When this function is called, the addition is guaranteed to not exceed u64::MAX - 1. + eventfd.counter.set(new_count); + + // When any of the event happened, we check and update the status of all supported event + // types for current file description. + ecx.check_and_update_readiness(&eventfd_ref)?; + + // Unblock *all* threads previously blocked on `read`. + // We need to take out the blocked thread ids and unblock them together, + // because `unblock_threads` may block them again and end up re-adding the + // thread to the blocked list. + let waiting_threads = std::mem::take(&mut *eventfd.blocked_read_tid.borrow_mut()); + // FIXME: We can randomize the order of unblocking. + for thread_id in waiting_threads { + ecx.unblock_thread(thread_id, BlockReason::Eventfd)?; + } + + // Return how many bytes we wrote. + return ecx.write_int(buf_place.layout.size.bytes(), dest); + } + None | Some(u64::MAX) => { + if eventfd.is_nonblock { + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); + } + + let dest = dest.clone(); + + eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread()); + + ecx.block_thread( + BlockReason::Eventfd, + None, + callback!( + @capture<'tcx> { + num: u64, + buf_place: MPlaceTy<'tcx>, + dest: MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + } + @unblock = |this| { + eventfd_write(num, buf_place, &dest, weak_eventfd, this) + } + ), + ); + } + }; + interp_ok(()) +} + +/// Block thread if the current counter is 0, +/// else just return the current counter value to the caller and set the counter to 0. +fn eventfd_read<'tcx>( + buf_place: MPlaceTy<'tcx>, + dest: &MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + ecx: &mut MiriInterpCx<'tcx>, +) -> InterpResult<'tcx> { + let Some(eventfd_ref) = weak_eventfd.upgrade() else { + throw_unsup_format!("eventfd FD got closed while blocking.") + }; + + // Since we pass the weak file description ref to the callback function, it is guaranteed to be + // an eventfd file description. + let eventfd = eventfd_ref.downcast::().unwrap(); + + // Block when counter == 0. + let counter = eventfd.counter.replace(0); + + if counter == 0 { + if eventfd.is_nonblock { + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); + } + let dest = dest.clone(); + + eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread()); + + ecx.block_thread( + BlockReason::Eventfd, + None, + callback!( + @capture<'tcx> { + buf_place: MPlaceTy<'tcx>, + dest: MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + } + @unblock = |this| { + eventfd_read(buf_place, &dest, weak_eventfd, this) + } + ), + ); + } else { + // Synchronize with all prior `write` calls to this FD. + ecx.acquire_clock(&eventfd.clock.borrow()); + + // Give old counter value to userspace, and set counter value to 0. + ecx.write_int(counter, &buf_place)?; + + // When any of the events happened, we check and update the status of all supported event + // types for current file description. + ecx.check_and_update_readiness(&eventfd_ref)?; + + // Unblock *all* threads previously blocked on `write`. + // We need to take out the blocked thread ids and unblock them together, + // because `unblock_threads` may block them again and end up re-adding the + // thread to the blocked list. + let waiting_threads = std::mem::take(&mut *eventfd.blocked_write_tid.borrow_mut()); + // FIXME: We can randomize the order of unblocking. + for thread_id in waiting_threads { + ecx.unblock_thread(thread_id, BlockReason::Eventfd)?; + } + + // Tell userspace how many bytes we read. + return ecx.write_int(buf_place.layout.size.bytes(), dest); + } + interp_ok(()) +} diff --git a/tests/fail-dep/libc/eventfd_block_read_twice.rs b/tests/fail-dep/libc/eventfd_block_read_twice.rs new file mode 100644 index 0000000000..65d29b2c6b --- /dev/null +++ b/tests/fail-dep/libc/eventfd_block_read_twice.rs @@ -0,0 +1,65 @@ +//@only-target: linux +//~^ERROR: deadlocked +//~^^ERROR: deadlocked +//@compile-flags: -Zmiri-preemption-rate=0 +//@error-in-other-file: deadlock + +use std::thread; + +// Test the behaviour of a thread being blocked on an eventfd read, get unblocked, and then +// get blocked again. + +// The expected execution is +// 1. Thread 1 blocks. +// 2. Thread 2 blocks. +// 3. Thread 3 unblocks both thread 1 and thread 2. +// 4. Thread 1 reads. +// 5. Thread 2's `read` deadlocked. + +fn main() { + // eventfd write will block when EFD_NONBLOCK flag is clear + // and the addition caused counter to exceed u64::MAX - 1. + let flags = libc::EFD_CLOEXEC; + let fd = unsafe { libc::eventfd(0, flags) }; + + let thread1 = thread::spawn(move || { + thread::park(); + let mut buf: [u8; 8] = [0; 8]; + // This read will block initially. + let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() }; + assert_eq!(res, 8); + let counter = u64::from_ne_bytes(buf); + assert_eq!(counter, 1_u64); + }); + + let thread2 = thread::spawn(move || { + thread::park(); + let mut buf: [u8; 8] = [0; 8]; + // This read will block initially, then get unblocked by thread3, then get blocked again + // because the `read` in thread1 executes first and set the counter to 0 again. + let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() }; + //~^ERROR: deadlocked + assert_eq!(res, 8); + let counter = u64::from_ne_bytes(buf); + assert_eq!(counter, 1_u64); + }); + + let thread3 = thread::spawn(move || { + thread::park(); + let sized_8_data = 1_u64.to_ne_bytes(); + // Write 1 to the counter, so both thread1 and thread2 will unblock. + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + // Make sure that write is successful. + assert_eq!(res, 8); + }); + + thread1.thread().unpark(); + thread2.thread().unpark(); + thread3.thread().unpark(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + thread3.join().unwrap(); +} diff --git a/tests/fail-dep/libc/eventfd_block_read_twice.stderr b/tests/fail-dep/libc/eventfd_block_read_twice.stderr new file mode 100644 index 0000000000..bb235345c5 --- /dev/null +++ b/tests/fail-dep/libc/eventfd_block_read_twice.stderr @@ -0,0 +1,41 @@ +error: deadlock: the evaluated program deadlocked + --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + | +LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE: + = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC + = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC +note: inside `main` + --> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC + | +LL | thread2.join().unwrap(); + | ^^^^^^^^^^^^^^ + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +error: deadlock: the evaluated program deadlocked + --> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC + | +LL | let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE on thread `unnamed-ID`: + = note: inside closure at tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 4 previous errors + diff --git a/tests/fail-dep/libc/eventfd_block_write_twice.rs b/tests/fail-dep/libc/eventfd_block_write_twice.rs new file mode 100644 index 0000000000..f9d34d2fb5 --- /dev/null +++ b/tests/fail-dep/libc/eventfd_block_write_twice.rs @@ -0,0 +1,71 @@ +//@only-target: linux +//~^ERROR: deadlocked +//~^^ERROR: deadlocked +//@compile-flags: -Zmiri-preemption-rate=0 +//@error-in-other-file: deadlock + +use std::thread; + +// Test the behaviour of a thread being blocked on an eventfd `write`, get unblocked, and then +// get blocked again. + +// The expected execution is +// 1. Thread 1 blocks. +// 2. Thread 2 blocks. +// 3. Thread 3 unblocks both thread 1 and thread 2. +// 4. Thread 1 writes u64::MAX. +// 5. Thread 2's `write` deadlocked. +fn main() { + // eventfd write will block when EFD_NONBLOCK flag is clear + // and the addition caused counter to exceed u64::MAX - 1. + let flags = libc::EFD_CLOEXEC; + let fd = unsafe { libc::eventfd(0, flags) }; + // Write u64 - 1, so the all subsequent write will block. + let sized_8_data: [u8; 8] = (u64::MAX - 1).to_ne_bytes(); + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + assert_eq!(res, 8); + + let thread1 = thread::spawn(move || { + thread::park(); + let sized_8_data = (u64::MAX - 1).to_ne_bytes(); + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + // Make sure that write is successful. + assert_eq!(res, 8); + }); + + let thread2 = thread::spawn(move || { + thread::park(); + let sized_8_data = (u64::MAX - 1).to_ne_bytes(); + // Write u64::MAX - 1, so the all subsequent write will block. + let res: i64 = unsafe { + // This `write` will initially blocked, then get unblocked by thread3, then get blocked again + // because the `write` in thread1 executes first. + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + //~^ERROR: deadlocked + }; + // Make sure that write is successful. + assert_eq!(res, 8); + }); + + let thread3 = thread::spawn(move || { + thread::park(); + let mut buf: [u8; 8] = [0; 8]; + // This will unblock both `write` in thread1 and thread2. + let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() }; + assert_eq!(res, 8); + let counter = u64::from_ne_bytes(buf); + assert_eq!(counter, (u64::MAX - 1)); + }); + + thread1.thread().unpark(); + thread2.thread().unpark(); + thread3.thread().unpark(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + thread3.join().unwrap(); +} diff --git a/tests/fail-dep/libc/eventfd_block_write_twice.stderr b/tests/fail-dep/libc/eventfd_block_write_twice.stderr new file mode 100644 index 0000000000..d9163a5748 --- /dev/null +++ b/tests/fail-dep/libc/eventfd_block_write_twice.stderr @@ -0,0 +1,41 @@ +error: deadlock: the evaluated program deadlocked + --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + | +LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE: + = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC + = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC +note: inside `main` + --> tests/fail-dep/libc/eventfd_block_write_twice.rs:LL:CC + | +LL | thread2.join().unwrap(); + | ^^^^^^^^^^^^^^ + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +error: deadlock: the evaluated program deadlocked + --> tests/fail-dep/libc/eventfd_block_write_twice.rs:LL:CC + | +LL | libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + | ^ the evaluated program deadlocked + | + = note: BACKTRACE on thread `unnamed-ID`: + = note: inside closure at tests/fail-dep/libc/eventfd_block_write_twice.rs:LL:CC + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 4 previous errors + diff --git a/tests/fail-dep/libc/libc_eventfd_read_block.rs b/tests/fail-dep/libc/libc_eventfd_read_block.rs deleted file mode 100644 index 0212a63bd0..0000000000 --- a/tests/fail-dep/libc/libc_eventfd_read_block.rs +++ /dev/null @@ -1,11 +0,0 @@ -//@only-target: linux -fn main() { - // eventfd read will block when EFD_NONBLOCK flag is clear and counter = 0. - // This will pass when blocking is implemented. - let flags = libc::EFD_CLOEXEC; - let fd = unsafe { libc::eventfd(0, flags) }; - let mut buf: [u8; 8] = [0; 8]; - let _res: i32 = unsafe { - libc::read(fd, buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap() //~ERROR: blocking is unsupported - }; -} diff --git a/tests/fail-dep/libc/libc_eventfd_read_block.stderr b/tests/fail-dep/libc/libc_eventfd_read_block.stderr deleted file mode 100644 index aff30c81eb..0000000000 --- a/tests/fail-dep/libc/libc_eventfd_read_block.stderr +++ /dev/null @@ -1,14 +0,0 @@ -error: unsupported operation: eventfd: blocking is unsupported - --> tests/fail-dep/libc/libc_eventfd_read_block.rs:LL:CC - | -LL | libc::read(fd, buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap() - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ eventfd: blocking is unsupported - | - = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support - = note: BACKTRACE: - = note: inside `main` at tests/fail-dep/libc/libc_eventfd_read_block.rs:LL:CC - -note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace - -error: aborting due to 1 previous error - diff --git a/tests/fail-dep/libc/libc_eventfd_write_block.rs b/tests/fail-dep/libc/libc_eventfd_write_block.rs deleted file mode 100644 index ed6ad46690..0000000000 --- a/tests/fail-dep/libc/libc_eventfd_write_block.rs +++ /dev/null @@ -1,21 +0,0 @@ -//@only-target: linux -fn main() { - // eventfd write will block when EFD_NONBLOCK flag is clear - // and the addition caused counter to exceed u64::MAX - 1. - // This will pass when blocking is implemented. - let flags = libc::EFD_CLOEXEC; - let fd = unsafe { libc::eventfd(0, flags) }; - // Write u64 - 1. - let mut sized_8_data: [u8; 8] = (u64::MAX - 1).to_ne_bytes(); - let res: i64 = unsafe { - libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() - }; - assert_eq!(res, 8); - - // Write 1. - sized_8_data = 1_u64.to_ne_bytes(); - // Write 1 to the counter. - let _res: i64 = unsafe { - libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() //~ERROR: blocking is unsupported - }; -} diff --git a/tests/fail-dep/libc/libc_eventfd_write_block.stderr b/tests/fail-dep/libc/libc_eventfd_write_block.stderr deleted file mode 100644 index 2b60660579..0000000000 --- a/tests/fail-dep/libc/libc_eventfd_write_block.stderr +++ /dev/null @@ -1,14 +0,0 @@ -error: unsupported operation: eventfd: blocking is unsupported - --> tests/fail-dep/libc/libc_eventfd_write_block.rs:LL:CC - | -LL | libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ eventfd: blocking is unsupported - | - = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support - = note: BACKTRACE: - = note: inside `main` at tests/fail-dep/libc/libc_eventfd_write_block.rs:LL:CC - -note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace - -error: aborting due to 1 previous error - diff --git a/tests/pass-dep/libc/libc-eventfd.rs b/tests/pass-dep/libc/libc-eventfd.rs index c92d9c3fe7..dd9c0eb0b5 100644 --- a/tests/pass-dep/libc/libc-eventfd.rs +++ b/tests/pass-dep/libc/libc-eventfd.rs @@ -1,5 +1,5 @@ //@only-target: linux -// test_race depends on a deterministic schedule. +// test_race, test_blocking_read and test_blocking_write depend on a deterministic schedule. //@compile-flags: -Zmiri-preemption-rate=0 // FIXME(static_mut_refs): Do not allow `static_mut_refs` lint @@ -11,6 +11,9 @@ fn main() { test_read_write(); test_race(); test_syscall(); + test_blocking_read(); + test_blocking_write(); + test_two_threads_blocked_on_eventfd(); } fn read_bytes(fd: i32, buf: &mut [u8; N]) -> i32 { @@ -118,3 +121,117 @@ fn test_syscall() { let fd = unsafe { libc::syscall(libc::SYS_eventfd2, initval, flags) }; assert_ne!(fd, -1); } + +// This test will block on eventfd read then get unblocked by `write`. +fn test_blocking_read() { + // eventfd read will block when EFD_NONBLOCK flag is clear and counter = 0. + let flags = libc::EFD_CLOEXEC; + let fd = unsafe { libc::eventfd(0, flags) }; + let thread1 = thread::spawn(move || { + let mut buf: [u8; 8] = [0; 8]; + // This will block. + let res = read_bytes(fd, &mut buf); + // read returns number of bytes has been read, which is always 8. + assert_eq!(res, 8); + let counter = u64::from_ne_bytes(buf); + assert_eq!(counter, 1); + }); + let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes(); + // Pass control to thread1 so it can block on eventfd `read`. + thread::yield_now(); + // Write 1 to the counter to unblock thread1. + let res = write_bytes(fd, sized_8_data); + assert_eq!(res, 8); + thread1.join().unwrap(); +} + +/// This test will block on eventfd `write` then get unblocked by `read`. +fn test_blocking_write() { + // eventfd write will block when EFD_NONBLOCK flag is clear + // and the addition caused counter to exceed u64::MAX - 1. + let flags = libc::EFD_CLOEXEC; + let fd = unsafe { libc::eventfd(0, flags) }; + // Write u64 - 1, so the all subsequent write will block. + let sized_8_data: [u8; 8] = (u64::MAX - 1).to_ne_bytes(); + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + assert_eq!(res, 8); + + let thread1 = thread::spawn(move || { + let sized_8_data = 1_u64.to_ne_bytes(); + // Write 1 to the counter, this will block. + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + // Make sure that write is successful. + assert_eq!(res, 8); + }); + let mut buf: [u8; 8] = [0; 8]; + // Pass control to thread1 so it can block on eventfd `write`. + thread::yield_now(); + // This will unblock previously blocked eventfd read. + let res = read_bytes(fd, &mut buf); + // read returns number of bytes has been read, which is always 8. + assert_eq!(res, 8); + let counter = u64::from_ne_bytes(buf); + assert_eq!(counter, (u64::MAX - 1)); + thread1.join().unwrap(); +} + +// Test two threads blocked on eventfd. +// Expected behaviour: +// 1. thread1 and thread2 both blocked on `write`. +// 2. thread3 unblocks both thread1 and thread2 +// 3. The write in thread1 and thread2 return successfully. +fn test_two_threads_blocked_on_eventfd() { + // eventfd write will block when EFD_NONBLOCK flag is clear + // and the addition caused counter to exceed u64::MAX - 1. + let flags = libc::EFD_CLOEXEC; + let fd = unsafe { libc::eventfd(0, flags) }; + // Write u64 - 1, so the all subsequent write will block. + let sized_8_data: [u8; 8] = (u64::MAX - 1).to_ne_bytes(); + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + assert_eq!(res, 8); + + let thread1 = thread::spawn(move || { + thread::park(); + let sized_8_data = 1_u64.to_ne_bytes(); + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + // Make sure that write is successful. + assert_eq!(res, 8); + }); + + let thread2 = thread::spawn(move || { + thread::park(); + let sized_8_data = 1_u64.to_ne_bytes(); + let res: i64 = unsafe { + libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap() + }; + // Make sure that write is successful. + assert_eq!(res, 8); + }); + + let thread3 = thread::spawn(move || { + thread::park(); + let mut buf: [u8; 8] = [0; 8]; + // This will unblock previously blocked eventfd read. + let res = read_bytes(fd, &mut buf); + // read returns number of bytes has been read, which is always 8. + assert_eq!(res, 8); + let counter = u64::from_ne_bytes(buf); + assert_eq!(counter, (u64::MAX - 1)); + }); + + thread1.thread().unpark(); + thread2.thread().unpark(); + thread3.thread().unpark(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + thread3.join().unwrap(); +}