diff --git a/src/shims/unix/unnamed_socket.rs b/src/shims/unix/unnamed_socket.rs index 86ebe95762..4fa795621c 100644 --- a/src/shims/unix/unnamed_socket.rs +++ b/src/shims/unix/unnamed_socket.rs @@ -96,25 +96,6 @@ impl FileDescription for AnonSocket { dest: &MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - // Always succeed on read size 0. - if len == 0 { - return ecx.return_read_success(ptr, &[], 0, dest); - } - - let Some(readbuf) = &self.readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("reading from the write end of a pipe"); - }; - - if readbuf.borrow().buf.is_empty() && self.is_nonblock { - // Non-blocking socketpair with writer and empty buffer. - // https://linux.die.net/man/2/read - // EAGAIN or EWOULDBLOCK can be returned for socket, - // POSIX.1-2001 allows either error to be returned for this case. - // Since there is no ErrorKind for EAGAIN, WouldBlock is used. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx) } @@ -127,30 +108,6 @@ impl FileDescription for AnonSocket { dest: &MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - // Always succeed on write size 0. - // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.") - if len == 0 { - return ecx.return_write_success(0, dest); - } - - // We are writing to our peer's readbuf. - let Some(peer_fd) = self.peer_fd().upgrade() else { - // If the upgrade from Weak to Rc fails, it indicates that all read ends have been - // closed. - return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest); - }; - - let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("writing to the reading end of a pipe"); - }; - let available_space = - MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); - if available_space == 0 && self.is_nonblock { - // Non-blocking socketpair with a full buffer. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx) } @@ -167,10 +124,15 @@ fn anonsocket_write<'tcx>( dest: MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let Some(self_ref) = weak_self_ref.upgrade() else { - // FIXME: We should raise a deadlock error if the self_ref upgrade failed. - throw_unsup_format!("This will be a deadlock error in future") - }; + // Always succeed on write size 0. + // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.") + if len == 0 { + return ecx.return_write_success(0, &dest); + } + + // If this fd is closed while blocking on `write`, any subsequent `read` on peer_fd will never + // wake up this thread. So the `unwrap` here is safe. + let self_ref = weak_self_ref.upgrade().unwrap(); let self_anonsocket = self_ref.downcast::().unwrap(); let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else { // If the upgrade from Weak to Rc fails, it indicates that all read ends have been @@ -186,24 +148,28 @@ fn anonsocket_write<'tcx>( let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); if available_space == 0 { - // Blocking socketpair with a full buffer. - let dest = dest.clone(); - self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread()); - ecx.block_thread( - BlockReason::UnnamedSocket, - None, - callback!( - @capture<'tcx> { - weak_self_ref: WeakFileDescriptionRef, - ptr: Pointer, - len: usize, - dest: MPlaceTy<'tcx>, - } - @unblock = |this| { - anonsocket_write(weak_self_ref, ptr, len, dest, this) - } - ), - ); + if self_anonsocket.is_nonblock { + // Non-blocking socketpair with a full buffer. + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, &dest); + } else { + // Blocking socketpair with a full buffer. + self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread()); + ecx.block_thread( + BlockReason::UnnamedSocket, + None, + callback!( + @capture<'tcx> { + weak_self_ref: WeakFileDescriptionRef, + ptr: Pointer, + len: usize, + dest: MPlaceTy<'tcx>, + } + @unblock = |this| { + anonsocket_write(weak_self_ref, ptr, len, dest, this) + } + ), + ); + } } else { let mut writebuf = writebuf.borrow_mut(); // Remember this clock so `read` can synchronize with us. @@ -221,8 +187,8 @@ fn anonsocket_write<'tcx>( // Notification should be provided for peer fd as it became readable. // The kernel does this even if the fd was already readable before, so we follow suit. ecx.check_and_update_readiness(&peer_fd)?; - let peer_anonsocket = peer_fd.downcast::().unwrap(); // Unblock all threads that are currently blocked on peer_fd's read. + let peer_anonsocket = peer_fd.downcast::().unwrap(); let waiting_threads = std::mem::take(&mut *peer_anonsocket.blocked_read_tid.borrow_mut()); // FIXME: We can randomize the order of unblocking. for thread_id in waiting_threads { @@ -242,10 +208,14 @@ fn anonsocket_read<'tcx>( dest: MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let Some(self_ref) = weak_self_ref.upgrade() else { - // FIXME: We should raise a deadlock error if the self_ref upgrade failed. - throw_unsup_format!("This will be a deadlock error in future") - }; + // Always succeed on read size 0. + if len == 0 { + return ecx.return_read_success(ptr, &[], 0, &dest); + } + + // If this fd is closed while blocking, it is impossible to unblock this `read` through + // another `write` in peer_fd, so the `unwrap` here is fine. + let self_ref = weak_self_ref.upgrade().unwrap(); let self_anonsocket = self_ref.downcast::().unwrap(); let Some(readbuf) = &self_anonsocket.readbuf else { @@ -255,13 +225,19 @@ fn anonsocket_read<'tcx>( }; if readbuf.borrow_mut().buf.is_empty() { - if self_anonsocket.peer_fd().upgrade().is_none() { + if self_anonsocket.is_nonblock { + // Non-blocking socketpair with writer and empty buffer. + // https://linux.die.net/man/2/read + // EAGAIN or EWOULDBLOCK can be returned for socket, + // POSIX.1-2001 allows either error to be returned for this case. + // Since there is no ErrorKind for EAGAIN, WouldBlock is used. + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, &dest); + } else if self_anonsocket.peer_fd().upgrade().is_none() { // Socketpair with no peer and empty buffer. // 0 bytes successfully read indicates end-of-file. return ecx.return_read_success(ptr, &[], 0, &dest); } else { // Blocking socketpair with writer and empty buffer. - let weak_self_ref = weak_self_ref.clone(); self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread()); ecx.block_thread( BlockReason::UnnamedSocket, @@ -303,8 +279,8 @@ fn anonsocket_read<'tcx>( // notifications than the real system. if let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() { ecx.check_and_update_readiness(&peer_fd)?; - let peer_anonsocket = peer_fd.downcast::().unwrap(); // Unblock all threads that are currently blocked on peer_fd's write. + let peer_anonsocket = peer_fd.downcast::().unwrap(); let waiting_threads = std::mem::take(&mut *peer_anonsocket.blocked_write_tid.borrow_mut()); // FIXME: We can randomize the order of unblocking. diff --git a/tests/fail-dep/libc/socketpair_closed_while_read_blocking.rs b/tests/fail-dep/libc/socketpair_closed_while_read_blocking.rs new file mode 100644 index 0000000000..ba38546937 --- /dev/null +++ b/tests/fail-dep/libc/socketpair_closed_while_read_blocking.rs @@ -0,0 +1,27 @@ +//@ignore-target: windows # No libc socketpair on Windows +//@compile-flags: -Zmiri-preemption-rate=0 + +use std::thread; + +fn main() { + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let thread1 = thread::spawn(move || { + // Let this thread block on read. + let mut buf: [u8; 3] = [0; 3]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(&buf, "abc".as_bytes()); + }); + let thread2 = thread::spawn(move || { + // Close the socketpair fd while thread1 is blocking on it. + assert_eq!(unsafe { libc::close(fds[1]) }, 0); + let data = "abc".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + // This will fail because we can't write anything if the peer_fd is closed. + assert_eq!(res, -1); + }); + thread1.join().unwrap(); + thread2.join().unwrap(); +} diff --git a/tests/fail-dep/libc/socketpair_closed_while_read_blocking.stderr b/tests/fail-dep/libc/socketpair_closed_while_read_blocking.stderr new file mode 100644 index 0000000000..bb1709fda6 --- /dev/null +++ b/tests/fail-dep/libc/socketpair_closed_while_read_blocking.stderr @@ -0,0 +1,35 @@ +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +error: deadlock: the evaluated program deadlocked + --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + | +LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE: + = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC + = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC +note: inside `main` + --> tests/fail-dep/libc/socketpair_closed_while_read_blocking.rs:LL:CC + | +LL | thread1.join().unwrap(); + | ^^^^^^^^^^^^^^ + +error: deadlock: the evaluated program deadlocked + --> tests/fail-dep/libc/socketpair_closed_while_read_blocking.rs:LL:CC + | +LL | let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE on thread `unnamed-ID`: + = note: inside closure at tests/fail-dep/libc/socketpair_closed_while_read_blocking.rs:LL:CC + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 3 previous errors + diff --git a/tests/fail-dep/libc/socketpair_closed_while_write_blocking.rs b/tests/fail-dep/libc/socketpair_closed_while_write_blocking.rs new file mode 100644 index 0000000000..1d7b46abf6 --- /dev/null +++ b/tests/fail-dep/libc/socketpair_closed_while_write_blocking.rs @@ -0,0 +1,31 @@ +//@ignore-target: windows # No libc socketpair on Windows +//@compile-flags: -Zmiri-preemption-rate=0 + +use std::thread; + +fn main() { + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let arr1: [u8; 212992] = [1; 212992]; + // Exhaust the space in the buffer so the subsequent write will block. + let res = unsafe { libc::write(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) }; + assert_eq!(res, 212992); + let thread1 = thread::spawn(move || { + let data = "abc".as_bytes().as_ptr(); + // The write below will be blocked because the buffer is already full. + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + assert_eq!(res, 3); + }); + let thread2 = thread::spawn(move || { + // Close the socketpair fd while thread1 is blocking on it. + assert_eq!(unsafe { libc::close(fds[0]) }, 0); + // Unblock thread1 by freeing up some space. + let mut buf: [u8; 3] = [0; 3]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(buf, [1, 1, 1]); + }); + thread1.join().unwrap(); + thread2.join().unwrap(); +} diff --git a/tests/fail-dep/libc/socketpair_closed_while_write_blocking.stderr b/tests/fail-dep/libc/socketpair_closed_while_write_blocking.stderr new file mode 100644 index 0000000000..3ce20a1c3e --- /dev/null +++ b/tests/fail-dep/libc/socketpair_closed_while_write_blocking.stderr @@ -0,0 +1,35 @@ +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +error: deadlock: the evaluated program deadlocked + --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + | +LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE: + = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC + = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC +note: inside `main` + --> tests/fail-dep/libc/socketpair_closed_while_write_blocking.rs:LL:CC + | +LL | thread1.join().unwrap(); + | ^^^^^^^^^^^^^^ + +error: deadlock: the evaluated program deadlocked + --> tests/fail-dep/libc/socketpair_closed_while_write_blocking.rs:LL:CC + | +LL | let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE on thread `unnamed-ID`: + = note: inside closure at tests/fail-dep/libc/socketpair_closed_while_write_blocking.rs:LL:CC + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 3 previous errors +