Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 82 additions & 26 deletions dissect/target/filesystems/ad1.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from __future__ import annotations

import stat
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO

from dissect.evidence import ad1

from dissect.target.exceptions import (
FileNotFoundError,
FilesystemError,
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystem import DirEntry, Filesystem, FilesystemEntry
from dissect.target.helpers import fsutil
from dissect.target.helpers import fsutil, keychain

if TYPE_CHECKING:
from collections.abc import Iterator
Expand All @@ -21,22 +23,46 @@
class AD1Filesystem(Filesystem):
__type__ = "ad1"

def __init__(self, fh: BinaryIO, *args, **kwargs):
def __init__(self, fh: BinaryIO | list[BinaryIO] | Path | list[Path], *args, **kwargs):
super().__init__(fh, *args, **kwargs)
self.ad1 = ad1.AD1(fh)

if self.ad1.is_adcrypt():
keys = keychain.get_keys_for_provider(self.__type__) + keychain.get_keys_without_provider()

if not keys:
raise ValueError("Failed to unlock ADCRYPT: no key(s) provided")

for key in keys:
try:
if key.key_type == keychain.KeyType.PASSPHRASE:
self.ad1.unlock(passphrase=key.value)
elif key.key_type == keychain.KeyType.FILE and (path := Path(key.value)).is_file():
self.ad1.unlock(private_key=path)
except ValueError: # noqa: PERF203
pass

if self.ad1.is_locked():
raise ValueError("Failed to unlock ADCRYPT using provided key(s)")

@staticmethod
def _detect(fh: BinaryIO) -> bool:
return fh.read(16) == b"ADSEGMENTEDFILE\x00"

def get(self, path: str) -> AD1FilesystemEntry:
def get(self, path: str) -> FilesystemEntry:
return AD1FilesystemEntry(self, path, self._get_entry(path))

def _get_entry(self, path: str) -> ad1.LogicalImage | ad1.FileEntry:
def _get_entry(self, path: str, entry: ad1.FileEntry | None = None) -> ad1.FileEntry:
try:
return self.ad1.get(path)
except IOError:
raise FileNotFoundError(path)
return self.ad1.entry(path, entry)
except ad1.FileNotFoundError as e:
raise FileNotFoundError(path) from e
except ad1.NotADirectoryError as e:
raise NotADirectoryError(path) from e
except ad1.NotASymlinkError as e:
raise NotASymlinkError(path) from e
except ad1.Error as e:
raise FileNotFoundError(path) from e


class AD1DirEntry(DirEntry):
Expand All @@ -51,41 +77,71 @@ def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:


class AD1FilesystemEntry(FilesystemEntry):
def get(self, path: str) -> AD1FilesystemEntry:
path = fsutil.join(self.path, path, alt_separator=self.alt_separator)
return AD1FilesystemEntry(self.fs, path, self.fs._get_node(path, self.entry))
fs: AD1Filesystem
entry: ad1.FileEntry

def get(self, path: str) -> FilesystemEntry:
entry_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator)
entry = self.fs._get_entry(path, self.entry)
return AD1FilesystemEntry(self.fs, entry_path, entry)

def open(self) -> BinaryIO:
if self.is_dir():
raise IsADirectoryError(self.path)
return self.entry.open()

def scandir(self) -> Iterator[AD1FilesystemEntry]:
def scandir(self) -> Iterator[AD1DirEntry]:
if not self.is_dir():
raise NotADirectoryError(self.path)

for name, entry in self.entry.listdir().items():
yield AD1DirEntry(self.fs, self.path, name, entry)

def is_file(self, follow_symlinks: bool = True) -> bool:
return self.entry.is_file()
for entry in self.entry.iterdir():
yield AD1DirEntry(self.fs, self.path, entry.name, entry)

def is_dir(self, follow_symlinks: bool = True) -> bool:
return self.entry.is_dir()
try:
return self._resolve(follow_symlinks=follow_symlinks).entry.is_dir()
except FilesystemError:
return False

