Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build/instructions_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ enum SystemClauseType {
GetChar,
#[strum_discriminants(strum(props(Arity = "3", Name = "$get_n_chars")))]
GetNChars,
#[strum_discriminants(strum(props(Arity = "4", Name = "$get_n_chars")))]
GetNCharsWithTimeout,
#[strum_discriminants(strum(props(Arity = "2", Name = "$get_code")))]
GetCode,
#[strum_discriminants(strum(props(Arity = "1", Name = "$get_single_char")))]
Expand Down Expand Up @@ -1737,6 +1739,7 @@ fn generate_instruction_preface() -> TokenStream {
&Instruction::CallGetByte |
&Instruction::CallGetChar |
&Instruction::CallGetNChars |
&Instruction::CallGetNCharsWithTimeout |
&Instruction::CallGetCode |
&Instruction::CallGetSingleChar |
&Instruction::CallTruncateIfNoLiftedHeapGrowthDiff |
Expand Down Expand Up @@ -1998,6 +2001,7 @@ fn generate_instruction_preface() -> TokenStream {
&Instruction::ExecuteGetByte |
&Instruction::ExecuteGetChar |
&Instruction::ExecuteGetNChars |
&Instruction::ExecuteGetNCharsWithTimeout |
&Instruction::ExecuteGetCode |
&Instruction::ExecuteGetSingleChar |
&Instruction::ExecuteTruncateIfNoLiftedHeapGrowthDiff |
Expand Down
24 changes: 24 additions & 0 deletions src/lib/charsio.pl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
chars_utf8bytes/2,
get_single_char/1,
get_n_chars/3,
get_n_chars/4,
get_line_to_chars/3,
read_from_chars/2,
read_term_from_chars/3,
Expand Down Expand Up @@ -335,6 +336,29 @@
'$get_n_chars'(Stream, N, Cs)
).

%% get_n_chars(+Stream, ?N, -Chars, +Timeout).
%
% Read up to N chars from stream Stream with a timeout.
% N can be an integer (maximum chars to read) or a variable (unified with actual chars read).
%
% Timeout can be:
% - A positive integer (timeout in milliseconds)
% - 0 or negative (no timeout, same as get_n_chars/3)
% - infinity/inf (no timeout, blocks indefinitely)
% - nonblock (atom for minimal non-blocking behavior)
%
% Returns whatever data is available within the timeout period.
% On timeout, returns partial data (distinguishable from EOF).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Partial data" in the sense that it may be less than N chars?

% The stream is NOT marked as EOF on timeout.
% When N is a variable, it is unified with the number of chars actually read.
get_n_chars(Stream, N, Cs, Timeout) :-
can_be(integer, N),
( var(N) ->
'$get_n_chars'(Stream, N, Cs, Timeout)
; N >= 0,
'$get_n_chars'(Stream, N, Cs, Timeout)
).

get_n_chars_wrapper(Stream, N, Cs) :-
'$get_n_chars'(Stream, N, Cs).

Expand Down
8 changes: 8 additions & 0 deletions src/machine/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3720,6 +3720,14 @@ impl Machine {
try_or_throw!(self.machine_st, self.get_n_chars());
step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
}
&Instruction::CallGetNCharsWithTimeout => {
try_or_throw!(self.machine_st, self.get_n_chars_with_timeout());
step_or_fail!(self, self.machine_st.p += 1);
}
&Instruction::ExecuteGetNCharsWithTimeout => {
try_or_throw!(self.machine_st, self.get_n_chars_with_timeout());
step_or_fail!(self, self.machine_st.p = self.machine_st.cp);
}
&Instruction::CallGetCode => {
try_or_throw!(self.machine_st, self.get_code());
step_or_fail!(self, self.machine_st.p += 1);
Expand Down
116 changes: 115 additions & 1 deletion src/machine/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
use std::mem::ManuallyDrop;
use std::net::{Shutdown, TcpStream};
use std::ops::{Deref, DerefMut};
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::ptr;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::TryRecvError;
use std::time::Duration;

#[cfg(feature = "tls")]
use native_tls::TlsStream;
Expand Down Expand Up @@ -173,6 +176,41 @@ impl StreamLayout<CharReader<InputFileStream>> {
}
}

impl StreamLayout<CharReader<PipeReader>> {
fn load_incomplete_utf8(&mut self) {
let bytes = self.incomplete_utf8.borrow().clone();
if !bytes.is_empty() {
self.stream.prepend_bytes(&bytes);
self.incomplete_utf8.borrow_mut().clear();
}
}

pub fn read_char(&mut self) -> Option<std::io::Result<char>> {
self.load_incomplete_utf8();
self.stream.read_char()
}

pub fn peek_char(&mut self) -> Option<std::io::Result<char>> {
self.load_incomplete_utf8();
self.stream.peek_char()
}

pub fn put_back_char(&mut self, c: char) {
self.stream.put_back_char(c)
}

pub fn consume(&mut self, nread: usize) {
self.stream.consume(nread)
}
}

impl Read for StreamLayout<CharReader<PipeReader>> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.load_incomplete_utf8();
self.stream.read(buf)
}
}

