From 633772b209092134f10622b21a1fd15d166f5cfb Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 2 Jul 2025 12:06:44 +0200 Subject: [PATCH] Small stream improvements --- dissect/util/stream.py | 149 ++++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 62 deletions(-) diff --git a/dissect/util/stream.py b/dissect/util/stream.py index db163bc..7dc3b84 100644 --- a/dissect/util/stream.py +++ b/dissect/util/stream.py @@ -12,22 +12,27 @@ class AlignedStream(io.RawIOBase): - """Basic buffered stream that provides easy aligned reads. + """Basic buffered stream that provides aligned reads. Must be subclassed for various stream implementations. Subclasses can implement: - - _read(offset, length) - - _seek(pos, whence=io.SEEK_SET) + - :meth:`~AlignedStream._read` + - :meth:`~AlignedStream._seek` - The offset and length for _read are guaranteed to be aligned. The only time - that overriding _seek would make sense is if there's no known size of your stream, - but still want to provide SEEK_END functionality. + The offset and length for ``_read`` are guaranteed to be aligned for streams of a known size. + If your stream has an unknown size (i.e. ``size == None``), reads of length ``-1`` (i.e. read until EOF) will be + passed through to your implementation of ``_read``. + The only time that overriding ``_seek`` would make sense is if there's no known size of your stream, + but you still want to provide ``SEEK_END`` functionality. - Most subclasses of AlignedStream take one or more file-like objects as source. + Most subclasses of ``AlignedStream`` take one or more file-like objects as source. Operations on these subclasses, like reading, will modify the source file-like object as a side effect. Args: - size: The size of the stream. This is used in read and seek operations. None if unknown. + size: The size of the stream. This is used in read and seek operations. ``None`` if unknown. align: The alignment size. 
Read operations are aligned on this boundary. Also determines buffer size. + + .. automethod:: _read + .. automethod:: _seek """ def __init__(self, size: int | None = None, align: int = STREAM_BUFFER_SIZE): @@ -39,26 +44,30 @@ def __init__(self, size: int | None = None, align: int = STREAM_BUFFER_SIZE): self._pos_align = 0 self._buf = None - self._seek_lock = Lock() + self._lock = Lock() - def _set_pos(self, pos: int) -> None: - """Update the position and aligned position within the stream.""" - new_pos_align = pos - (pos % self.align) + def readable(self) -> bool: + """Indicate that the stream is readable.""" + return True - if self._pos_align != new_pos_align: - self._pos_align = new_pos_align - self._buf = None + def seekable(self) -> bool: + """Indicate that the stream is seekable.""" + return True - self._pos = pos + def seek(self, pos: int, whence: int = io.SEEK_SET) -> int: + """Seek the stream to the specified position. - def _fill_buf(self) -> None: - """Fill the alignment buffer if we can.""" - if self._buf or (self.size is not None and (self.size <= self._pos or self.size <= self._pos_align)): - return + Returns: + The new stream position after seeking. 
+ """ + with self._lock: + pos = self._seek(pos, whence) + self._set_pos(pos) - self._buf = self._read(self._pos_align, self.align) + return pos def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int: + """Calculate and return the new stream position after a seek.""" if whence == io.SEEK_SET: if pos < 0: raise ValueError(f"negative seek position {pos}") @@ -73,16 +82,32 @@ def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int: return pos - def seek(self, pos: int, whence: int = io.SEEK_SET) -> int: - """Seek the stream to the specified position.""" - with self._seek_lock: - pos = self._seek(pos, whence) - self._set_pos(pos) + def _set_pos(self, pos: int) -> None: + """Update the position and aligned position within the stream.""" + new_pos_align = pos - (pos % self.align) - return pos + if self._pos_align != new_pos_align: + self._pos_align = new_pos_align + self._buf = None + + self._pos = pos + + def tell(self) -> int: + """Return current stream position.""" + return self._pos + + def _fill_buf(self) -> None: + """Fill the alignment buffer if we can.""" + if self._buf or (self.size is not None and (self.size <= self._pos or self.size <= self._pos_align)): + # Don't fill the buffer if: + # - We already have a buffer + # - The stream position is at (or beyond) the end of the stream + return + + self._buf = self._read(self._pos_align, self.align) def read(self, n: int = -1) -> bytes: - """Read and return up to n bytes, or read to the end of the stream if n is -1. + """Read and return up to ``n`` bytes, or read to the end of the stream if ``n`` is ``-1``. Returns an empty bytes object on EOF. 
""" @@ -93,7 +118,7 @@ def read(self, n: int = -1) -> bytes: size = self.size align = self.align - with self._seek_lock: + with self._lock: if size is None and n == -1: r = [] if self._buf: @@ -107,10 +132,12 @@ def read(self, n: int = -1) -> bytes: self._set_pos(self._pos + len(buf)) return buf + # If we know the stream size, adjust n if size is not None: remaining = size - self._pos n = remaining if n == -1 else min(n, remaining) + # Short path for when it turns out we don't need to read anything if n == 0 or (size is not None and size <= self._pos): return b"" @@ -136,7 +163,7 @@ def read(self, n: int = -1) -> bytes: self._set_pos(self._pos + read_len) - # Misaligned end + # Misaligned remaining bytes if n > 0: self._fill_buf() r.append(self._buf[:n]) @@ -163,22 +190,28 @@ def readall(self) -> bytes: return self.read() def readoffset(self, offset: int, length: int) -> bytes: - """Convenience method to read from a certain offset with 1 call.""" + """Convenience method to read from a given offset. + + Args: + offset: The offset in the stream to read from. + length: The number of bytes to read. + """ self.seek(offset) return self.read(length) - def tell(self) -> int: - """Return current stream position.""" - return self._pos + def peek(self, n: int) -> bytes: + """Convenience method to peek from the current offset without advancing the stream position. - def close(self) -> None: - pass - - def readable(self) -> bool: - return True + Args: + n: The number of bytes to peek. + """ + pos = self._pos + data = self.read(n) + self._set_pos(pos) + return data - def seekable(self) -> bool: - return True + def close(self) -> None: + """Close the stream. Does nothing by default.""" class RangeStream(AlignedStream): @@ -198,18 +231,25 @@ class RangeStream(AlignedStream): align: The alignment size. 
""" - def __init__(self, fh: BinaryIO, offset: int, size: int, align: int = STREAM_BUFFER_SIZE): + def __init__(self, fh: BinaryIO, offset: int, size: int | None, align: int = STREAM_BUFFER_SIZE): super().__init__(size, align) self._fh = fh self.offset = offset + def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int: + if self.size is None and whence == io.SEEK_END: + if (pos := self._fh.seek(pos, whence)) is None: + pos = self._fh.tell() + return max(0, pos - self.offset) + return super()._seek(pos, whence) + def _read(self, offset: int, length: int) -> bytes: - read_length = min(length, self.size - offset) + read_length = min(length, self.size - offset) if self.size else length self._fh.seek(self.offset + offset) return self._fh.read(read_length) -class RelativeStream(AlignedStream): +class RelativeStream(RangeStream): """Create a relative stream from another file-like object. ASCII representation:: @@ -222,27 +262,12 @@ class RelativeStream(AlignedStream): Args: fh: The source file-like object. offset: The offset the stream should start from on the source file-like object. - size: The size the stream should be. + size: Optional size the stream should be. align: The alignment size. 
""" def __init__(self, fh: BinaryIO, offset: int, size: int | None = None, align: int = STREAM_BUFFER_SIZE): - super().__init__(size, align) - self._fh = fh - self.offset = offset - - def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int: - if whence == io.SEEK_END: - pos = self._fh.seek(pos, whence) - if pos is None: - pos = self._fh.tell() - return max(0, pos - self.offset) - return super()._seek(pos, whence) - - def _read(self, offset: int, length: int) -> bytes: - read_length = min(length, self.size - offset) if self.size else length - self._fh.seek(self.offset + offset) - return self._fh.read(read_length) + super().__init__(fh, offset, size, align) class BufferedStream(RelativeStream): @@ -459,7 +484,7 @@ def add(self, offset: int, data: bytes | BinaryIO, size: int | None = None) -> N Args: offset: The offset in bytes to add an overlay at. - data: The bytes or file-like object to overlay + data: The bytes or file-like object to overlay. size: Optional size specification of the overlay, if it can't be inferred. """ if not hasattr(data, "read"):