def is_file(self, follow_symlinks: bool = True) -> bool:
try:
return self._resolve(follow_symlinks=follow_symlinks).entry.is_file()
except FilesystemError:
return False

def is_symlink(self) -> bool:
return False
return self.entry.is_symlink()

def readlink(self) -> str:
raise NotASymlinkError

def readlink_ext(self) -> FilesystemEntry:
raise NotASymlinkError
if not self.is_symlink():
raise NotASymlinkError(self.path)
return self.entry.readlink()

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
return self.lstat()
return self._resolve(follow_symlinks=follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
size = self.entry.size if self.entry.is_file() else 0
entry_addr = fsutil.generate_addr(self.path, alt_separator=self.fs.alt_separator)
return fsutil.stat_result([stat.S_IFREG, entry_addr, id(self.fs), 1, 0, 0, size, 0, 0, 0])
if self.is_symlink():
mode = stat.S_IFLNK
elif self.is_file():
mode = stat.S_IFREG
else:
mode = stat.S_IFDIR

st_info = fsutil.stat_result(
[
mode | 0o777,
self.entry.offset,
id(self.fs),
1, # nlink
0, # uid
0, # gid
self.entry.size,
self.entry.atime.timestamp(),
self.entry.mtime.timestamp(),
self.entry.ctime.timestamp(),
]
)

st_info.st_birthtime = self.entry.btime.timestamp()
return st_info
1 change: 1 addition & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def open(path: str | Path, *, fallbacks: list[type[Loader]] | None = None, **kwa


register("local", "LocalLoader")
register("ad1", "AD1Loader")
register("asdf", "AsdfLoader")
register("vmx", "VmxLoader")
register("vmwarevm", "VmwarevmLoader")
Expand Down
34 changes: 34 additions & 0 deletions dissect/target/loaders/ad1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from dissect.evidence.ad1.ad1 import find_files

from dissect.target.filesystems.ad1 import AD1Filesystem
from dissect.target.loader import Loader

if TYPE_CHECKING:
from pathlib import Path

from dissect.target.target import Target


class AD1Loader(Loader):
"""Access Data ``.ad`` loader."""

@staticmethod
def detect(path: Path) -> bool:
return path.suffix.lower() == ".ad1"

def map(self, target: Target) -> None:
"""Map the detected segment files as an :class:`AD1Filesystem` to the target.

Currently does not detect NTFS / case-insensitive filesystems or custom content
images with multiple sources.
"""
try:
fs = AD1Filesystem(find_files(self.path))
target.filesystems.add(fs)

except ValueError as e:
target.log.error("Unable to map AD1: %s", e) # noqa: TRY400
11 changes: 5 additions & 6 deletions dissect/target/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,12 @@ def _open_all(spec: str | Path, include_children: bool = False) -> Iterator[Targ

if not path.exists():
raise TargetError(f"Failed to find loader for {path}: path does not exist")
elif not path.is_dir():
raise TargetError(f"Failed to find loader for {path}: path is not a directory")

for entry in path.iterdir():
for target in _open_all(entry, include_children=include_children):
at_least_one_loaded = True
yield target
if path.is_dir():
for entry in path.iterdir():
for target in _open_all(entry, include_children=include_children):
at_least_one_loaded = True
yield target

if not at_least_one_loaded:
raise TargetError(f"Failed to find any loader for targets: {paths}")
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies = [
"dissect.cstruct>=4,<5",
"dissect.database>=1,<2",
"dissect.eventlog>=3,<4",
"dissect.evidence>=3,<4",
"dissect.evidence>=3.13.dev2,<4", # TODO: update on release!
"dissect.hypervisor>=3.20,<4",
"dissect.ntfs>=3.16.dev,<4",
"dissect.regf>=3.13,<4",
Expand Down Expand Up @@ -86,7 +86,7 @@ dev = [
"dissect.database[dev]>=1.0.dev,<2.0.dev",
"dissect.etl[dev]>=3.0.dev,<4.0.dev",
"dissect.eventlog[dev]>=3.0.dev,<4.0.dev",
"dissect.evidence[dev]>=3.0.dev,<4.0.dev",
"dissect.evidence[dev]>=3.13.dev2,<4.0.dev",
"dissect.extfs[dev]>=3.0.dev,<4.0.dev",
"dissect.fat[dev]>=3.0.dev,<4.0.dev",
"dissect.ffs[dev]>=3.0.dev,<4.0.dev",
Expand Down
3 changes: 3 additions & 0 deletions tests/_data/filesystems/ad1/encrypted-small.ad1
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/ad1/encrypted-small.ad1.csv
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/ad1/encrypted-small.ad1.txt
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/ad1/encrypted-small.ad2
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/_data/filesystems/ad1/encrypted-small.ad3
Git LFS file not shown
32 changes: 32 additions & 0 deletions tests/filesystems/test_ad1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

import pytest
from dissect.evidence.ad1.ad1 import find_files

from dissect.target.filesystems.ad1 import AD1Filesystem
from dissect.target.helpers import keychain
from tests._utils import absolute_path


def test_ad1_encrypted() -> None:
"""Test if we can open an AD1 ADCRYPT encrypted image using the keychain."""

path = absolute_path("_data/filesystems/ad1/encrypted-small.ad1")
segments = find_files(path)

with pytest.raises(ValueError, match=r"Failed to unlock ADCRYPT: no key\(s\) provided"):
AD1Filesystem(segments)

keychain.register_wildcard_value("invalid")

with pytest.raises(ValueError, match=r"Failed to unlock ADCRYPT using provided key\(s\)"):
AD1Filesystem(segments)

keychain.register_wildcard_value("password")
fs = AD1Filesystem(segments)

assert not fs.ad1.is_locked()
assert list(fs.get("C:/Users/user/Desktop/Data").iterdir()) == [
"hello.txt",
"philipp-pilz-QZ2EQuPpQJs-unsplash.jpg",
]
38 changes: 38 additions & 0 deletions tests/loaders/test_ad1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from dissect.target.exceptions import TargetError
from dissect.target.helpers import keychain
from dissect.target.loaders.ad1 import AD1Loader
from dissect.target.target import Target
from tests._utils import absolute_path

if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path


@pytest.mark.parametrize(
("opener"),
[
pytest.param(Target.open, id="target-open"),
pytest.param(lambda x: next(Target.open_all([x])), id="target-open-all"),
],
)
def test_target_open(opener: Callable[[str | Path], Target]) -> None:
"""Test that we correctly use ``AD1Loader`` when opening a ``Target``."""

path = absolute_path("_data/filesystems/ad1/encrypted-small.ad1")

with pytest.raises(TargetError):
opener(path)

keychain.register_wildcard_value("password")
target = opener(path)

assert isinstance(target._loader, AD1Loader)
assert target.path == path
assert target.fs.path("C:/Users/user/Desktop/Data").is_dir()
13 changes: 13 additions & 0 deletions tests/test_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dissect.target.loaders.raw import RawLoader
from dissect.target.loaders.vbox import VBoxLoader
from dissect.target.target import DiskCollection, Event, Target, TargetLogAdapter, log
from tests._utils import absolute_path

if TYPE_CHECKING:
from collections.abc import Callable, Iterator
Expand Down Expand Up @@ -772,6 +773,18 @@ def test_exception_invalid_path() -> None:
):
next(Target.open_all("smb://invalid"))

# If no loader was found (eg. a loader was found but crashed),
# but the given path does exist, throw a sensible error message.
with ( # noqa: PT012
pytest.raises(
TargetError,
match=r"Failed to find any loader for targets:",
),
patch("dissect.target.loaders.tar.TarLoader.map") as map,
):
map.side_effect = Exception
next(Target.open_all(str(absolute_path("_data/loaders/tar/test-archive.tar"))))


def test_list_children() -> None:
"""Test that ``list_children`` returns child records."""
Expand Down
Loading