From 2226854357f6e2b1c882a8b937d26d97d604aecc Mon Sep 17 00:00:00 2001 From: shamb0 Date: Tue, 24 Dec 2024 12:08:26 +0530 Subject: [PATCH] Add callback support to FileDescription - Implementing atomic reads for contiguous buffers - Supports read operations with callback-based completion. Signed-off-by: shamb0 --- src/shims/files.rs | 35 +++++-- src/shims/unix/fd.rs | 27 ++++- src/shims/unix/fs.rs | 146 +++++++++++++++++++++++++-- src/shims/unix/linux_like/eventfd.rs | 75 ++++++++------ src/shims/unix/unnamed_socket.rs | 105 ++++++++++--------- 5 files changed, 288 insertions(+), 100 deletions(-) diff --git a/src/shims/files.rs b/src/shims/files.rs index 73425eee51..9dd59bc8dc 100644 --- a/src/shims/files.rs +++ b/src/shims/files.rs @@ -121,6 +121,10 @@ impl FileDescriptionExt for T { pub type DynFileDescriptionRef = FileDescriptionRef; +/// Represents a dynamic callback for file I/O operations that is invoked upon completion. +/// The callback receives either the number of bytes successfully read (u64) or an IoError. +pub type DynFileDescriptionCallback<'tcx> = DynMachineCallback<'tcx, Result>; + impl FileDescriptionRef { pub fn downcast(self) -> Option> { let inner = self.into_rc_any().downcast::>().ok()?; @@ -134,13 +138,14 @@ pub trait FileDescription: std::fmt::Debug + FileDescriptionExt { /// Reads as much as possible into the given buffer `ptr`. /// `len` indicates how many bytes we should try to read. - /// `dest` is where the return value should be stored: number of bytes read, or `-1` in case of error. + /// `finish` Callback to be invoked on operation completion with bytes read or error + #[allow(dead_code)] fn read<'tcx>( self: FileDescriptionRef, _communicate_allowed: bool, _ptr: Pointer, _len: usize, - _dest: &MPlaceTy<'tcx>, + _finish: DynFileDescriptionCallback<'tcx>, _ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { throw_unsup_format!("cannot read from {}", self.name()); @@ -207,18 +212,32 @@ impl FileDescription for io::Stdin { communicate_allowed: bool, ptr: Pointer, len: usize, - dest: &MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let mut bytes = vec![0; len]; + // First handle isolation mode check if !communicate_allowed { // We want isolation mode to be deterministic, so we have to disallow all reads, even stdin. helpers::isolation_abort_error("`read` from stdin")?; } - let result = Read::read(&mut &*self, &mut bytes); - match result { - Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest), - Err(e) => ecx.set_last_error_and_return(e, dest), + + let mut bytes = vec![0; len]; + + match Read::read(&mut &*self, &mut bytes) { + Ok(actual_read_size) => { + // Write the successfully read bytes to the destination pointer + ecx.write_bytes_ptr(ptr, bytes[..actual_read_size].iter().copied())?; + + let Ok(read_size) = u64::try_from(actual_read_size) else { + throw_unsup_format!( + "Read operation returned size {} which exceeds maximum allowed value", + actual_read_size + ) + }; + + finish.call(ecx, Ok(read_size)) + } + Err(e) => finish.call(ecx, Err(e.into())), } } diff --git a/src/shims/unix/fd.rs b/src/shims/unix/fd.rs index 0b59490308..eac6df0481 100644 --- a/src/shims/unix/fd.rs +++ b/src/shims/unix/fd.rs @@ -7,7 +7,7 @@ use std::io::ErrorKind; use rustc_abi::Size; use crate::helpers::check_min_arg_count; -use crate::shims::files::FileDescription; +use crate::shims::files::{DynFileDescriptionCallback, FileDescription}; use crate::shims::unix::linux_like::epoll::EpollReadyEvents; use crate::shims::unix::*; use crate::*; @@ -203,7 +203,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { interp_ok(Scalar::from_i32(this.try_unwrap_io_result(result)?)) } - /// Read data from `fd` into buffer specified by `buf` and `count`. + /// Reads data from a file descriptor using callback-based completion. /// /// If `offset` is `None`, reads data from current cursor position associated with `fd` /// and updates cursor position on completion. Otherwise, reads from the specified offset @@ -239,13 +239,32 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { return this.set_last_error_and_return(LibcError("EBADF"), dest); }; - trace!("read: FD mapped to {fd:?}"); + trace!("Reading from FD {}, size {}, offset {:?}", fd_num, count, offset); // We want to read at most `count` bytes. We are sure that `count` is not negative // because it was a target's `usize`. Also we are sure that its smaller than // `usize::MAX` because it is bounded by the host's `isize`. + // Clone the result destination for use in the completion callback + let result_destination = dest.clone(); + + let completion_callback: DynFileDescriptionCallback<'tcx> = callback!( + @capture<'tcx> { + result_destination: MPlaceTy<'tcx>, + } + |this, read_result: Result| { + match read_result { + Ok(read_size) => { + this.write_int(read_size, &result_destination) + } + Err(_err_code) => { + this.set_last_error_and_return(LibcError("EIO"), &result_destination) + } + } + } + ); + match offset { - None => fd.read(communicate, buf, count, dest, this)?, + None => fd.read(communicate, buf, count, completion_callback, this)?, Some(offset) => { let Ok(offset) = u64::try_from(offset) else { return this.set_last_error_and_return(LibcError("EINVAL"), dest); diff --git a/src/shims/unix/fs.rs b/src/shims/unix/fs.rs index 25594b7803..2224e43a5b 100644 --- a/src/shims/unix/fs.rs +++ b/src/shims/unix/fs.rs @@ -14,7 +14,10 @@ use rustc_data_structures::fx::FxHashMap; use self::shims::time::system_time_to_duration; use crate::helpers::check_min_arg_count; -use crate::shims::files::{EvalContextExt as _, FileDescription, FileDescriptionRef}; +use crate::shims::files::{ + DynFileDescriptionCallback, EvalContextExt as _, FileDescription, FileDescriptionRef, + WeakFileDescriptionRef, +}; use crate::shims::os_str::bytes_to_os_str; use crate::shims::unix::fd::{FlockOp, UnixFileDescription}; use crate::*; @@ -23,6 +26,92 @@ use crate::*; struct FileHandle { file: File, writable: bool, + /// Mutex for synchronizing file access across threads. + file_lock: MutexRef, +} + +impl VisitProvenance for FileHandle { + fn visit_provenance(&self, _visit: &mut VisitWith<'_>) { + // No provenance tracking needed for FileHandle as it contains no references. + // This implementation satisfies the trait requirement but performs no operations. + } +} + +impl FileHandle { + /// Creates a new FileHandle with specified permissions and synchronization primitive. + fn new(file: File, writable: bool, file_lock: MutexRef) -> Self { + Self { file, writable, file_lock } + } + + /// Attempts to create a clone of the file handle while preserving all attributes. + /// + /// # Errors + /// Returns an `InterpResult` error if file handle cloning fails. + fn try_clone<'tcx>(&self) -> InterpResult<'tcx, FileHandle> { + let cloned_file = self + .file + .try_clone() + .map_err(|e| err_unsup_format!("Failed to clone file handle: {}", e))?; + + interp_ok(FileHandle { + file: cloned_file, + writable: self.writable, + file_lock: self.file_lock.clone(), + }) + } + + /// Performs a synchronized file read with callback completion. + fn perform_read<'tcx>( + this: &mut MiriInterpCx<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, + mut file_handle: FileHandle, + weak_fd: WeakFileDescriptionRef, + buffer_ptr: Pointer, + length: usize, + ) -> InterpResult<'tcx> { + this.mutex_lock(&file_handle.file_lock); + + let result = { + // Verify file descriptor is still valid. + if weak_fd.upgrade().is_none() { + throw_unsup_format!("file got closed while blocking") + } + + let mut bytes = vec![0; length]; + let read_result = file_handle.file.read(&mut bytes); + + // Handle the read result. + match read_result { + Ok(read_size) => { + // Write the bytes to memory. + if let Err(err_code) = this + .write_bytes_ptr(buffer_ptr, bytes[..read_size].iter().copied()) + .report_err() + { + throw_unsup_format!( + "Memory write failed during file read operation: {:#?}", + err_code + ) + } + + let Ok(read_size) = u64::try_from(read_size) else { + throw_unsup_format!( + "Read operation returned size {} which exceeds maximum allowed value", + read_size + ) + }; + + finish.call(this, Ok(read_size)) + } + Err(err_code) => finish.call(this, Err(err_code.into())), + } + }; + + // Always unlock the mutex, even if the read operation failed. + this.mutex_unlock(&file_handle.file_lock)?; + + result + } } impl FileDescription for FileHandle { @@ -35,15 +124,42 @@ impl FileDescription for FileHandle { communicate_allowed: bool, ptr: Pointer, len: usize, - dest: &MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { + let this = ecx; assert!(communicate_allowed, "isolation should have prevented even opening a file"); - let mut bytes = vec![0; len]; - let result = (&mut &self.file).read(&mut bytes); - match result { - Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest), - Err(e) => ecx.set_last_error_and_return(e, dest), + + // Clone the underlying File. + let clone_file_handle = match self.try_clone().report_err() { + Ok(handle) => handle, + Err(ec) => throw_unsup_format!("unable to clone file discp {:#?}", ec), + }; + + let weak_fd = FileDescriptionRef::downgrade(&self); + + if this.mutex_is_locked(&self.file_lock) { + this.block_thread( + BlockReason::Mutex, + None, + callback!( + @capture<'tcx> { + finish: DynFileDescriptionCallback<'tcx>, + clone_file_handle: FileHandle, + weak_fd: WeakFileDescriptionRef, + ptr: Pointer, + len: usize, + } + |this, unblock: UnblockKind| { + assert_eq!(unblock, UnblockKind::Ready); + FileHandle::perform_read(this, finish, clone_file_handle, weak_fd, ptr, len) + } + ), + ); + + unreachable!() + } else { + FileHandle::perform_read(this, finish, clone_file_handle, weak_fd, ptr, len) } } @@ -584,9 +700,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { return this.set_last_error_and_return_i32(ErrorKind::PermissionDenied); } - let fd = options - .open(path) - .map(|file| this.machine.fds.insert_new(FileHandle { file, writable })); + let fd = options.open(path).map(|file| { + this.machine.fds.insert_new(FileHandle::new( + file, + writable, + this.machine.sync.mutex_create(), + )) + }); interp_ok(Scalar::from_i32(this.try_unwrap_io_result(fd)?)) } @@ -1645,7 +1765,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { match file { Ok(f) => { - let fd = this.machine.fds.insert_new(FileHandle { file: f, writable: true }); + let fd = this.machine.fds.insert_new(FileHandle::new( + f, + true, + this.machine.sync.mutex_create(), + )); return interp_ok(Scalar::from_i32(fd)); } Err(e) => diff --git a/src/shims/unix/linux_like/eventfd.rs b/src/shims/unix/linux_like/eventfd.rs index 4b76bbb2b4..409fc9cd73 100644 --- a/src/shims/unix/linux_like/eventfd.rs +++ b/src/shims/unix/linux_like/eventfd.rs @@ -4,7 +4,9 @@ use std::io; use std::io::ErrorKind; use crate::concurrency::VClock; -use crate::shims::files::{FileDescription, FileDescriptionRef, WeakFileDescriptionRef}; +use crate::shims::files::{ + DynFileDescriptionCallback, FileDescription, FileDescriptionRef, WeakFileDescriptionRef, +}; use crate::shims::unix::UnixFileDescription; use crate::shims::unix::linux_like::epoll::{EpollReadyEvents, EvalContextExt as _}; use crate::*; @@ -51,20 +53,22 @@ impl FileDescription for EventFd { _communicate_allowed: bool, ptr: Pointer, len: usize, - dest: &MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { // We're treating the buffer as a `u64`. let ty = ecx.machine.layouts.u64; + // Check the size of slice, and return error only if the size of the slice < 8. if len < ty.size.bytes_usize() { - return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest); + return finish.call(ecx, Err(ErrorKind::InvalidInput.into())); } // Turn the pointer into a place at the right type. let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty); - eventfd_read(buf_place, dest, self, ecx) + // Now handle the eventfd read operation with callbacks + eventfd_read(buf_place, finish, self, ecx) } /// A write call adds the 8-byte integer value supplied in @@ -260,7 +264,7 @@ fn eventfd_write<'tcx>( /// else just return the current counter value to the caller and set the counter to 0. fn eventfd_read<'tcx>( buf_place: MPlaceTy<'tcx>, - dest: &MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, eventfd: FileDescriptionRef, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { @@ -270,20 +274,24 @@ fn eventfd_read<'tcx>( // Block when counter == 0. if counter == 0 { if eventfd.is_nonblock { - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); + // For non-blocking mode, return WouldBlock error through callback + return finish.call(ecx, Err(ErrorKind::WouldBlock.into())); } - let dest = dest.clone(); + // Add current thread to blocked readers list eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread()); + // Create weak reference for blocking operation let weak_eventfd = FileDescriptionRef::downgrade(&eventfd); + + // Block the thread with a completion callback ecx.block_thread( BlockReason::Eventfd, None, callback!( @capture<'tcx> { buf_place: MPlaceTy<'tcx>, - dest: MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, weak_eventfd: WeakFileDescriptionRef, } |this, unblock: UnblockKind| { @@ -291,33 +299,38 @@ fn eventfd_read<'tcx>( // When we get unblocked, try again. We know the ref is still valid, // otherwise there couldn't be a `write` that unblocks us. let eventfd_ref = weak_eventfd.upgrade().unwrap(); - eventfd_read(buf_place, &dest, eventfd_ref, this) + eventfd_read(buf_place, finish, eventfd_ref, this) } ), ); - } else { - // Synchronize with all prior `write` calls to this FD. - ecx.acquire_clock(&eventfd.clock.borrow()); - - // Return old counter value into user-space buffer. - ecx.write_int(counter, &buf_place)?; - - // Unblock *all* threads previously blocked on `write`. - // We need to take out the blocked thread ids and unblock them together, - // because `unblock_threads` may block them again and end up re-adding the - // thread to the blocked list. - let waiting_threads = std::mem::take(&mut *eventfd.blocked_write_tid.borrow_mut()); - // FIXME: We can randomize the order of unblocking. - for thread_id in waiting_threads { - ecx.unblock_thread(thread_id, BlockReason::Eventfd)?; - } + return interp_ok(()); + } - // The state changed; we check and update the status of all supported event - // types for current file description. - ecx.check_and_update_readiness(eventfd)?; + // Handle successful read case - // Tell userspace how many bytes we put into the buffer. - return ecx.write_int(buf_place.layout.size.bytes(), dest); + // Synchronize with all prior `write` calls to this FD. + ecx.acquire_clock(&eventfd.clock.borrow()); + + // Return old counter value into user-space buffer. + ecx.write_int(counter, &buf_place)?; + + // Unblock *all* threads previously blocked on `write`. + // We need to take out the blocked thread ids and unblock them together, + // because `unblock_threads` may block them again and end up re-adding the + // thread to the blocked list. + let waiting_threads = std::mem::take(&mut *eventfd.blocked_write_tid.borrow_mut()); + + // FIXME: We can randomize the order of unblocking. + for thread_id in waiting_threads { + ecx.unblock_thread(thread_id, BlockReason::Eventfd)?; } - interp_ok(()) + + // The state changed; we check and update the status of all supported event + // types for current file description. + ecx.check_and_update_readiness(eventfd)?; + + // Complete the operation with the number of bytes written + let bytes_written = buf_place.layout.size.bytes(); + + finish.call(ecx, Ok(bytes_written)) } diff --git a/src/shims/unix/unnamed_socket.rs b/src/shims/unix/unnamed_socket.rs index 08515b815a..ed8c298c87 100644 --- a/src/shims/unix/unnamed_socket.rs +++ b/src/shims/unix/unnamed_socket.rs @@ -11,7 +11,8 @@ use rustc_abi::Size; use crate::concurrency::VClock; use crate::shims::files::{ - EvalContextExt as _, FileDescription, FileDescriptionRef, WeakFileDescriptionRef, + DynFileDescriptionCallback, EvalContextExt as _, FileDescription, FileDescriptionRef, + WeakFileDescriptionRef, }; use crate::shims::unix::UnixFileDescription; use crate::shims::unix::linux_like::epoll::{EpollReadyEvents, EvalContextExt as _}; @@ -92,10 +93,10 @@ impl FileDescription for AnonSocket { _communicate_allowed: bool, ptr: Pointer, len: usize, - dest: &MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - anonsocket_read(self, len, ptr, dest, ecx) + anonsocket_read(self, len, ptr, finish, ecx) } fn write<'tcx>( @@ -202,17 +203,18 @@ fn anonsocket_write<'tcx>( interp_ok(()) } -/// Read from AnonSocket and return the number of bytes read. +/// Reads from an unnamed socket with proper handling of blocking states. +/// Handles EOF detection, non-blocking reads, and writer notification. fn anonsocket_read<'tcx>( self_ref: FileDescriptionRef, len: usize, ptr: Pointer, - dest: &MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { // Always succeed on read size 0. if len == 0 { - return ecx.return_read_success(ptr, &[], 0, dest); + return finish.call(ecx, Ok(0)); } let Some(readbuf) = &self_ref.readbuf else { @@ -221,24 +223,24 @@ fn anonsocket_read<'tcx>( throw_unsup_format!("reading from the write end of a pipe") }; + // Handle empty buffer cases if readbuf.borrow_mut().buf.is_empty() { if self_ref.peer_fd().upgrade().is_none() { // Socketpair with no peer and empty buffer. // 0 bytes successfully read indicates end-of-file. - return ecx.return_read_success(ptr, &[], 0, dest); + return finish.call(ecx, Ok(0)); } else if self_ref.is_nonblock { // Non-blocking socketpair with writer and empty buffer. // https://linux.die.net/man/2/read // EAGAIN or EWOULDBLOCK can be returned for socket, // POSIX.1-2001 allows either error to be returned for this case. // Since there is no ErrorKind for EAGAIN, WouldBlock is used. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); + return finish.call(ecx, Err(ErrorKind::WouldBlock.into())); } else { self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread()); // Blocking socketpair with writer and empty buffer. // Block the current thread; only keep a weak ref for this. let weak_self_ref = FileDescriptionRef::downgrade(&self_ref); - let dest = dest.clone(); ecx.block_thread( BlockReason::UnnamedSocket, None, @@ -247,55 +249,66 @@ fn anonsocket_read<'tcx>( weak_self_ref: WeakFileDescriptionRef, len: usize, ptr: Pointer, - dest: MPlaceTy<'tcx>, + finish: DynFileDescriptionCallback<'tcx>, } |this, unblock: UnblockKind| { assert_eq!(unblock, UnblockKind::Ready); // If we got unblocked, then our peer successfully upgraded its weak // ref to us. That means we can also upgrade our weak ref. let self_ref = weak_self_ref.upgrade().unwrap(); - anonsocket_read(self_ref, len, ptr, &dest, this) + anonsocket_read(self_ref, len, ptr, finish, this) } ), ); + return interp_ok(()); } - } else { - // There's data to be read! - let mut bytes = vec![0; len]; - let mut readbuf = readbuf.borrow_mut(); - // Synchronize with all previous writes to this buffer. - // FIXME: this over-synchronizes; a more precise approach would be to - // only sync with the writes whose data we will read. - ecx.acquire_clock(&readbuf.clock); - - // Do full read / partial read based on the space available. - // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. - let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap(); - - // Need to drop before others can access the readbuf again. - drop(readbuf); - - // A notification should be provided for the peer file description even when it can - // only write 1 byte. This implementation is not compliant with the actual Linux kernel - // implementation. For optimization reasons, the kernel will only mark the file description - // as "writable" when it can write more than a certain number of bytes. Since we - // don't know what that *certain number* is, we will provide a notification every time - // a read is successful. This might result in our epoll emulation providing more - // notifications than the real system. - if let Some(peer_fd) = self_ref.peer_fd().upgrade() { - // Unblock all threads that are currently blocked on peer_fd's write. - let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut()); - // FIXME: We can randomize the order of unblocking. - for thread_id in waiting_threads { - ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?; - } - // Notify epoll waiters. - ecx.check_and_update_readiness(peer_fd)?; - }; + } - return ecx.return_read_success(ptr, &bytes, actual_read_size, dest); + // There's data to be read! + let mut bytes = vec![0; len]; + let mut readbuf = readbuf.borrow_mut(); + // Synchronize with all previous writes to this buffer. + // FIXME: this over-synchronizes; a more precise approach would be to + // only sync with the writes whose data we will read. + ecx.acquire_clock(&readbuf.clock); + + // Do full read / partial read based on the space available. + // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. + let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap(); + + // Need to drop before others can access the readbuf again. + drop(readbuf); + + // A notification should be provided for the peer file description even when it can + // only write 1 byte. This implementation is not compliant with the actual Linux kernel + // implementation. For optimization reasons, the kernel will only mark the file description + // as "writable" when it can write more than a certain number of bytes. Since we + // don't know what that *certain number* is, we will provide a notification every time + // a read is successful. This might result in our epoll emulation providing more + // notifications than the real system. + if let Some(peer_fd) = self_ref.peer_fd().upgrade() { + // Unblock all threads that are currently blocked on peer_fd's write. + let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut()); + // FIXME: We can randomize the order of unblocking. + for thread_id in waiting_threads { + ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?; + } + // Notify epoll waiters. + ecx.check_and_update_readiness(peer_fd)?; } - interp_ok(()) + + // Write the data to the destination pointer + ecx.write_bytes_ptr(ptr, bytes[..actual_read_size].iter().copied())?; + + let Ok(read_size) = u64::try_from(actual_read_size) else { + throw_unsup_format!( + "Read operation returned size {} which exceeds maximum allowed value", + actual_read_size + ) + }; + + // Complete the operation with the number of bytes read + finish.call(ecx, Ok(read_size)) } impl UnixFileDescription for AnonSocket {