From 5e2237659c704388ec25133c544fcc428e735be3 Mon Sep 17 00:00:00 2001 From: twiggler <12800443+twiggler@users.noreply.github.com> Date: Tue, 4 Nov 2025 17:03:52 +0100 Subject: [PATCH] Add diagnostics --- acquire/acquire.py | 25 ++++ acquire/diagnostics/__init__.py | 0 acquire/diagnostics/common.py | 17 +++ acquire/diagnostics/linux.py | 232 ++++++++++++++++++++++++++++++++ acquire/diagnostics/windows.py | 198 +++++++++++++++++++++++++++ acquire/utils.py | 10 ++ 6 files changed, 482 insertions(+) create mode 100644 acquire/diagnostics/__init__.py create mode 100644 acquire/diagnostics/common.py create mode 100644 acquire/diagnostics/linux.py create mode 100644 acquire/diagnostics/windows.py diff --git a/acquire/acquire.py b/acquire/acquire.py index a1df1e57..d9f221d9 100644 --- a/acquire/acquire.py +++ b/acquire/acquire.py @@ -32,6 +32,7 @@ from dissect.util.stream import RunlistStream from acquire.collector import Collector, get_full_formatted_report, get_report_summary +from acquire.diagnostics.common import diagnostics_info_json from acquire.dynamic.windows.named_objects import NamedObjectType from acquire.esxi import esxi_memory_context_manager from acquire.gui import GUI @@ -2399,6 +2400,30 @@ def main() -> None: exit_failure(args.config.get("arguments")) exit_success(args.config.get("arguments")) + # Check for diagnostics argument here. Process and execute. + if args.diagnostics: + print("\nWARNING: Gathering diagnostics may destroy forensic evidence.") + confirm = input("Do you want to continue? [y/N]: ").strip().lower() + if confirm not in ("y", "yes"): + print("Aborted diagnostics.") + exit_success(args.config.get("arguments")) + try: + # Determine output path + if isinstance(args.diagnostics, str): + diag_path = Path(args.diagnostics) + elif log_file: + diag_path = Path(log_file).with_name(Path(log_file).stem + "_diag.json") + else: + diag_path = Path("diag.json") + + diagnostics_info_json(diag_path) + log.info("Diagnostics written to file %s", diag_path.resolve()) + except Exception: + acquire_gui.message("Failed to upload files") + log.exception("") + exit_failure(args.config.get("arguments")) + exit_success(args.config.get("arguments")) + target_paths = [] for target_path in args.targets: target_path = args_to_uri([target_path], args.loader, rest)[0] if args.loader else target_path diff --git a/acquire/diagnostics/__init__.py b/acquire/diagnostics/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/acquire/diagnostics/common.py b/acquire/diagnostics/common.py new file mode 100644 index 00000000..cde9048e --- /dev/null +++ b/acquire/diagnostics/common.py @@ -0,0 +1,17 @@ +import json +import platform +from pathlib import Path + + +def diagnostics_info_json(output: Path) -> None: + # Dynamic imports are required to avoid import errors on unsupported platforms + system = platform.system().lower() + if system == "windows": + from .windows import diagnostics_info # noqa: PLC0415 + elif system == "linux": + from .linux import diagnostics_info # noqa: PLC0415 + else: + raise NotImplementedError(f"Diagnostics not implemented for OS: {system}") + data = diagnostics_info() + with output.open("w") as f: + json.dump(data, f, default=str, indent=2) diff --git a/acquire/diagnostics/linux.py b/acquire/diagnostics/linux.py new file mode 100644 index 00000000..0fd96bd0 --- /dev/null +++ b/acquire/diagnostics/linux.py @@ -0,0 +1,232 @@ +import json +import os +import platform +import stat +import subprocess +from pathlib import Path + + +def check_sudo() -> bool: + return os.geteuid() == 0 + + +def get_disk_info() -> dict: + info = {} + # RAID + try: + with Path("/proc/mdstat").open() as f: + info["raid"] = f.read() + except Exception as e: + info["raid"] = f"Error: {e}" + # LVM: Distinguish logical volumes and backing devices + logical_volumes = [] + backing_devices = [] + lvm_error = None + try: + # Get logical volumes + try: + lvs_result = subprocess.run( + ["lvs", "--noheadings", "-o", "lv_path"], capture_output=True, text=True, check=False + ) + if lvs_result.returncode == 0: + logical_volumes = [line.strip() for line in lvs_result.stdout.splitlines() if line.strip()] + else: + lvm_error = f"lvs failed: {lvs_result.stderr.strip()}" + except FileNotFoundError: + lvm_error = "lvs not found" + # Get backing devices + try: + pvs_result = subprocess.run( + ["pvs", "--noheadings", "-o", "pv_name"], capture_output=True, text=True, check=False + ) + if pvs_result.returncode == 0: + backing_devices = [line.strip() for line in pvs_result.stdout.splitlines() if line.strip()] + else: + lvm_error = (lvm_error or "") + f"; pvs failed: {pvs_result.stderr.strip()}" + except FileNotFoundError: + lvm_error = (lvm_error or "") + "; pvs not found" + except Exception as e: + lvm_error = f"Error: {e}" + info["lvm"] = { + "logical_volumes": logical_volumes, + "backing_devices": backing_devices, + "error": lvm_error, + } + + luks_devices = [] + # Check /dev/mapper for dm-crypt devices + try: + luks_devices.extend( + [ + Path("/dev/mapper") / entry.name + for entry in Path("/dev/mapper").iterdir() + if entry.name.startswith(("dm_crypt", "crypt")) + ] + ) + except Exception as e: + luks_devices.append(f"Error: {e}") + # Parse /etc/crypttab for configured LUKS devices + try: + if Path("/etc/crypttab").exists(): + with Path("/etc/crypttab").open() as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + parts = line.split() + if len(parts) > 1: + luks_devices.append(parts[1]) + except Exception as e: + luks_devices.append(f"Error: {e}") + info["luks"] = luks_devices + return info + + +def walk_dev() -> list[str]: + dev_tree = [] + for root, _, files in os.walk("/dev"): + for name in files: + dev_path = str(Path(root) / name) + info = {"path": dev_path} + try: + st = Path.stat(dev_path) + if stat.S_ISBLK(st.st_mode): + info["type"] = "block" + elif stat.S_ISCHR(st.st_mode): + info["type"] = "char" + else: + info["type"] = "other" + info["major"] = os.major(st.st_rdev) + info["minor"] = os.minor(st.st_rdev) + info["mode"] = oct(st.st_mode) + info["owner"] = st.st_uid + info["group"] = st.st_gid + if info["type"] == "block": + try: + blkid = subprocess.run(["blkid", dev_path], capture_output=True, text=True) + blkid_str = blkid.stdout.strip() + info["blkid"] = parse_blkid_output(blkid_str) if blkid_str else None + except Exception: + info["blkid"] = None + except Exception as e: + info["error"] = str(e) + dev_tree.append(info) + return dev_tree + + +def get_dmesg() -> list[str]: + try: + with os.popen("dmesg | tail -n 100") as f: + return f.read().splitlines() + except Exception as e: + return [f"Error: {e}"] + + +def get_hardware_info() -> dict: + info = {} + # CPU + try: + with Path("/proc/cpuinfo").open() as f: + cpuinfo_raw = f.read() + # Parse into list of dicts (one per processor) + cpu_blocks = cpuinfo_raw.strip().split("\n\n") + cpuinfo = [] + for block in cpu_blocks: + cpu = parse_key_value_lines(block.splitlines()) + if cpu: + cpuinfo.append(cpu) + info["cpuinfo"] = cpuinfo + except Exception as e: + info["cpuinfo"] = {"error": str(e)} + # Memory + try: + with Path("/proc/meminfo").open() as f: + meminfo_raw = f.read() + meminfo = parse_key_value_lines(meminfo_raw.splitlines()) + info["meminfo"] = meminfo + except Exception as e: + info["meminfo"] = {"error": str(e)} + # DMI + dmi_path = Path("/sys/class/dmi/id/") + dmi_info = {} + if dmi_path.is_dir(): + for fpath in dmi_path.iterdir(): + if fpath.is_file() and os.access(fpath, os.R_OK): + with fpath.open() as f: + dmi_info[fpath.name] = f.read().strip() + info["dmi"] = dmi_info + return info + + +def get_os_info() -> dict: + info = {} + try: + with Path("/etc/os-release").open() as f: + info["os-release"] = f.read() + except Exception as e: + info["os-release"] = f"Error: {e}" + info["platform"] = platform.platform() + info["uname"] = platform.uname() + return info + + +def diagnostics_info() -> dict: + info = {} + info["running_as_root"] = check_sudo() + info["disk_info"] = get_disk_info() + devs = walk_dev() + info["devices"] = devs + info["dmesg"] = get_dmesg() + info["hardware_info"] = get_hardware_info() + info["os_info"] = get_os_info() + return info + + +def diagnostics_info_json(output: Path) -> None: + data = diagnostics_info() + with output.open("w") as f: + json.dump(data, f, default=str, indent=2) + + +def parse_key_value_lines(lines: list[str]) -> dict[str, str]: + dev_tree = [] + for root, _, files in os.walk("/dev"): + for name in files: + path_obj = Path(root) / name + dev_path = str(path_obj) + info = {"path": dev_path} + try: + st = path_obj.stat() + if stat.S_ISBLK(st.st_mode): + info["type"] = "block" + elif stat.S_ISCHR(st.st_mode): + info["type"] = "char" + else: + info["type"] = "other" + info["major"] = os.major(st.st_rdev) + info["minor"] = os.minor(st.st_rdev) + info["mode"] = oct(st.st_mode) + info["owner"] = st.st_uid + info["group"] = st.st_gid + if info["type"] == "block": + try: + blkid = subprocess.run(["blkid", dev_path], capture_output=True, text=True) + info["blkid"] = blkid.stdout.strip() + except Exception: + info["blkid"] = None + except Exception as e: + info["error"] = str(e) + dev_tree.append(info) + return dev_tree + + +def parse_blkid_output(blkid_str: str) -> dict: + """Parse blkid output string into a dictionary of key-value pairs.""" + # Example: /dev/sda1: UUID="abcd-1234" TYPE="ext4" PARTUUID="efgh-5678" + parts = blkid_str.split(None, 1) + blkid_info = {} + if len(parts) == 2: + for item in parts[1].split(): + if "=" in item: + k, v = item.split("=", 1) + blkid_info[k] = v.strip('"') + return blkid_info diff --git a/acquire/diagnostics/windows.py b/acquire/diagnostics/windows.py new file mode 100644 index 00000000..55dede29 --- /dev/null +++ b/acquire/diagnostics/windows.py @@ -0,0 +1,198 @@ +import ctypes +import getpass +import platform +import socket +import subprocess + + +def is_admin() -> bool: + try: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + except Exception: + return False + + +def get_os_info() -> dict: + info = {} + info["platform"] = platform.platform() + info["system"] = platform.system() + info["release"] = platform.release() + info["version"] = platform.version() + info["architecture"] = platform.architecture() + info["node"] = platform.node() + info["uname"] = platform.uname() + return info + + +def get_user_info() -> dict: + info = {} + info["username"] = getpass.getuser() + info["hostname"] = socket.gethostname() + return info + + +def get_drives() -> list[str]: + drives = [] + bitmask = ctypes.windll.kernel32.GetLogicalDrives() + for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ": + if bitmask & 1: + drives.append(f"{letter}:\\") + bitmask >>= 1 + return drives + + +def get_drive_info() -> dict: + drives = get_drives() + info = {} + errors = {} + for drive in drives: + total, free = ctypes.c_ulonglong(), ctypes.c_ulonglong() + try: + ctypes.windll.kernel32.GetDiskFreeSpaceExW( + ctypes.c_wchar_p(drive), None, ctypes.byref(total), ctypes.byref(free) + ) + info[drive] = {"total_GB": total.value // (1024**3), "free_GB": free.value // (1024**3)} + except Exception as e: + errors[drive] = f"Error: {e}" + info.update(errors) + return info + + +def get_drive_filesystems() -> dict: + """Return a dict mapping drive letters to filesystem types using GetVolumeInformationW.""" + filesystems = {} + drives = get_drives() + GetVolumeInformationW = ctypes.windll.kernel32.GetVolumeInformationW + for drive in drives: + fs_name_buf = ctypes.create_unicode_buffer(255) + try: + res = GetVolumeInformationW( + ctypes.c_wchar_p(drive), None, 0, None, None, None, fs_name_buf, ctypes.sizeof(fs_name_buf) + ) + if res: + filesystems[drive] = fs_name_buf.value + else: + filesystems[drive] = "Unknown or inaccessible" + except Exception as e: + filesystems[drive] = f"Error: {e}" + return filesystems + + +def parse_hardware_info(raw_info: dict) -> dict: + def parse_block(block: str) -> dict: + result = {} + for line in block.splitlines(): + if ":" in line: + key, value = line.split(":", 1) + result[key.strip()] = value.strip() + return result + + parsed = {} + # Parse CPU info + cpu_raw = raw_info.get("cpu", "") + if cpu_raw and not cpu_raw.startswith("Error"): + cpus = [parse_block(b) for b in cpu_raw.split("\n\n") if b.strip()] + parsed["cpu"] = cpus + else: + parsed["cpu"] = cpu_raw + # Parse memory info + mem_raw = raw_info.get("memory", "") + if mem_raw and not mem_raw.startswith("Error"): + mems = [parse_block(b) for b in mem_raw.split("\n\n") if b.strip()] + parsed["memory"] = mems + else: + parsed["memory"] = mem_raw + # Parse baseboard info + base_raw = raw_info.get("baseboard", "") + if base_raw and not base_raw.startswith("Error"): + bases = [parse_block(b) for b in base_raw.split("\n\n") if b.strip()] + parsed["baseboard"] = bases + else: + parsed["baseboard"] = base_raw + return parsed + + +def get_hardware_info() -> dict: + info = {} + try: + output = subprocess.check_output( + [ + "powershell", + "-Command", + "Get-CimInstance Win32_Processor | Select-Object Name,NumberOfCores,NumberOfLogicalProcessors,MaxClockSpeed | Format-List", # noqa: E501 + ], + universal_newlines=True, + ) + info["cpu"] = output.strip() + except Exception as e: + info["cpu"] = f"Error: {e}" + try: + output = subprocess.check_output( + [ + "powershell", + "-Command", + "Get-CimInstance Win32_PhysicalMemory | Select-Object Capacity,Speed,Manufacturer | Format-List", + ], + universal_newlines=True, + ) + info["memory"] = output.strip() + except Exception as e: + info["memory"] = f"Error: {e}" + try: + output = subprocess.check_output( + [ + "powershell", + "-Command", + "Get-CimInstance Win32_BaseBoard | Select-Object Manufacturer,Product,SerialNumber | Format-List", + ], + universal_newlines=True, + ) + info["baseboard"] = output.strip() + except Exception as e: + info["baseboard"] = f"Error: {e}" + return parse_hardware_info(info) + + +def parse_software_info(raw_info: dict) -> dict: + def parse_block(block: str) -> dict: + result = {} + for line in block.splitlines(): + if ":" in line: + key, value = line.split(":", 1) + result[key.strip()] = value.strip() + return result + + parsed = {} + sw_raw = raw_info.get("installed_software", "") + if sw_raw and not sw_raw.startswith("Error"): + sws = [parse_block(b) for b in sw_raw.split("\n\n") if b.strip()] + parsed["installed_software"] = sws + else: + parsed["installed_software"] = sw_raw + return parsed + + +def get_software_info() -> dict: + info = {} + try: + output = subprocess.check_output( + ["powershell", "-Command", "Get-CimInstance Win32_Product | Select-Object Name,Version | Format-List"], + universal_newlines=True, + stderr=subprocess.DEVNULL, + ) + info["installed_software"] = output.strip() + except Exception as e: + info["installed_software"] = f"Error: {e}" + return parse_software_info(info) + + +def diagnostics_info() -> dict: + info = {} + info["running_as_admin"] = is_admin() + info["os_info"] = get_os_info() + info["user_info"] = get_user_info() + info["drive_info"] = get_drive_info() + info["drive_filesystems"] = get_drive_filesystems() + info["hardware_info"] = get_hardware_info() + info["software_info"] = get_software_info() + return info diff --git a/acquire/utils.py b/acquire/utils.py index 025d6eac..774e84cc 100644 --- a/acquire/utils.py +++ b/acquire/utils.py @@ -82,6 +82,16 @@ def create_argument_parser(profiles: dict, volatile: dict, modules: dict) -> arg output_group.add_argument("-o", "--output", default=Path(), type=Path, help="output directory") output_group.add_argument("-of", "--output-file", type=Path, help="output filename") + output_group.add_argument( + "-dg", + "--diagnostics", + nargs="?", + const=True, + default=False, + metavar="[PATH]", + help="gather diagnostics information and print as JSON (optional path, defaults to logfile stem)", + ) + parser.add_argument( "-ot", "--output-type",