Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dissect/util/_build.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Reference: https://setuptools.pypa.io/en/latest/build_meta.html#dynamic-build-dependencies-and-other-build-meta-tweaks
# type: ignore
from __future__ import annotations

import os
Expand Down
4 changes: 2 additions & 2 deletions dissect/util/compression/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
try:
from dissect.util import _native

lz4 = lz4_native = _native.compression.lz4
lzo = lzo_native = _native.compression.lzo
lz4 = lz4_native = _native.compression.lz4 # type: ignore
lzo = lzo_native = _native.compression.lzo # type: ignore
except (ImportError, AttributeError):
lz4_native = lzo_native = None

Expand Down
6 changes: 3 additions & 3 deletions dissect/util/compression/lz4.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ def _get_length(src: BinaryIO, length: int) -> int:


def decompress(
src: bytes | BinaryIO,
src: bytes | bytearray | memoryview | BinaryIO,
uncompressed_size: int = -1,
return_bytearray: bool = False,
) -> bytes | tuple[bytes, int]:
) -> bytes | bytearray | tuple[bytes | bytearray, int]:
"""LZ4 decompress from a file-like object or bytes up to a certain length. Assumes no header.

Args:
Expand All @@ -39,7 +39,7 @@ def decompress(
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

dst = bytearray()
Expand Down
8 changes: 4 additions & 4 deletions dissect/util/compression/lzbitmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
_H = struct.Struct("<H")


def decompress(src: bytes | BinaryIO) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO) -> bytes:
"""LZBITMAP decompress from a file-like object or bytes.

Decompresses until EOF or EOS of the input data.
Expand All @@ -22,7 +22,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

if src.read(4) != b"ZBM\x09":
Expand Down Expand Up @@ -54,7 +54,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
buf = memoryview(src.read(compressed_size))

# Build the bitmap/token map
token_map = []
token_map: list[tuple[int | None, int]] = []
bits = int.from_bytes(buf[-17:], "little")
for i in range(0xF):
if i < 3:
Expand Down Expand Up @@ -97,7 +97,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:

for _ in range(repeat):
bitmap, token = token_map[idx]
if idx < 3:
if bitmap is None: # idx < 3, but this makes the type checker happy
# Index 0, 1, 2 are special and indicate we need to read a bitmap from the bitmap region
bitmap = buf[bitmap_offset]
bitmap_offset += 1
Expand Down
4 changes: 2 additions & 2 deletions dissect/util/compression/lzfse.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def _decode_lmd(
return bytes(dst)


def decompress(src: bytes | BinaryIO) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO) -> bytes:
"""LZFSE decompress from a file-like object or bytes.

Decompresses until EOF or EOS of the input data.
Expand All @@ -409,7 +409,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

dst = bytearray()
Expand Down
4 changes: 2 additions & 2 deletions dissect/util/compression/lznt1.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def _get_displacement(offset: int) -> int:
TAG_MASKS = [(1 << i) for i in range(8)]


def decompress(src: bytes | BinaryIO) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO) -> bytes:
"""LZNT1 decompress from a file-like object or bytes.

Args:
Expand All @@ -34,7 +34,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

offset = src.tell()
Expand Down
4 changes: 2 additions & 2 deletions dissect/util/compression/lzo.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def _read_length(src: BinaryIO, val: int, mask: int) -> int:
return length + mask + val


def decompress(src: bytes | BinaryIO, header: bool = True, buflen: int = -1) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO, header: bool = True, buflen: int = -1) -> bytes:
"""LZO decompress from a file-like object or bytes. Assumes no header.

Arguments are largely compatible with python-lzo API.
Expand All @@ -36,7 +36,7 @@ def decompress(src: bytes | BinaryIO, header: bool = True, buflen: int = -1) ->
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

dst = bytearray()
Expand Down
6 changes: 3 additions & 3 deletions dissect/util/compression/lzvn.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
_H = struct.Struct("<H")


def decompress(src: bytes | BinaryIO) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO) -> bytes:
"""LZVN decompress from a file-like object or bytes.

Decompresses until EOF or EOS of the input data.
Expand All @@ -67,7 +67,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

offset = src.tell()
Expand Down Expand Up @@ -207,7 +207,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
if src_size < opc_len:
break

src_size -= opc_len + L
src_size -= opc_len
break

elif opc in OP_UDEF:
Expand Down
4 changes: 2 additions & 2 deletions dissect/util/compression/lzxpress.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import BinaryIO


