Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 87 additions & 62 deletions dissect/util/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@


class AlignedStream(io.RawIOBase):
"""Basic buffered stream that provides easy aligned reads.
"""Basic buffered stream that provides aligned reads.

Must be subclassed for various stream implementations. Subclasses can implement:
- _read(offset, length)
- _seek(pos, whence=io.SEEK_SET)
- :meth:`~AlignedStream._read`
- :meth:`~AlignedStream._seek`

The offset and length for _read are guaranteed to be aligned. The only time
that overriding _seek would make sense is if there's no known size of your stream,
but still want to provide SEEK_END functionality.
The offset and length for ``_read`` are guaranteed to be aligned for streams of a known size.
If your stream has an unknown size (i.e. ``size == None``), reads of length ``-1`` (i.e. read until EOF) will be
passed through to your implementation of ``_read``.
The only time that overriding ``_seek`` would make sense is if there's no known size of your stream,
but still want to provide ``SEEK_END`` functionality.

Most subclasses of AlignedStream take one or more file-like objects as source.
Most subclasses of ``AlignedStream`` take one or more file-like objects as source.
Operations on these subclasses, like reading, will modify the source file-like object as a side effect.

Args:
size: The size of the stream. This is used in read and seek operations. None if unknown.
size: The size of the stream. This is used in read and seek operations. ``None`` if unknown.
align: The alignment size. Read operations are aligned on this boundary. Also determines buffer size.

.. automethod:: _read
.. automethod:: _seek
"""

def __init__(self, size: int | None = None, align: int = STREAM_BUFFER_SIZE):
Expand All @@ -39,26 +44,30 @@ def __init__(self, size: int | None = None, align: int = STREAM_BUFFER_SIZE):
self._pos_align = 0

self._buf = None
self._seek_lock = Lock()
self._lock = Lock()

def _set_pos(self, pos: int) -> None:
"""Update the position and aligned position within the stream."""
new_pos_align = pos - (pos % self.align)
def readable(self) -> bool:
"""Indicate that the stream is readable."""
return True

if self._pos_align != new_pos_align:
self._pos_align = new_pos_align
self._buf = None
def seekable(self) -> bool:
"""Indicate that the stream is seekable."""
return True

self._pos = pos
def seek(self, pos: int, whence: int = io.SEEK_SET) -> int:
"""Seek the stream to the specified position.

def _fill_buf(self) -> None:
"""Fill the alignment buffer if we can."""
if self._buf or (self.size is not None and (self.size <= self._pos or self.size <= self._pos_align)):
return
Returns:
The new stream position after seeking.
"""
with self._lock:
pos = self._seek(pos, whence)
self._set_pos(pos)

self._buf = self._read(self._pos_align, self.align)
return pos

def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int:
"""Calculate and return the new stream position after a seek."""
if whence == io.SEEK_SET:
if pos < 0:
raise ValueError(f"negative seek position {pos}")
Expand All @@ -73,16 +82,32 @@ def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int:

return pos

def seek(self, pos: int, whence: int = io.SEEK_SET) -> int:
"""Seek the stream to the specified position."""
with self._seek_lock:
pos = self._seek(pos, whence)
self._set_pos(pos)
def _set_pos(self, pos: int) -> None:
"""Update the position and aligned position within the stream."""
new_pos_align = pos - (pos % self.align)

return pos
if self._pos_align != new_pos_align:
self._pos_align = new_pos_align
self._buf = None

self._pos = pos

def tell(self) -> int:
    """Return the current position within the stream.

    Returns:
        The value of the internal position counter (``self._pos``).
    """
    return self._pos

def _fill_buf(self) -> None:
    """Load the alignment buffer for the current aligned position, when possible.

    Nothing is read when a non-empty buffer is already present, or when the stream size is known
    and the current position (or its aligned position) is at or beyond that size. Otherwise one
    ``align``-sized read is requested from ``_read`` at the aligned position.
    """
    if self._buf:
        # A buffer is already present, nothing to do
        return

    stream_size = self.size
    if stream_size is not None and (stream_size <= self._pos or stream_size <= self._pos_align):
        # Positioned at (or beyond) the end of a stream of known size
        return

    self._buf = self._read(self._pos_align, self.align)

def read(self, n: int = -1) -> bytes:
"""Read and return up to n bytes, or read to the end of the stream if n is -1.
"""Read and return up to ``n`` bytes, or read to the end of the stream if ``n`` is ``-1``.

Returns an empty bytes object on EOF.
"""
Expand All @@ -93,7 +118,7 @@ def read(self, n: int = -1) -> bytes:
size = self.size
align = self.align

with self._seek_lock:
with self._lock:
if size is None and n == -1:
r = []
if self._buf:
Expand All @@ -107,10 +132,12 @@ def read(self, n: int = -1) -> bytes:
self._set_pos(self._pos + len(buf))
return buf

# If we know the stream size, adjust n
if size is not None:
remaining = size - self._pos
n = remaining if n == -1 else min(n, remaining)

# Short path for when it turns out we don't need to read anything
if n == 0 or (size is not None and size <= self._pos):
return b""

Expand All @@ -136,7 +163,7 @@ def read(self, n: int = -1) -> bytes:

self._set_pos(self._pos + read_len)

