diff --git a/build/instructions_template.rs b/build/instructions_template.rs index 6b5ba80e9..ad7af45f8 100644 --- a/build/instructions_template.rs +++ b/build/instructions_template.rs @@ -293,6 +293,8 @@ enum SystemClauseType { GetChar, #[strum_discriminants(strum(props(Arity = "3", Name = "$get_n_chars")))] GetNChars, + #[strum_discriminants(strum(props(Arity = "4", Name = "$get_n_chars")))] + GetNCharsWithTimeout, #[strum_discriminants(strum(props(Arity = "2", Name = "$get_code")))] GetCode, #[strum_discriminants(strum(props(Arity = "1", Name = "$get_single_char")))] @@ -1737,6 +1739,7 @@ fn generate_instruction_preface() -> TokenStream { &Instruction::CallGetByte | &Instruction::CallGetChar | &Instruction::CallGetNChars | + &Instruction::CallGetNCharsWithTimeout | &Instruction::CallGetCode | &Instruction::CallGetSingleChar | &Instruction::CallTruncateIfNoLiftedHeapGrowthDiff | @@ -1998,6 +2001,7 @@ fn generate_instruction_preface() -> TokenStream { &Instruction::ExecuteGetByte | &Instruction::ExecuteGetChar | &Instruction::ExecuteGetNChars | + &Instruction::ExecuteGetNCharsWithTimeout | &Instruction::ExecuteGetCode | &Instruction::ExecuteGetSingleChar | &Instruction::ExecuteTruncateIfNoLiftedHeapGrowthDiff | diff --git a/src/lib/charsio.pl b/src/lib/charsio.pl index 536aeac69..083027e51 100644 --- a/src/lib/charsio.pl +++ b/src/lib/charsio.pl @@ -10,6 +10,7 @@ chars_utf8bytes/2, get_single_char/1, get_n_chars/3, + get_n_chars/4, get_line_to_chars/3, read_from_chars/2, read_term_from_chars/3, @@ -335,6 +336,29 @@ '$get_n_chars'(Stream, N, Cs) ). +%% get_n_chars(+Stream, ?N, -Chars, +Timeout). +% +% Read up to N chars from stream Stream with a timeout. +% N can be an integer (maximum chars to read) or a variable (unified with actual chars read). +% +% Timeout can be: +% - A positive integer (timeout in milliseconds) +% - 0 or negative (no timeout, same as get_n_chars/3) +% - infinity/inf (no timeout, blocks indefinitely) +% - nonblock (atom for minimal non-blocking behavior) +% +% Returns whatever data is available within the timeout period. +% On timeout, returns partial data (distinguishable from EOF). +% The stream is NOT marked as EOF on timeout. +% When N is a variable, it is unified with the number of chars actually read. +get_n_chars(Stream, N, Cs, Timeout) :- + can_be(integer, N), + ( var(N) -> + '$get_n_chars'(Stream, N, Cs, Timeout) + ; N >= 0, + '$get_n_chars'(Stream, N, Cs, Timeout) + ). + get_n_chars_wrapper(Stream, N, Cs) :- '$get_n_chars'(Stream, N, Cs). diff --git a/src/machine/dispatch.rs b/src/machine/dispatch.rs index fc13d8258..9eaa08e4e 100644 --- a/src/machine/dispatch.rs +++ b/src/machine/dispatch.rs @@ -3720,6 +3720,14 @@ impl Machine { try_or_throw!(self.machine_st, self.get_n_chars()); step_or_fail!(self, self.machine_st.p = self.machine_st.cp); } + &Instruction::CallGetNCharsWithTimeout => { + try_or_throw!(self.machine_st, self.get_n_chars_with_timeout()); + step_or_fail!(self, self.machine_st.p += 1); + } + &Instruction::ExecuteGetNCharsWithTimeout => { + try_or_throw!(self.machine_st, self.get_n_chars_with_timeout()); + step_or_fail!(self, self.machine_st.p = self.machine_st.cp); + } &Instruction::CallGetCode => { try_or_throw!(self.machine_st, self.get_code()); step_or_fail!(self, self.machine_st.p += 1); diff --git a/src/machine/streams.rs b/src/machine/streams.rs index c0f5c7b36..73778229d 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -27,10 +27,13 @@ use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write}; use std::mem::ManuallyDrop; use std::net::{Shutdown, TcpStream}; use std::ops::{Deref, DerefMut}; +#[cfg(unix)] +use std::os::unix::io::AsRawFd; use std::path::PathBuf; use std::ptr; use std::sync::mpsc::Receiver; use std::sync::mpsc::TryRecvError; +use std::time::Duration; #[cfg(feature = "tls")] use native_tls::TlsStream; @@ -173,6 +176,41 @@ impl StreamLayout> { } } +impl StreamLayout> { + fn load_incomplete_utf8(&mut self) { + let bytes = self.incomplete_utf8.borrow().clone(); + if !bytes.is_empty() { + self.stream.prepend_bytes(&bytes); + self.incomplete_utf8.borrow_mut().clear(); + } + } + + pub fn read_char(&mut self) -> Option> { + self.load_incomplete_utf8(); + self.stream.read_char() + } + + pub fn peek_char(&mut self) -> Option> { + self.load_incomplete_utf8(); + self.stream.peek_char() + } + + pub fn put_back_char(&mut self, c: char) { + self.stream.put_back_char(c) + } + + pub fn consume(&mut self, nread: usize) { + self.stream.consume(nread) + } +} + +impl Read for StreamLayout> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.load_incomplete_utf8(); + self.stream.read(buf) + } +} + #[derive(Debug)] pub struct OutputFileStream { file_name: Atom, @@ -507,12 +545,13 @@ impl Default for StreamOptions { } } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] pub struct StreamLayout { pub options: StreamOptions, pub lines_read: usize, past_end_of_stream: bool, stream: T, + pub(crate) incomplete_utf8: std::cell::RefCell>, } impl StreamLayout { @@ -523,6 +562,7 @@ impl StreamLayout { lines_read: 0, past_end_of_stream: false, stream, + incomplete_utf8: std::cell::RefCell::new(Vec::new()), } } } @@ -1263,6 +1303,80 @@ impl Stream { } } + #[inline] + pub(crate) fn set_read_timeout(&mut self, timeout: Option) -> std::io::Result<()> { + match self { + Stream::NamedTcp(stream) => { + stream.stream.inner_mut().tcp_stream.set_read_timeout(timeout) + } + Stream::PipeReader(_stream) => { + // Pipe timeout is handled via poll_read_ready() at the read level + Ok(()) + } + _ => Ok(()), + } + } + + #[cfg(unix)] + pub(crate) fn poll_read_ready(&mut self, timeout: Duration) -> std::io::Result { + match self { + Stream::PipeReader(stream) => { + let fd = stream.stream.inner_mut().as_raw_fd(); + + let mut pollfd = libc::pollfd { + fd, + events: libc::POLLIN, + revents: 0, + }; + + let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32; + + let result = unsafe { + libc::poll(&mut pollfd as *mut libc::pollfd, 1, timeout_ms) + }; + + if result < 0 { + Err(std::io::Error::last_os_error()) + } else if result == 0 { + // Timeout + Ok(false) + } else { + // Data available + Ok(true) + } + } + Stream::NamedTcp(stream) => { + let fd = stream.stream.inner_mut().tcp_stream.as_raw_fd(); + + let mut pollfd = libc::pollfd { + fd, + events: libc::POLLIN, + revents: 0, + }; + + let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32; + + let result = unsafe { + libc::poll(&mut pollfd as *mut libc::pollfd, 1, timeout_ms) + }; + + if result < 0 { + Err(std::io::Error::last_os_error()) + } else if result == 0 { + Ok(false) + } else { + Ok(true) + } + } + _ => Ok(true), // Other streams don't need polling + } + } + + #[cfg(not(unix))] + pub(crate) fn poll_read_ready(&mut self, _timeout: Duration) -> std::io::Result { + Ok(true) + } + #[inline] pub(crate) fn position_relative_to_end(&mut self) -> AtEndOfStream { if self.past_end_of_stream() { diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 5fc4088b1..080ac2fca 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -3574,27 +3574,151 @@ impl Machine { #[inline(always)] pub(crate) fn get_n_chars(&mut self) -> CallResult { + self.get_n_chars_impl(3) + } + + #[inline(always)] + pub(crate) fn get_n_chars_with_timeout(&mut self) -> CallResult { + self.get_n_chars_impl(4) + } + + fn parse_timeout(&mut self, register_idx: usize) -> CallResult> { + let timeout_term = self.deref_register(register_idx); + + // Try to parse as a number first + if let Ok(number) = Number::try_from((timeout_term, &self.machine_st.arena.f64_tbl)) { + match number { + Number::Fixnum(n) => { + let ms = n.get_num(); + if ms <= 0 { + // Zero or negative means no timeout (infinite) + return Ok(None); + } else { + return Ok(Some(Duration::from_millis(ms as u64))); + } + } + Number::Integer(n) => { + if n.sign() != Sign::Positive { + return Ok(None); + } + // Try to convert to u64, if too large treat as infinite + let ms: u64 = match (&*n).try_into() { + Ok(val) => val, + Err(_) => return Ok(None), // Too large, treat as infinite + }; + return Ok(Some(Duration::from_millis(ms))); + } + Number::Float(f) => { + let ms = f.into_inner(); + if ms <= 0.0 { + // Zero or negative means no timeout (infinite) + return Ok(None); + } else { + return Ok(Some(Duration::from_millis(ms as u64))); + } + } + _ => {} + } + } + + // Try to parse as an atom + read_heap_cell!(timeout_term, + (HeapCellValueTag::Atom, (atom, _arity)) => { + if atom == atom!("infinity") || atom == atom!("inf") { + return Ok(None); + } else if atom == atom!("nonblock") { + return Ok(Some(Duration::from_millis(0))); + } else { + // Invalid timeout atom + let stub = functor_stub(atom!("get_n_chars"), 4); + let err = self.machine_st.type_error(ValidType::Integer, timeout_term); + return Err(self.machine_st.error_form(err, stub)); + } + } + _ => { + let stub = functor_stub(atom!("get_n_chars"), 4); + let err = self.machine_st.type_error(ValidType::Integer, timeout_term); + return Err(self.machine_st.error_form(err, stub)); + } + ) + } + + fn get_n_chars_impl(&mut self, arity: usize) -> CallResult { let _guard = RawReadGuard::new(); - let stream = self.machine_st.get_stream_or_alias( + + let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], &self.indices, atom!("get_n_chars"), - 3, + arity, )?; - let num = match Number::try_from((self.deref_register(2), &self.machine_st.arena.f64_tbl)) { - Ok(Number::Fixnum(n)) => usize::try_from(n.get_num()).unwrap(), - Ok(Number::Integer(n)) => match (&*n).try_into() as Result { - Ok(u) => u, - _ => { - self.machine_st.fail = true; - return Ok(()); - } - }, + let timeout = if arity == 4 { + self.parse_timeout(4)? + } else { + None + }; + + let n_term = self.deref_register(2); + let (num, n_is_var) = read_heap_cell!(n_term, + (HeapCellValueTag::Var | HeapCellValueTag::AttrVar | HeapCellValueTag::StackVar) => { + // N is unbound - use a reasonable maximum and unify later + (16384usize, true) + } _ => { - unreachable!() + // N is bound - use it as the maximum + match Number::try_from((n_term, &self.machine_st.arena.f64_tbl)) { + Ok(Number::Fixnum(n)) => (usize::try_from(n.get_num()).unwrap(), false), + Ok(Number::Integer(n)) => match (&*n).try_into() as Result { + Ok(u) => (u, false), + _ => { + self.machine_st.fail = true; + return Ok(()); + } + }, + Ok(Number::Float(f)) => { + let val = f.into_inner(); + if val >= 0.0 && val.is_finite() { + (val as usize, false) + } else { + self.machine_st.fail = true; + return Ok(()); + } + } + _ => { + // Type error - N is not a number + let err = self.machine_st.type_error(ValidType::Integer, n_term); + let stub = functor_stub(atom!("get_n_chars"), arity); + return Err(self.machine_st.error_form(err, stub)); + } + } } - }; + ); + + if let Some(t) = timeout { + stream.set_read_timeout(Some(t)).ok(); + + // For pipes, check if data is ready with poll before reading + #[cfg(unix)] + { + use crate::machine::streams::Stream; + match stream { + Stream::PipeReader(_) => { + if !stream.poll_read_ready(t).unwrap_or(false) { + // Timeout occurred, return empty string + let output = self.deref_register(3); + let cstr_cell = resource_error_call_result!( + self.machine_st, + self.machine_st.heap.allocate_cstr("") + ); + unify!(self.machine_st, cstr_cell, output); + return Ok(()); + } + } + _ => {} + } + } + } let mut string = String::new(); @@ -3608,33 +3732,164 @@ impl Machine { string.push(c as char); } } else { - let mut iter = self.machine_st.open_parsing_stream(stream).map_err(|e| { - let err = self.machine_st.session_error(SessionError::from(e)); - let stub = functor_stub(atom!("get_n_chars"), 2); - - self.machine_st.error_form(err, stub) - })?; + // For pipes with timeout, use direct byte reading with poll + #[cfg(unix)] + use crate::machine::streams::Stream; + #[cfg(unix)] + let is_pipe_with_timeout = timeout.is_some() && matches!(stream, Stream::PipeReader(_)); + + #[cfg(unix)] + if is_pipe_with_timeout { + // Track start time for timeout calculation + use std::time::Instant; + let start_time = Instant::now(); + let timeout_duration = timeout.unwrap(); + + // Read UTF-8 bytes directly with poll before each read + let mut utf8_buf = Vec::new(); + let mut chars_read = 0; + + while chars_read < num { + // Check if we've exceeded the total timeout + let elapsed = start_time.elapsed(); + if elapsed >= timeout_duration { + break; + } - for _ in 0..num { - let result = iter.read_char(); + // Calculate remaining timeout + let remaining = timeout_duration - elapsed; - match result { - Some(Ok(c)) => { - string.push(c); + // Poll to see if data is available + match stream.poll_read_ready(remaining) { + Ok(true) => { + // Data is available, read one byte directly from inner reader to bypass buffering + let mut byte = [0u8; 1]; + let read_result = if let Stream::PipeReader(ref mut ptr) = stream { + (**ptr).inner_mut().read(&mut byte) + } else { + unreachable!() + }; + match read_result { + Ok(0) => break, // EOF + Ok(_) => { + utf8_buf.push(byte[0]); + // Try to decode accumulated bytes as UTF-8 + match std::str::from_utf8(&utf8_buf) { + Ok(s) => { + // Valid UTF-8 sequence, add characters + string.push_str(s); + chars_read += s.chars().count(); + utf8_buf.clear(); + } + Err(e) if e.error_len().is_none() => { + // Incomplete UTF-8 sequence, continue reading + continue; + } + Err(_) => { + // Invalid UTF-8, skip this byte + utf8_buf.clear(); + } + } + } + Err(e) if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::Interrupted => { + continue; + } + Err(_) => break, + } + } + Ok(false) | Err(_) => { + // Timeout or error + break; + } } - Some(Err(e)) => { - let stub = functor_stub(atom!("$get_n_chars"), 3); - let err = self.machine_st.session_error(SessionError::from(e)); + } + } else { + // Use regular CharReader for non-pipe streams or no timeout + let mut iter = self.machine_st.open_parsing_stream(stream).map_err(|e| { + let err = self.machine_st.session_error(SessionError::from(e)); + let stub = functor_stub(atom!("get_n_chars"), 2); - return Err(self.machine_st.error_form(err, stub)); + self.machine_st.error_form(err, stub) + })?; + + for _ in 0..num { + let result = iter.read_char(); + + match result { + Some(Ok(c)) => { + string.push(c); + } + Some(Err(e)) => { + if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut { + break; + } + let stub = functor_stub(atom!("$get_n_chars"), arity); + let err = self.machine_st.session_error(SessionError::from(e)); + + return Err(self.machine_st.error_form(err, stub)); + } + _ => { + break; + } } - _ => { - break; + } + } + + #[cfg(not(unix))] + { + let mut iter = self.machine_st.open_parsing_stream(stream).map_err(|e| { + let err = self.machine_st.session_error(SessionError::from(e)); + let stub = functor_stub(atom!("get_n_chars"), 2); + + self.machine_st.error_form(err, stub) + })?; + + for _ in 0..num { + let result = iter.read_char(); + + match result { + Some(Ok(c)) => { + string.push(c); + } + Some(Err(e)) => { + if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut { + break; + } + let stub = functor_stub(atom!("$get_n_chars"), arity); + let err = self.machine_st.session_error(SessionError::from(e)); + + return Err(self.machine_st.error_form(err, stub)); + } + _ => { + break; + } } } } }; + if timeout.is_some() { + stream.set_read_timeout(None).ok(); + } + + let actual_length = string.chars().count(); + + // If N was a variable, unify it with the actual length + if n_is_var { + let n_term = self.deref_register(2); + let length_number = Number::arena_from(actual_length, &mut self.machine_st.arena); + + match length_number { + Number::Fixnum(n) => { + self.machine_st.unify_fixnum(n, n_term); + } + Number::Integer(n) => { + self.machine_st.unify_big_int(n, n_term); + } + _ => unreachable!(), + } + } + let output = self.deref_register(3); let cstr_cell = resource_error_call_result!( self.machine_st, diff --git a/src/parser/char_reader.rs b/src/parser/char_reader.rs index 4d5d7d747..a9d776d19 100644 --- a/src/parser/char_reader.rs +++ b/src/parser/char_reader.rs @@ -64,6 +64,37 @@ impl CharReader { pub fn rem_buf_len(&self) -> usize { self.buf.len() - self.pos } + + // Prepend bytes to the buffer. Used for handling incomplete UTF-8 sequences. + pub fn prepend_bytes(&mut self, bytes: &[u8]) { + if bytes.is_empty() { + return; + } + + // If we haven't consumed any bytes yet (pos == 0), just insert at the beginning + if self.pos == 0 { + // Insert new bytes at the beginning + for (i, &byte) in bytes.iter().enumerate() { + self.buf.insert(i, byte); + } + } else { + // We've consumed some bytes. Replace the consumed part with our new bytes. + // This maintains the invariant that pos points to the next byte to read. + if bytes.len() <= self.pos { + // New bytes fit in the consumed space + let start = self.pos - bytes.len(); + self.buf[start..self.pos].copy_from_slice(bytes); + self.pos = start; + } else { + // New bytes don't fit, need to remove consumed bytes and insert new ones + self.buf.drain(0..self.pos); + for (i, &byte) in bytes.iter().enumerate() { + self.buf.insert(i, byte); + } + self.pos = 0; + } + } + } } pub trait CharRead { diff --git a/src/tests/get_n_chars.pl b/src/tests/get_n_chars.pl new file mode 100644 index 000000000..840179b51 --- /dev/null +++ b/src/tests/get_n_chars.pl @@ -0,0 +1,346 @@ +:- module(get_n_chars_tests, []). +:- use_module(test_framework). +:- use_module(library(charsio)). +:- use_module(library(iso_ext)). +:- use_module(library(process)). +:- use_module(library(format)). +:- use_module(library(lists)). + +% Test 1: timeout=0 should behave exactly like get_n_chars/3 +test("timeout=0 equals get_n_chars/3", ( + atom_chars('/bin/echo', Echo), + atom_chars('ABCDEFGHIJ', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out1))]), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out2))]), + ( + charsio:get_n_chars(Out1, 5, Chars1), + charsio:get_n_chars(Out2, 5, Chars2, 0), + Chars1 = Chars2 + ), + close(Out2) + ), + close(Out1) + ) +)). + +% Test 2: Variable N with timeout=0 +test("variable N with timeout=0", ( + atom_chars('/bin/echo', Echo), + atom_chars('Testing', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out1))]), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out2))]), + ( + charsio:get_n_chars(Out1, N1, Chars1), + charsio:get_n_chars(Out2, N2, Chars2, 0), + N1 = N2, + Chars1 = Chars2, + N1 = 8, + Chars1 = "Testing\n" + ), + close(Out2) + ), + close(Out1) + ) +)). + +% Test 3: Negative timeout should also mean no timeout +test("negative timeout equals no timeout", ( + atom_chars('/bin/echo', Echo), + atom_chars('NegativeTest', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out1))]), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out2))]), + ( + charsio:get_n_chars(Out1, N1, Chars1), + charsio:get_n_chars(Out2, N2, Chars2, -100), + N1 = N2, + Chars1 = Chars2 + ), + close(Out2) + ), + close(Out1) + ) +)). + +% Test 4: Positive timeout should timeout on slow output +test("positive timeout stops reading", ( + atom_chars('/usr/bin/python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; [print(c,end="",flush=True) or time.sleep(1) for c in "ABCDEFGH"]', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, N, _Chars, 2500), + N >= 2, + N =< 3 + ), + close(Out) + ) +)). + +% Test 5: infinity atom means no timeout +test("infinity atom means no timeout", ( + atom_chars('/bin/echo', Echo), + atom_chars('InfinityTest', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, N, _Chars, infinity), + N > 0 + ), + close(Out) + ) +)). + +% Test 6: Stream remains usable after timeout +test("stream usable after timeout", ( + atom_chars('/usr/bin/python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; print("A",end="",flush=True); time.sleep(2); print("B",end="",flush=True)', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, N1, Chars1, 100), + charsio:get_n_chars(Out, N2, Chars2, 3000), + N1 = 1, + Chars1 = "A", + N2 = 1, + Chars2 = "B" + ), + close(Out) + ) +)). + +% Test 7: Timeout returns partial data, not EOF +test("timeout returns partial data not EOF", ( + atom_chars('/usr/bin/python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; print("ABC",end="",flush=True); time.sleep(5); print("DEF",end="",flush=True)', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, N1, Chars1, 1000), + charsio:get_n_chars(Out, N2, Chars2, 6000), + N1 = 3, + Chars1 = "ABC", + N2 = 3, + Chars2 = "DEF" + ), + close(Out) + ) +)). + +% Test 8: Multiple reads with timeout=0 +test("multiple reads with timeout=0", ( + atom_chars('/bin/echo', Echo), + atom_chars('ABCDEFGHIJKLMNOP', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, 4, Chars1, 0), + charsio:get_n_chars(Out, 4, Chars2, 0), + charsio:get_n_chars(Out, 4, Chars3, 0), + Chars1 = "ABCD", + Chars2 = "EFGH", + Chars3 = "IJKL" + ), + close(Out) + ) +)). + +% Test 9: Reading more than available with timeout=0 +test("read more than available with timeout=0", ( + atom_chars('/bin/echo', Echo), + atom_chars('Short', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, N, _Chars, 0), + N >= 5, + N =< 7 + ), + close(Out) + ) +)). + +% Test 10: Variable N unifies with actual character count +test("variable N unifies with actual count", ( + atom_chars('/usr/bin/python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; [print(c,end="",flush=True) or time.sleep(0.5) for c in "ABCD"]', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, N, Chars, 1300), + lists:length(Chars, ActualLength), + N = ActualLength + ), + close(Out) + ) +)). + +% Test 11: UTF-8 multi-byte character boundaries with timeout +% Ensures that partial UTF-8 sequences are preserved across timeouts +test("utf8_multibyte_boundary_with_timeout", ( + atom_chars('python3', Py), + atom_chars('-c', C), + % Send πŸ’œ (F0 9F 92 9C) one byte at a time with delays + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.1); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9c\"); sys.stdout.buffer.flush(); time.sleep(0.1); sys.stdout.buffer.write(b\"AB\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + % First read: timeout after first byte (incomplete UTF-8) + charsio:get_n_chars(Out, N1, _Chars1, 50), + % Second read: should complete the emoji and get more + charsio:get_n_chars(Out, N2, _Chars2, 500), + % Verify lossless property: total should be 3 chars (πŸ’œ + A + B) + TotalChars is N1 + N2, + TotalChars = 3 + ), + close(Out) + ) +)). + +% Test 12: nonblock with integer N (read up to N chars immediately) +test("nonblock with fixed N limit", ( + atom_chars('/bin/echo', Echo), + atom_chars('Hello World Test', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out))]), + ( + % Read up to 5 chars with nonblock + charsio:get_n_chars(Out, 5, Chars, nonblock), + Chars == "Hello" + ), + close(Out) + ) +)). + +% Test 13: nonblock with variable N (read all immediately available) +test("nonblock with variable N reads all available", ( + atom_chars('/bin/echo', Echo), + atom_chars('TestData', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out))]), + ( + % Read all available data with nonblock + charsio:get_n_chars(Out, N, Chars, nonblock), + % Should get all data including newline + N >= 8, + N =< 9, + lists:append("TestData", _, Chars) + ), + close(Out) + ) +)). + +% Test 14: nonblock can return empty list when no data ready +test("nonblock returns empty when no data ready", ( + atom_chars('python3', Py), + atom_chars('-c', C), + % Script that waits before outputting + atom_chars('import sys,time; time.sleep(0.2); print(\"delayed\")', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + % Immediate nonblock read should get empty list (data not ready yet) + charsio:get_n_chars(Out, N1, _Chars1, nonblock), + % Either empty or very little data + N1 =< 1, + % Wait and read again + charsio:get_n_chars(Out, N2, _Chars2, 1000), + % Now should have data + N2 > 5 + ), + close(Out) + ) +)). + +% Test 15: nonblock vs timeout behavior comparison +test("nonblock vs timeout returns immediately", ( + atom_chars('/bin/echo', Echo), + atom_chars('Data', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out))]), + ( + % nonblock reads whatever is available right now + charsio:get_n_chars(Out, 2, Chars1, nonblock), + Chars1 == "Da", + % Can immediately read more + charsio:get_n_chars(Out, 2, Chars2, nonblock), + Chars2 == "ta" + ), + close(Out) + ) +)). + +% Test 16: multiple sequential nonblock reads drain buffer +test("sequential nonblock reads drain buffer", ( + atom_chars('/bin/echo', Echo), + atom_chars('ABCDEFGHIJ', Content), + iso_ext:setup_call_cleanup( + process:process_create(Echo, [Content], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, 3, Chars1, nonblock), + Chars1 == "ABC", + charsio:get_n_chars(Out, 3, Chars2, nonblock), + Chars2 == "DEF", + charsio:get_n_chars(Out, 3, Chars3, nonblock), + Chars3 == "GHI" + ), + close(Out) + ) +)). + +% Test 17: nonblock with slow stream returns buffer snapshot +test("nonblock with slow data returns partial snapshot", ( + atom_chars('python3', Py), + atom_chars('-c', C), + % Output "ABC" immediately, then wait, then output more + atom_chars('import sys,time; print(\"ABC\",end=\"\",flush=True); time.sleep(0.5); print(\"DEF\",end=\"\",flush=True)', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + % First nonblock read gets immediate data + charsio:get_n_chars(Out, N1, Chars1, nonblock), + N1 >= 3, + lists:append("ABC", _, Chars1), + % Immediate second read gets little/nothing + charsio:get_n_chars(Out, N2, _Chars2, nonblock), + N2 =< 1, + % After wait, more data available + charsio:get_n_chars(Out, N3, Chars3, 1000), + N3 >= 3, + lists:append("DEF", _, Chars3) + ), + close(Out) + ) +)). + +% Test 18: variable N with timeout vs nonblock difference +test("variable N: timeout waits, nonblock returns immediately", ( + atom_chars('python3', Py), + atom_chars('-c', C), + % Output data slowly over 500ms + atom_chars('import sys,time; print(\"A\",end=\"\",flush=True); time.sleep(0.3); print(\"B\",end=\"\",flush=True)', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + % Nonblock gets whatever is ready now (just "A") + charsio:get_n_chars(Out, N1, Chars1, nonblock), + N1 >= 1, + N1 =< 2, + lists:append("A", _, Chars1), + % Timeout waits for more data + charsio:get_n_chars(Out, N2, Chars2, 500), + N2 >= 1, + lists:append("B", _, Chars2) + ), + close(Out) + ) +)). diff --git a/src/tests/incomplete_utf8.pl b/src/tests/incomplete_utf8.pl new file mode 100644 index 000000000..a5e710c26 --- /dev/null +++ b/src/tests/incomplete_utf8.pl @@ -0,0 +1,144 @@ +:- module(incomplete_utf8_tests, []). +:- use_module(test_framework). +:- use_module(library(charsio)). +:- use_module(library(iso_ext)). +:- use_module(library(process)). +:- use_module(library(format)). + +test("get_char/2 after get_n_chars/4 timeout mid-char", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9cAB\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:get_char(Out, C1), + C1 == 'πŸ’œ' + ), + close(Out) + ) +)). + +test("peek_char/2 after get_n_chars/4 timeout mid-char", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9cAB\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:peek_char(Out, C1), + C1 == 'πŸ’œ', + charsio:get_char(Out, C2), + C2 == 'πŸ’œ' + ), + close(Out) + ) +)). + +test("get_code/2 after get_n_chars/4 timeout mid-char", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9cAB\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:get_code(Out, Code), + Code =:= 128156 + ), + close(Out) + ) +)). + +test("peek_code/2 after get_n_chars/4 timeout mid-char", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9cAB\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:peek_code(Out, Code1), + Code1 =:= 128156, + charsio:get_code(Out, Code2), + Code2 =:= 128156 + ), + close(Out) + ) +)). + +test("get_n_chars/3 (no timeout) after get_n_chars/4 timeout mid-char", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9cAB\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:get_n_chars(Out, 2, Chars2), + Chars2 == "πŸ’œA" + ), + close(Out) + ) +)). + +test("read_term/3 after get_n_chars/4 timeout mid-char", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9c foo(bar).\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:get_char(Out, Emoji), + Emoji == 'πŸ’œ', + read_term(Out, Term, []), + Term == foo(bar) + ), + close(Out) + ) +)). + +test("get_line_to_chars/3 after get_n_chars/4 timeout mid-char", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9ctest\\n\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:get_line_to_chars(Out, Line, []), + Line == "πŸ’œtest\n" + ), + close(Out) + ) +)). + +test("multiple sequential reads after timeout", ( + atom_chars('python3', Py), + atom_chars('-c', C), + atom_chars('import sys,time; sys.stdout.buffer.write(b\"\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x92\\x9c\\xf0\"); sys.stdout.buffer.flush(); time.sleep(0.5); sys.stdout.buffer.write(b\"\\x9f\\x98\\x8a\"); sys.stdout.buffer.flush()', Cmd), + iso_ext:setup_call_cleanup( + process:process_create(Py, [C, Cmd], [stdout(pipe(Out))]), + ( + charsio:get_n_chars(Out, _, Chars1, 100), + Chars1 == [], + charsio:get_char(Out, C1), + C1 == 'πŸ’œ', + charsio:get_n_chars(Out, _, Chars2, 100), + Chars2 == [], + charsio:get_char(Out, C2), + C2 == '😊' + ), + close(Out) + ) +)). diff --git a/tests/scryer/cli/src_tests/get_n_chars.toml b/tests/scryer/cli/src_tests/get_n_chars.toml new file mode 100644 index 000000000..adb9336dc --- /dev/null +++ b/tests/scryer/cli/src_tests/get_n_chars.toml @@ -0,0 +1 @@ +args = ["-f", "--no-add-history", "src/tests/get_n_chars.pl", "-f", "-g", "use_module(library(get_n_chars_tests)), get_n_chars_tests:main_quiet(get_n_chars_tests)"] diff --git a/tests/scryer/cli/src_tests/incomplete_utf8.toml b/tests/scryer/cli/src_tests/incomplete_utf8.toml new file mode 100644 index 000000000..0cf7e5474 --- /dev/null +++ b/tests/scryer/cli/src_tests/incomplete_utf8.toml @@ -0,0 +1 @@ +args = ["-f", "--no-add-history", "src/tests/incomplete_utf8.pl", "-f", "-g", "use_module(library(incomplete_utf8_tests)), incomplete_utf8_tests:main_quiet(incomplete_utf8_tests)"] diff --git a/tests/scryer/src_tests.rs b/tests/scryer/src_tests.rs index f0edd5d86..17aa7bd08 100644 --- a/tests/scryer/src_tests.rs +++ b/tests/scryer/src_tests.rs @@ -76,3 +76,17 @@ fn clpz_load() { fn iso_conformity_tests() { load_module_test("tests-pl/iso-conformity-tests.pl", "All tests passed"); } + +#[serial] +#[test] +#[cfg_attr(miri, ignore = "it takes too long to run")] +fn get_n_chars() { + load_module_test("src/tests/get_n_chars.pl", ""); +} + +#[serial] +#[test] +#[cfg_attr(miri, ignore = "it takes too long to run")] +fn incomplete_utf8() { + load_module_test("src/tests/incomplete_utf8.pl", ""); +}