Skip to content

Commit

Permalink
refactor: separate out StdoutReader to own file
Browse files Browse the repository at this point in the history
  • Loading branch information
lars-berger committed Jan 26, 2025
1 parent 5c53699 commit b5fcc75
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 103 deletions.
2 changes: 2 additions & 0 deletions crates/shell/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ mod encoding;
mod error;
mod options;
mod shell;
mod stdout_reader;

pub use encoding::*;
pub use error::*;
pub use options::*;
pub use shell::*;
pub(crate) use stdout_reader::*;

pub type Result<T> = std::result::Result<T, Error>;
110 changes: 7 additions & 103 deletions crates/shell/src/shell.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
use core::slice::memchr;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::process::CommandExt;
use std::{
ffi::OsStr,
io::{BufRead, BufReader, Write},
io::Write,
process::{Command, Stdio},
sync::{Arc, RwLock},
thread::spawn,
};

#[cfg(windows)]
const CREATE_NO_WINDOW: u32 = 0x08000000;

use os_pipe::{pipe, PipeWriter};
use serde::{Deserialize, Serialize};
use shared_child::SharedChild;
use tokio::sync::mpsc;

use crate::{encoding::Encoding, options::CommandOptions};
use crate::{encoding::Encoding, options::CommandOptions, StdoutReader};

pub type ProcessId = u32;

#[cfg(windows)]
const CREATE_NO_WINDOW: u32 = 0x08000000;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(untagged)]
pub enum Buffer {
Expand Down Expand Up @@ -388,7 +387,7 @@ impl Shell {
{
spawn(move || {
let _lock = guard.read().unwrap();
let mut reader = StdOutReader::new(pipe, encoding);
let mut reader = StdoutReader::new(pipe, encoding);

while let Ok(Some(buffer)) = reader.read_next() {
if tx.blocking_send(wrapper(buffer)).is_err() {
Expand All @@ -399,101 +398,6 @@ impl Shell {
}
}

/// A pipe reader for stdout/stderr.
struct StdOutReader {
reader: BufReader<os_pipe::PipeReader>,
encoding: Encoding,
}

impl StdOutReader {
/// Creates a new `StdOutReader` instance.
fn new(pipe: os_pipe::PipeReader, encoding: Encoding) -> Self {
Self {
reader: BufReader::new(pipe),
encoding,
}
}

/// Reads the next chunk of data.
fn read_next(&mut self) -> std::io::Result<Option<Buffer>> {
if self.encoding == Encoding::Raw {
self.read_raw_chunk()
} else {
self.read_line()
}
}

/// Reads a chunk of raw bytes.
fn read_raw_chunk(&mut self) -> std::io::Result<Option<Buffer>> {
let chunk = self.reader.fill_buf()?.to_vec();

if chunk.is_empty() {
return Ok(None);
}

self.reader.consume(chunk.len());
Ok(Some(Buffer::Raw(chunk)))
}

/// Reads until a line ending (\n or \r) is found.
fn read_line(&mut self) -> std::io::Result<Option<Buffer>> {
let mut buffer = Vec::new();

loop {
let chunk = match self.reader.fill_buf() {
Ok(chunk) => chunk.to_vec(),
Err(err) => {
if err.kind() == std::io::ErrorKind::Interrupted {
continue;
} else {
return Err(err);
}
}
};

if chunk.is_empty() {
break;
}

match Self::find_delimiter(&chunk) {
Some(pos) => {
// Delimiter found - consume up to and including the delimiter.
// The delimiter is included in the output buffer.
buffer.extend_from_slice(&chunk[..=pos]);
self.reader.consume(pos + 1);
break;
}
None => {
// No delimiter found - consume entire chunk.
buffer.extend_from_slice(&chunk);
self.reader.consume(chunk.len());
}
}
}

if buffer.is_empty() {
Ok(None)
} else {
Ok(Some(self.encoding.decode(buffer)))
}
}

/// Finds the position of a line delimiter (\n or \r) within a buffer.
fn find_delimiter(buffer: &[u8]) -> Option<usize> {
// Try to find a newline.
if let Some(pos) = memchr::memchr(b'\n', buffer) {
return Some(pos);
}

// Try to find a carriage return.
if let Some(pos) = memchr::memchr(b'\r', buffer) {
return Some(pos);
}

None
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -539,7 +443,7 @@ mod tests {

let mut child = Shell::spawn(
if cfg!(windows) { "cmd" } else { "sh" },
&[if cfg!(windows) { "/C" } else { "-c" }, "echo test"],
[if cfg!(windows) { "/C" } else { "-c" }, "echo test"],
&options,
)
.unwrap();
Expand Down
99 changes: 99 additions & 0 deletions crates/shell/src/stdout_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use core::slice::memchr;
use std::io::{BufRead, BufReader};

use crate::{Buffer, Encoding};

/// A pipe reader for stdout/stderr.
pub(crate) struct StdoutReader {
reader: BufReader<os_pipe::PipeReader>,
encoding: Encoding,
}

impl StdoutReader {
/// Creates a new `StdOutReader` instance.
pub fn new(pipe: os_pipe::PipeReader, encoding: Encoding) -> Self {
Self {
reader: BufReader::new(pipe),
encoding,
}
}

/// Reads the next chunk of data.
pub fn read_next(&mut self) -> std::io::Result<Option<Buffer>> {
if self.encoding == Encoding::Raw {
self.read_raw_chunk()
} else {
self.read_line()
}
}

/// Reads a chunk of raw bytes.
fn read_raw_chunk(&mut self) -> std::io::Result<Option<Buffer>> {
let chunk = self.reader.fill_buf()?.to_vec();

if chunk.is_empty() {
return Ok(None);
}

self.reader.consume(chunk.len());
Ok(Some(Buffer::Raw(chunk)))
}

/// Reads until a line ending (\n or \r) is found.
fn read_line(&mut self) -> std::io::Result<Option<Buffer>> {
let mut buffer = Vec::new();

loop {
let chunk = match self.reader.fill_buf() {
Ok(chunk) => chunk.to_vec(),
Err(err) => {
if err.kind() == std::io::ErrorKind::Interrupted {
continue;
} else {
return Err(err);
}
}
};

if chunk.is_empty() {
break;
}

match Self::find_delimiter(&chunk) {
Some(pos) => {
// Delimiter found - consume up to and including the delimiter.
// The delimiter is included in the output buffer.
buffer.extend_from_slice(&chunk[..=pos]);
self.reader.consume(pos + 1);
break;
}
None => {
// No delimiter found - consume entire chunk.
buffer.extend_from_slice(&chunk);
self.reader.consume(chunk.len());
}
}
}

if buffer.is_empty() {
Ok(None)
} else {
Ok(Some(self.encoding.decode(buffer)))
}
}

/// Finds the position of a line delimiter (\n or \r) within a buffer.
fn find_delimiter(buffer: &[u8]) -> Option<usize> {
// Try to find a newline.
if let Some(pos) = memchr::memchr(b'\n', buffer) {
return Some(pos);
}

// Try to find a carriage return.
if let Some(pos) = memchr::memchr(b'\r', buffer) {
return Some(pos);
}

None
}
}

0 comments on commit b5fcc75

Please sign in to comment.