# Misaligned end
# Misaligned remaining bytes
if n > 0:
self._fill_buf()
r.append(self._buf[:n])
Expand All @@ -163,22 +190,28 @@ def readall(self) -> bytes:
return self.read()

def readoffset(self, offset: int, length: int) -> bytes:
    """Seek to ``offset`` and read ``length`` bytes from there in a single call.

    The stream position is moved as a side effect of the seek and the read.

    Args:
        offset: The offset in the stream to read from.
        length: The number of bytes to read.

    Returns:
        The bytes returned by ``read(length)`` after seeking to ``offset``.
    """
    self.seek(offset)
    data = self.read(length)
    return data

def tell(self) -> int:
"""Return current stream position."""
return self._pos
def peek(self, n: int) -> bytes:
"""Convenience method to peek from the current offset without advancing the stream position.

def close(self) -> None:
pass

def readable(self) -> bool:
return True
Args:
n: The number of bytes to peek.
"""
pos = self._pos
data = self.read(n)
self._set_pos(pos)
return data

def seekable(self) -> bool:
return True
def close(self) -> None:
"""Close the stream. Does nothing by default."""


class RangeStream(AlignedStream):
Expand All @@ -198,18 +231,25 @@ class RangeStream(AlignedStream):
align: The alignment size.
"""

def __init__(self, fh: BinaryIO, offset: int, size: int | None, align: int = STREAM_BUFFER_SIZE):
    """Set up the stream over ``fh``.

    Args:
        fh: The source file-like object.
        offset: Added to every stream offset when seeking on ``fh``.
        size: The size of the stream, forwarded to the base class. May be ``None``.
        align: The alignment size, forwarded to the base class.
    """
    # Base class initialisation receives the size and alignment
    super().__init__(size, align)
    self.offset = offset
    self._fh = fh

def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int:
    """Calculate the new stream position for a seek.

    When the stream size is unknown and ``whence`` is ``io.SEEK_END``, the seek is performed on the
    source file-like object and its resulting position is translated back to a stream position by
    subtracting ``self.offset`` (never going below ``0``). Every other case is handled by the
    parent class.

    Args:
        pos: The position or offset to seek to, interpreted according to ``whence``.
        whence: One of the ``io.SEEK_*`` constants.

    Returns:
        The new position within this stream.
    """
    if whence != io.SEEK_END or self.size is not None:
        return super()._seek(pos, whence)

    source_pos = self._fh.seek(pos, whence)
    if source_pos is None:
        # Some file-like objects return nothing from seek(), so ask for the position explicitly
        source_pos = self._fh.tell()

    return max(0, source_pos - self.offset)

def _read(self, offset: int, length: int) -> bytes:
    """Read ``length`` bytes at stream ``offset`` from the source file-like object.

    When the stream size is known, the read never extends past the end of the stream: the length is
    clamped to the number of bytes remaining after ``offset`` (zero if ``offset`` is at or beyond
    the size), and a negative ``length`` means "everything remaining in the stream". When the size
    is unknown (``None``), ``length`` is passed to the source unchanged, so ``-1`` reads until EOF.

    Args:
        offset: The offset within this stream to read from.
        length: The number of bytes to read, or a negative value to read until the end.

    Returns:
        The bytes read from the source file-like object.
    """
    read_length = length
    # Compare against None explicitly: a known size of 0 is not the same as an unknown size
    if self.size is not None:
        # Never let a negative count reach fh.read(), which would read to the source's EOF
        remaining = max(0, self.size - offset)
        read_length = remaining if length < 0 else min(length, remaining)

    self._fh.seek(self.offset + offset)
    return self._fh.read(read_length)


class RelativeStream(AlignedStream):
class RelativeStream(RangeStream):
"""Create a relative stream from another file-like object.

ASCII representation::
Expand All @@ -222,27 +262,12 @@ class RelativeStream(AlignedStream):
Args:
fh: The source file-like object.
offset: The offset the stream should start from on the source file-like object.
size: The size the stream should be.
size: Optional size the stream should be.
align: The alignment size.
"""

def __init__(self, fh: BinaryIO, offset: int, size: int | None = None, align: int = STREAM_BUFFER_SIZE):
super().__init__(size, align)
self._fh = fh
self.offset = offset

def _seek(self, pos: int, whence: int = io.SEEK_SET) -> int:
if whence == io.SEEK_END:
pos = self._fh.seek(pos, whence)
if pos is None:
pos = self._fh.tell()
return max(0, pos - self.offset)
return super()._seek(pos, whence)

def _read(self, offset: int, length: int) -> bytes:
read_length = min(length, self.size - offset) if self.size else length
self._fh.seek(self.offset + offset)
return self._fh.read(read_length)
super().__init__(fh, offset, size, align)


class BufferedStream(RelativeStream):
Expand Down Expand Up @@ -459,7 +484,7 @@ def add(self, offset: int, data: bytes | BinaryIO, size: int | None = None) -> N

Args:
offset: The offset in bytes to add an overlay at.
data: The bytes or file-like object to overlay
data: The bytes or file-like object to overlay.
size: Optional size specification of the overlay, if it can't be inferred.
"""
if not hasattr(data, "read"):
Expand Down
Loading