Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions acquire/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from dissect.util.stream import RunlistStream

from acquire.collector import Collector, get_full_formatted_report, get_report_summary
from acquire.diagnostics.common import diagnostics_info_json
from acquire.dynamic.windows.named_objects import NamedObjectType
from acquire.esxi import esxi_memory_context_manager
from acquire.gui import GUI
Expand Down Expand Up @@ -2399,6 +2400,30 @@ def main() -> None:
exit_failure(args.config.get("arguments"))
exit_success(args.config.get("arguments"))

# Check for diagnostics argument here. Process and execute.
if args.diagnostics:
print("\nWARNING: Gathering diagnostics may destroy forensic evidence.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
print("\nWARNING: Gathering diagnostics may destroy forensic evidence.")
print("\nWARNING: Gathering diagnostics may destroy forensic evidence!")

Gotta shout it.

confirm = input("Do you want to continue? [y/N]: ").strip().lower()
if confirm not in ("y", "yes"):
print("Aborted diagnostics.")
exit_success(args.config.get("arguments"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
exit_success(args.config.get("arguments"))
exit_success(args.config.get("arguments"))

try:
# Determine output path
if isinstance(args.diagnostics, str):
diag_path = Path(args.diagnostics)
elif log_file:
diag_path = Path(log_file).with_name(Path(log_file).stem + "_diag.json")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't log_file already a Path?

else:
diag_path = Path("diag.json")

diagnostics_info_json(diag_path)
log.info("Diagnostics written to file %s", diag_path.resolve())
except Exception:
acquire_gui.message("Failed to upload files")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

log.exception("")
exit_failure(args.config.get("arguments"))
exit_success(args.config.get("arguments"))

target_paths = []
for target_path in args.targets:
target_path = args_to_uri([target_path], args.loader, rest)[0] if args.loader else target_path
Expand Down
Empty file added acquire/diagnostics/__init__.py
Empty file.
17 changes: 17 additions & 0 deletions acquire/diagnostics/common.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily a hard requirement, but in most other places in Dissect we'd call this file base.py. I'll leave it up to you to decide what to call it.

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import json
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import json
from __future__ import annotations
import json

import platform
from pathlib import Path


def diagnostics_info_json(output: Path) -> None:
# Dynamic imports are required to avoid import errors on unsupported platforms
system = platform.system().lower()
if system == "windows":
from .windows import diagnostics_info # noqa: PLC0415
elif system == "linux":
from .linux import diagnostics_info # noqa: PLC0415
Comment on lines +10 to +12
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do an absolute import.

else:
raise NotImplementedError(f"Diagnostics not implemented for OS: {system}")
data = diagnostics_info()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
data = diagnostics_info()
data = diagnostics_info()

with output.open("w") as f:
json.dump(data, f, default=str, indent=2)
232 changes: 232 additions & 0 deletions acquire/diagnostics/linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import json
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import json
from __future__ import annotations
import json

import os
import platform
import stat
import subprocess
from pathlib import Path


def check_sudo() -> bool:
return os.geteuid() == 0


def get_disk_info() -> dict:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding suggestions for this all in GitHub is a bit slow, but can you add some newlines between sensible sections/parts?

info = {}
# RAID
try:
with Path("/proc/mdstat").open() as f:
info["raid"] = f.read()
except Exception as e:
info["raid"] = f"Error: {e}"
# LVM: Distinguish logical volumes and backing devices
logical_volumes = []
backing_devices = []
lvm_error = None
try:
# Get logical volumes
try:
lvs_result = subprocess.run(
["lvs", "--noheadings", "-o", "lv_path"], capture_output=True, text=True, check=False
)
if lvs_result.returncode == 0:
logical_volumes = [line.strip() for line in lvs_result.stdout.splitlines() if line.strip()]
else:
lvm_error = f"lvs failed: {lvs_result.stderr.strip()}"
except FileNotFoundError:
lvm_error = "lvs not found"
# Get backing devices
try:
pvs_result = subprocess.run(
["pvs", "--noheadings", "-o", "pv_name"], capture_output=True, text=True, check=False
)
if pvs_result.returncode == 0:
backing_devices = [line.strip() for line in pvs_result.stdout.splitlines() if line.strip()]
else:
lvm_error = (lvm_error or "") + f"; pvs failed: {pvs_result.stderr.strip()}"
except FileNotFoundError:
lvm_error = (lvm_error or "") + "; pvs not found"
except Exception as e:
lvm_error = f"Error: {e}"
info["lvm"] = {
"logical_volumes": logical_volumes,
"backing_devices": backing_devices,
"error": lvm_error,
}

luks_devices = []
# Check /dev/mapper for dm-crypt devices
try:
luks_devices.extend(
[
Path("/dev/mapper") / entry.name
for entry in Path("/dev/mapper").iterdir()
if entry.name.startswith(("dm_crypt", "crypt"))
]
)
except Exception as e:
luks_devices.append(f"Error: {e}")
# Parse /etc/crypttab for configured LUKS devices
try:
if Path("/etc/crypttab").exists():
with Path("/etc/crypttab").open() as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
parts = line.split()
if len(parts) > 1:
luks_devices.append(parts[1])
except Exception as e:
luks_devices.append(f"Error: {e}")
info["luks"] = luks_devices
return info


def walk_dev() -> list[str]:
dev_tree = []
for root, _, files in os.walk("/dev"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os.walk follows symlinks by default I think, is that wanted? Maybe consider Path.rglob("*")?

for name in files:
dev_path = str(Path(root) / name)
info = {"path": dev_path}
try:
st = Path.stat(dev_path)
if stat.S_ISBLK(st.st_mode):
info["type"] = "block"
elif stat.S_ISCHR(st.st_mode):
info["type"] = "char"
else:
info["type"] = "other"
info["major"] = os.major(st.st_rdev)
info["minor"] = os.minor(st.st_rdev)
info["mode"] = oct(st.st_mode)
info["owner"] = st.st_uid
info["group"] = st.st_gid
if info["type"] == "block":
try:
blkid = subprocess.run(["blkid", dev_path], capture_output=True, text=True)
blkid_str = blkid.stdout.strip()
info["blkid"] = parse_blkid_output(blkid_str) if blkid_str else None
except Exception:
info["blkid"] = None
except Exception as e:
info["error"] = str(e)
dev_tree.append(info)
return dev_tree


def get_dmesg() -> list[str]:
try:
with os.popen("dmesg | tail -n 100") as f:
return f.read().splitlines()
except Exception as e:
return [f"Error: {e}"]


def get_hardware_info() -> dict:
info = {}
# CPU
try:
with Path("/proc/cpuinfo").open() as f:
cpuinfo_raw = f.read()
# Parse into list of dicts (one per processor)
cpu_blocks = cpuinfo_raw.strip().split("\n\n")
cpuinfo = []
for block in cpu_blocks:
cpu = parse_key_value_lines(block.splitlines())
if cpu:
cpuinfo.append(cpu)
info["cpuinfo"] = cpuinfo
except Exception as e:
info["cpuinfo"] = {"error": str(e)}
# Memory
try:
with Path("/proc/meminfo").open() as f:
meminfo_raw = f.read()
meminfo = parse_key_value_lines(meminfo_raw.splitlines())
info["meminfo"] = meminfo
except Exception as e:
info["meminfo"] = {"error": str(e)}
# DMI
dmi_path = Path("/sys/class/dmi/id/")
dmi_info = {}
if dmi_path.is_dir():
for fpath in dmi_path.iterdir():
if fpath.is_file() and os.access(fpath, os.R_OK):
with fpath.open() as f:
dmi_info[fpath.name] = f.read().strip()
info["dmi"] = dmi_info
return info


def get_os_info() -> dict:
info = {}
try:
with Path("/etc/os-release").open() as f:
info["os-release"] = f.read()
except Exception as e:
info["os-release"] = f"Error: {e}"
info["platform"] = platform.platform()
info["uname"] = platform.uname()
return info


def diagnostics_info() -> dict:
info = {}
info["running_as_root"] = check_sudo()
info["disk_info"] = get_disk_info()
devs = walk_dev()
info["devices"] = devs
info["dmesg"] = get_dmesg()
info["hardware_info"] = get_hardware_info()
info["os_info"] = get_os_info()
return info


def diagnostics_info_json(output: Path) -> None:
data = diagnostics_info()
with output.open("w") as f:
json.dump(data, f, default=str, indent=2)
Comment on lines +184 to +187
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate/unused?



def parse_key_value_lines(lines: list[str]) -> dict[str, str]:
dev_tree = []
for root, _, files in os.walk("/dev"):
for name in files:
path_obj = Path(root) / name
dev_path = str(path_obj)
info = {"path": dev_path}
try:
st = path_obj.stat()
if stat.S_ISBLK(st.st_mode):
info["type"] = "block"
elif stat.S_ISCHR(st.st_mode):
info["type"] = "char"
else:
info["type"] = "other"
info["major"] = os.major(st.st_rdev)
info["minor"] = os.minor(st.st_rdev)
info["mode"] = oct(st.st_mode)
info["owner"] = st.st_uid
info["group"] = st.st_gid
if info["type"] == "block":
try:
blkid = subprocess.run(["blkid", dev_path], capture_output=True, text=True)
info["blkid"] = blkid.stdout.strip()
except Exception:
info["blkid"] = None
except Exception as e:
info["error"] = str(e)
dev_tree.append(info)
return dev_tree
Comment on lines +190 to +219
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like the walk_dev code?



def parse_blkid_output(blkid_str: str) -> dict:
"""Parse blkid output string into a dictionary of key-value pairs."""
# Example: /dev/sda1: UUID="abcd-1234" TYPE="ext4" PARTUUID="efgh-5678"
parts = blkid_str.split(None, 1)
blkid_info = {}
if len(parts) == 2:
for item in parts[1].split():
if "=" in item:
k, v = item.split("=", 1)
blkid_info[k] = v.strip('"')
return blkid_info
Loading
Loading