#[derive(Debug)]
pub struct OutputFileStream {
file_name: Atom,
Expand Down Expand Up @@ -507,12 +545,13 @@ impl Default for StreamOptions {
}
}

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Clone)]
pub struct StreamLayout<T> {
pub options: StreamOptions,
pub lines_read: usize,
past_end_of_stream: bool,
stream: T,
pub(crate) incomplete_utf8: std::cell::RefCell<Vec<u8>>,
}

impl<T> StreamLayout<T> {
Expand All @@ -523,6 +562,7 @@ impl<T> StreamLayout<T> {
lines_read: 0,
past_end_of_stream: false,
stream,
incomplete_utf8: std::cell::RefCell::new(Vec::new()),
}
}
}
Expand Down Expand Up @@ -1263,6 +1303,80 @@ impl Stream {
}
}

#[inline]
pub(crate) fn set_read_timeout(&mut self, timeout: Option<Duration>) -> std::io::Result<()> {
match self {
Stream::NamedTcp(stream) => {
stream.stream.inner_mut().tcp_stream.set_read_timeout(timeout)
}
Stream::PipeReader(_stream) => {
// Pipe timeout is handled via poll_read_ready() at the read level
Ok(())
}
_ => Ok(()),
}
}

#[cfg(unix)]
pub(crate) fn poll_read_ready(&mut self, timeout: Duration) -> std::io::Result<bool> {
match self {
Stream::PipeReader(stream) => {
let fd = stream.stream.inner_mut().as_raw_fd();

let mut pollfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};

let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32;

let result = unsafe {
libc::poll(&mut pollfd as *mut libc::pollfd, 1, timeout_ms)
};

if result < 0 {
Err(std::io::Error::last_os_error())
} else if result == 0 {
// Timeout
Ok(false)
} else {
// Data available
Ok(true)
}
}
Stream::NamedTcp(stream) => {
let fd = stream.stream.inner_mut().tcp_stream.as_raw_fd();

let mut pollfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};

let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32;

let result = unsafe {
libc::poll(&mut pollfd as *mut libc::pollfd, 1, timeout_ms)
};

if result < 0 {
Err(std::io::Error::last_os_error())
} else if result == 0 {
Ok(false)
} else {
Ok(true)
}
}
_ => Ok(true), // Other streams don't need polling
}
}

#[cfg(not(unix))]
pub(crate) fn poll_read_ready(&mut self, _timeout: Duration) -> std::io::Result<bool> {
Ok(true)
}

#[inline]
pub(crate) fn position_relative_to_end(&mut self) -> AtEndOfStream {
if self.past_end_of_stream() {
Expand Down
Loading