def decompress(src: bytes | BinaryIO) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO) -> bytes:
"""LZXPRESS decompress from a file-like object or bytes.

Args:
Expand All @@ -15,7 +15,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

offset = src.tell()
Expand Down
46 changes: 25 additions & 21 deletions dissect/util/compression/lzxpress_huffman.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def _read_16_bit(fh: BinaryIO) -> int:
class Node:
__slots__ = ("children", "is_leaf", "symbol")

def __init__(self, symbol: Symbol | None = None, is_leaf: bool = False):
def __init__(self, symbol: int = 0, is_leaf: bool = False):
self.symbol = symbol
self.is_leaf = is_leaf
self.children = [None, None]
self.children: dict[int, Node] = {}


def _add_leaf(nodes: list[Node], idx: int, mask: int, bits: int) -> int:
Expand All @@ -32,7 +32,7 @@ def _add_leaf(nodes: list[Node], idx: int, mask: int, bits: int) -> int:
while bits > 1:
bits -= 1
childidx = (mask >> bits) & 1
if node.children[childidx] is None:
if childidx not in node.children:
node.children[childidx] = nodes[i]
nodes[i].is_leaf = False
i += 1
Expand Down Expand Up @@ -84,24 +84,28 @@ def _build_tree(buf: bytes) -> Node:


class BitString:
def __init__(self):
self.source = None
def __init__(self, fh: BinaryIO):
self.fh = fh
self.mask = 0
self.bits = 0

@property
def index(self) -> int:
return self.source.tell()
return self.fh.tell()

def init(self, fh: BinaryIO) -> None:
self.mask = (_read_16_bit(fh) << 16) + _read_16_bit(fh)
def reset(self) -> None:
self.mask = (_read_16_bit(self.fh) << 16) + _read_16_bit(self.fh)
self.bits = 32
self.source = fh

def read(self, n: int) -> bytes:
return self.source.read(n)
return self.fh.read(n)

def lookup(self, n: int) -> int:
def take(self, n: int) -> int:
value = self.peek(n)
self.skip(n)
return value

def peek(self, n: int) -> int:
if n == 0:
return 0

Expand All @@ -111,19 +115,19 @@ def skip(self, n: int) -> None:
self.mask = (self.mask << n) & 0xFFFFFFFF
self.bits -= n
if self.bits < 16:
self.mask += _read_16_bit(self.source) << (16 - self.bits)
self.mask += _read_16_bit(self.fh) << (16 - self.bits)
self.bits += 16

def decode(self, root: Node) -> Symbol:
def decode(self, root: Node) -> int:
node = root
while not node.is_leaf:
bit = self.lookup(1)
self.skip(1)
bit = self.take(1)
node = node.children[bit]

return node.symbol


def decompress(src: bytes | BinaryIO) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO) -> bytes:
"""LZXPRESS decompress from a file-like object or bytes.

Decompresses until EOF of the input data.
Expand All @@ -134,7 +138,7 @@ def decompress(src: bytes | BinaryIO) -> bytes:
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

dst = bytearray()
Expand All @@ -144,11 +148,11 @@ def decompress(src: bytes | BinaryIO) -> bytes:
size = src.tell() - start_offset
src.seek(start_offset, io.SEEK_SET)

bitstring = BitString()
bitstring = BitString(src)

while src.tell() - start_offset < size:
root = _build_tree(src.read(256))
bitstring.init(src)
bitstring.reset()

chunk_size = 0
while chunk_size < 65536 and src.tell() - start_offset < size:
Expand All @@ -161,13 +165,13 @@ def decompress(src: bytes | BinaryIO) -> bytes:
length = symbol & 0x0F
symbol >>= 4

offset = (1 << symbol) + bitstring.lookup(symbol)
offset = (1 << symbol) + bitstring.peek(symbol)

if length == 15:
length = ord(bitstring.read(1)) + 15

if length == 270:
length = _read_16_bit(bitstring.source)
length = _read_16_bit(bitstring.fh)

bitstring.skip(symbol)

Expand Down
14 changes: 7 additions & 7 deletions dissect/util/compression/sevenbit.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

from io import BytesIO
import io
from typing import BinaryIO


def compress(src: bytes | BinaryIO) -> bytes:
def compress(src: bytes | bytearray | memoryview | BinaryIO) -> bytes:
"""Sevenbit compress from a file-like object or bytes.

Args:
Expand All @@ -13,8 +13,8 @@ def compress(src: bytes | BinaryIO) -> bytes:
Returns:
The compressed data.
"""
if not hasattr(src, "read"):
src = BytesIO(src)
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

dst = bytearray()

Expand All @@ -39,7 +39,7 @@ def compress(src: bytes | BinaryIO) -> bytes:
return bytes(dst)


def decompress(src: bytes | BinaryIO, wide: bool = False) -> bytes:
def decompress(src: bytes | bytearray | memoryview | BinaryIO, wide: bool = False) -> bytes:
"""Sevenbit decompress from a file-like object or bytes.

Args:
Expand All @@ -48,8 +48,8 @@ def decompress(src: bytes | BinaryIO, wide: bool = False) -> bytes:
Returns:
The decompressed data.
"""
if not hasattr(src, "read"):
src = BytesIO(src)
if isinstance(src, bytes | bytearray | memoryview):
src = io.BytesIO(src)

dst = bytearray()

Expand Down
4 changes: 2 additions & 2 deletions dissect/util/compression/xz.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
CRC_SIZE = 4


def repair_checksum(fh: BinaryIO) -> BinaryIO:
def repair_checksum(fh: BinaryIO) -> OverlayStream:
"""Repair CRC32 checksums for all headers in an XZ stream.

FortiOS XZ files have (on purpose) corrupt streams which they read using a modified ``xz`` binary.
Expand Down Expand Up @@ -55,7 +55,7 @@ def repair_checksum(fh: BinaryIO) -> BinaryIO:
# Parse the index
isize, num_records = _mbi(index[1:])
index = index[1 + isize : -4]
records = []
records: list[tuple[int, int]] = []
for _ in range(num_records):
if not index:
raise ValueError("Missing index size")
Expand Down
Loading
Loading