diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 0cbeb16..e23749c 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -5,13 +5,14 @@ import os import pathlib import re +import shlex import sys import warnings from binascii import a2b_hex, b2a_hex from datetime import datetime as _dt from datetime import timezone from posixpath import basename, dirname -from typing import Any, Optional, Tuple +from typing import Any, Optional from urllib.parse import urlparse try: @@ -34,8 +35,8 @@ PY_311 = sys.version_info >= (3, 11, 0) PY_312 = sys.version_info >= (3, 12, 0) -PATH_POSIX = 0 -PATH_WINDOWS = 1 +TYPE_POSIX = 0 +TYPE_WINDOWS = 1 string_type = str varint_type = int @@ -694,15 +695,15 @@ def __repr__(self) -> str: return repr(str(self)) def _pack(self): - path_type = PATH_WINDOWS if isinstance(self, windows_path) else PATH_POSIX + path_type = TYPE_WINDOWS if isinstance(self, windows_path) else TYPE_POSIX return (str(self), path_type) @classmethod - def _unpack(cls, data: Tuple[str, str]): + def _unpack(cls, data: tuple[str, str]): path_, path_type = data - if path_type == PATH_POSIX: + if path_type == TYPE_POSIX: return posix_path(path_) - elif path_type == PATH_WINDOWS: + elif path_type == TYPE_WINDOWS: return windows_path(path_) else: # Catch all: default to posix_path @@ -734,3 +735,115 @@ def __repr__(self) -> str: quote = '"' return f"{quote}{s}{quote}" + + +class command(FieldType): + executable: Optional[path] = None + args: Optional[list[str]] = None + + _path_type: type[path] = None + _posix: bool + + def __new__(cls, value: str) -> command: + if cls is not command: + return super().__new__(cls) + + if not isinstance(value, str): + raise ValueError(f"Expected a value of type 'str' not {type(value)}") + + # pre checking for windows like paths + # This checks for windows like starts of a path: + # an '%' for an environment variable + # r'\\' for a UNC path + # the strip and check for ":" on the second line is for `:` + windows = value.startswith((r"\\", "%")) or value.lstrip("\"'")[1] == ":" + + if windows: + cls = windows_command + else: + cls = posix_command + return super().__new__(cls) + + def __init__(self, value: str | tuple[str, tuple[str]] | None): + if value is None: + return + + if isinstance(value, str): + self.executable, self.args = self._split(value) + return + + executable, self.args = value + self.executable = self._path_type(executable) + self.args = list(self.args) + + def __repr__(self) -> str: + return f"(executable={self.executable!r}, args={self.args})" + + def __eq__(self, other: Any) -> bool: + if isinstance(other, command): + return self.executable == other.executable and self.args == other.args + elif isinstance(other, str): + return self._join() == other + elif isinstance(other, (tuple, list)): + return self.executable == other[0] and self.args == list(other[1:]) + + return False + + def _split(self, value: str) -> tuple[str, list[str]]: + executable, *args = shlex.split(value, posix=self._posix) + executable = executable.strip("'\" ") + + return self._path_type(executable), args + + def _join(self) -> str: + return shlex.join([str(self.executable)] + self.args) + + def _pack(self) -> tuple[tuple[str, list], str]: + command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX + if self.executable: + _exec, _ = self.executable._pack() + return ((_exec, self.args), command_type) + else: + return (None, command_type) + + @classmethod + def _unpack(cls, data: tuple[tuple[str, tuple] | None, int]) -> command: + _value, _type = data + if _type == TYPE_WINDOWS: + return windows_command(_value) + + return posix_command(_value) + + @classmethod + def from_posix(cls, value: str) -> command: + return posix_command(value) + + @classmethod + def from_windows(cls, value: str) -> command: + return windows_command(value) + + +class posix_command(command): + _posix = True + _path_type = posix_path + + +class windows_command(command): + _posix = False + _path_type = windows_path + + def _split(self, value: str) -> tuple[str, list[str]]: + executable, args = super()._split(value) + if args: + args = [" ".join(args)] + + return executable, args + + def _join(self) -> str: + arg = f" {self.args[0]}" if self.args else "" + executable_str = str(self.executable) + + if " " in executable_str: + return f"'{executable_str}'{arg}" + + return f"{executable_str}{arg}" diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 7808d24..9004482 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -72,6 +72,11 @@ def pack_obj(self, obj): return base64.b64encode(obj).decode() if isinstance(obj, fieldtypes.path): return str(obj) + if isinstance(obj, fieldtypes.command): + return { + "executable": obj.executable, + "args": obj.args, + } raise Exception("Unpackable type " + str(type(obj))) diff --git a/flow/record/whitelist.py b/flow/record/whitelist.py index 6e1aa92..6e71420 100644 --- a/flow/record/whitelist.py +++ b/flow/record/whitelist.py @@ -1,5 +1,6 @@ WHITELIST = [ "boolean", + "command", "dynamic", "datetime", "filesize", diff --git a/tests/test_fieldtypes.py b/tests/test_fieldtypes.py index 0103215..4eeec91 100644 --- a/tests/test_fieldtypes.py +++ b/tests/test_fieldtypes.py @@ -1,4 +1,5 @@ # coding: utf-8 +from __future__ import annotations import hashlib import os @@ -12,14 +13,22 @@ import flow.record.fieldtypes from flow.record import RecordDescriptor, RecordReader, RecordWriter from flow.record.fieldtypes import ( - PATH_POSIX, - PATH_WINDOWS, PY_312, + TYPE_POSIX, + TYPE_WINDOWS, _is_posixlike_path, _is_windowslike_path, + command, ) from flow.record.fieldtypes import datetime as dt -from flow.record.fieldtypes import fieldtype_for_value, net, uri, windows_path +from flow.record.fieldtypes import ( + fieldtype_for_value, + net, + posix_command, + uri, + windows_command, + windows_path, +) UTC = timezone.utc @@ -639,16 +648,16 @@ def test_path(): assert isinstance(test_path, flow.record.fieldtypes.windows_path) test_path = flow.record.fieldtypes.path.from_posix(posix_path_str) - assert test_path._pack() == (posix_path_str, PATH_POSIX) + assert test_path._pack() == (posix_path_str, TYPE_POSIX) - test_path = flow.record.fieldtypes.path._unpack((posix_path_str, PATH_POSIX)) + test_path = flow.record.fieldtypes.path._unpack((posix_path_str, TYPE_POSIX)) assert str(test_path) == posix_path_str assert isinstance(test_path, flow.record.fieldtypes.posix_path) test_path = flow.record.fieldtypes.path.from_windows(windows_path_str) - assert test_path._pack() == (windows_path_str, PATH_WINDOWS) + assert test_path._pack() == (windows_path_str, TYPE_WINDOWS) - test_path = flow.record.fieldtypes.path._unpack((windows_path_str, PATH_WINDOWS)) + test_path = flow.record.fieldtypes.path._unpack((windows_path_str, TYPE_WINDOWS)) assert str(test_path) == windows_path_str assert isinstance(test_path, flow.record.fieldtypes.windows_path) @@ -998,5 +1007,130 @@ def test_datetime_comparisions(): assert dt("2023-01-02") != datetime(2023, 3, 4, tzinfo=UTC) +def test_command_record() -> None: + TestRecord = RecordDescriptor( + "test/command", + [ + ("command", "commando"), + ], + ) + + record = TestRecord(commando="help.exe -h") + assert isinstance(record.commando, posix_command) + assert record.commando.executable == "help.exe" + assert record.commando.args == ["-h"] + + record = TestRecord(commando="something.so -h -q -something") + assert isinstance(record.commando, posix_command) + assert record.commando.executable == "something.so" + assert record.commando.args == ["-h", "-q", "-something"] + + +def test_command_integration(tmp_path: pathlib.Path) -> None: + TestRecord = RecordDescriptor( + "test/command", + [ + ("command", "commando"), + ], + ) + + with RecordWriter(tmp_path / "command_record") as writer: + record = TestRecord(commando=r"\\.\\?\some_command.exe -h,help /d quiet") + writer.write(record) + assert record.commando.executable == r"\\.\\?\some_command.exe" + assert record.commando.args == [r"-h,help /d quiet"] + + with RecordReader(tmp_path / "command_record") as reader: + for record in reader: + assert record.commando.executable == r"\\.\\?\some_command.exe" + assert record.commando.args == [r"-h,help /d quiet"] + + +def test_command_integration_none(tmp_path: pathlib.Path) -> None: + TestRecord = RecordDescriptor( + "test/command", + [ + ("command", "commando"), + ], + ) + + with RecordWriter(tmp_path / "command_record") as writer: + record = TestRecord(commando=command.from_posix(None)) + writer.write(record) + with RecordReader(tmp_path / "command_record") as reader: + for record in reader: + assert record.commando.executable is None + assert record.commando.args is None + + +@pytest.mark.parametrize( + "command_string, expected_executable, expected_argument", + [ + # Test relative windows paths + ("windows.exe something,or,somethingelse", "windows.exe", ["something,or,somethingelse"]), + # Test weird command strings for windows + ("windows.dll something,or,somethingelse", "windows.dll", ["something,or,somethingelse"]), + # Test environment variables + (r"%WINDIR%\\windows.dll something,or,somethingelse", r"%WINDIR%\\windows.dll", ["something,or,somethingelse"]), + # Test a quoted path + (r"'c:\path to some exe' /d /a", r"c:\path to some exe", [r"/d /a"]), + # Test a unquoted path + (r"'c:\Program Files\hello.exe'", r"c:\Program Files\hello.exe", []), + # Test an unquoted path with a path as argument + (r"'c:\Program Files\hello.exe' c:\startmepls.exe", r"c:\Program Files\hello.exe", [r"c:\startmepls.exe"]), + (None, None, None), + ], +) +def test_command_windows(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: + cmd = windows_command(command_string) + + assert cmd.executable == expected_executable + assert cmd.args == expected_argument + + +@pytest.mark.parametrize( + "command_string, expected_executable, expected_argument", + [ + # Test relative posix command + ("some_file.so -h asdsad -f asdsadas", "some_file.so", ["-h", "asdsad", "-f", "asdsadas"]), + # Test command with spaces + (r"/bin/hello\ world -h -word", r"/bin/hello world", ["-h", "-word"]), + ], +) +def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: + cmd = posix_command(command_string) + + assert cmd.executable == expected_executable + assert cmd.args == expected_argument + + +def test_command_equal() -> None: + assert command("hello.so -h") == command("hello.so -h") + assert command("hello.so -h") != command("hello.so") + + # Test different types with the comparitor + assert command("hello.so -h") == ["hello.so", "-h"] + assert command("hello.so -h") == ("hello.so", "-h") + assert command("hello.so -h") == "hello.so -h" + assert command("c:\\hello.dll -h -b") == "c:\\hello.dll -h -b" + + # Compare paths that contain spaces + assert command("'/home/some folder/file' -h") == "'/home/some folder/file' -h" + assert command("'c:\\Program files\\some.dll' -h -q") == "'c:\\Program files\\some.dll' -h -q" + assert command("'c:\\program files\\some.dll' -h -q") == ["c:\\program files\\some.dll", "-h -q"] + assert command("'c:\\Program files\\some.dll' -h -q") == ("c:\\Program files\\some.dll", "-h -q") + + # Test failure conditions + assert command("hello.so -h") != 1 + assert command("hello.so") != "hello.so -h" + assert command("hello.so") != ["hello.so", ""] + assert command("hello.so") != ("hello.so", "") + + +def test_command_failed() -> None: + with pytest.raises(ValueError): + command(b"failed") + + if __name__ == "__main__": __import__("standalone_test").main(globals())