diff --git a/pydfirram/core/base.py b/pydfirram/core/base.py index 2222155..1098c3c 100644 --- a/pydfirram/core/base.py +++ b/pydfirram/core/base.py @@ -145,7 +145,17 @@ def __init__( self.dump_file = dump_file self.context = contexts.Context() self.plugin = plugin + self.automag : automagic + def set_context(self): + dump_file_location = self.get_dump_file_location() + self.context.config[self.KEY_STACKERS] = self.os_stackers() + self.context.config[self.KEY_SINGLE_LOCATION] = dump_file_location + + def set_automagic(self): + self.automag = self.automagics() + + def build(self) -> interfaces.plugins.PluginInterface: """Build a basic context for the provided plugin. @@ -156,18 +166,16 @@ def build(self) -> interfaces.plugins.PluginInterface: VolatilityExceptions.UnsatisfiedException: If the plugin cannot be built. """ plugin = self.plugin.interface - automagics = self.automagics() - dump_file_location = self.get_dump_file_location() + #automagics = self.automagics() + #self.set_context() base_config_path = "plugins" file_handler = create_file_handler(os.getcwd()) - self.context.config[self.KEY_STACKERS] = self.os_stackers() - self.context.config[self.KEY_SINGLE_LOCATION] = dump_file_location try: # Construct the plugin, clever magic figures out how to # fulfill each requirement that might not be fulfilled constructed = construct_plugin( self.context, - automagics, + self.automag, plugin, # type: ignore base_config_path, None, # no progress callback for now @@ -313,13 +321,14 @@ def run_plugin(self, plugin: PluginEntry, **kwargs: Any) -> Any: ValueError: If the context is not built. """ self.context = Context(self.os, self.dump_file, plugin) # type: ignore - context = self.context.build() # type: ignore - + self.context.set_automagic() + self.context.set_context() + builded_context = self.context.build() # type: ignore if kwargs: - context = self.context.add_arguments(context,kwargs) + runable_context = self.context.add_arguments(builded_context,kwargs) if self.context is None: raise ValueError("Context not built.") - return context.run() + return runable_context.run() def validate_dump_file(self, dump_file: Path) -> bool: """Validate dump file location. diff --git a/pydfirram/core/renderer.py b/pydfirram/core/renderer.py index d6214f5..916532b 100644 --- a/pydfirram/core/renderer.py +++ b/pydfirram/core/renderer.py @@ -147,7 +147,25 @@ def to_list(self) -> Dict: except Exception as e: logger.error("Impossible to render data in dictionary form.") raise e + def file_render(self)-> None: + """ + Convert the data to a list format. + + This method attempts to render the input data using the TreeGrid_to_json class, + and convert it to a dictionary. + + Returns: + Dict: The rendered data in list format. + Raises: + Exception: If rendering the data fails. + """ + try: + formatted = TreeGrid_to_json().render(self.data) + except Exception as e: + logger.error("Impossible to render data in dictionary form.") + raise e + def to_json(self) -> str: """ Convert the data to a JSON string. diff --git a/pydfirram/modules/windows.py b/pydfirram/modules/windows.py index 9a08618..2bba296 100644 --- a/pydfirram/modules/windows.py +++ b/pydfirram/modules/windows.py @@ -27,7 +27,8 @@ >>> print(plugin) """ -from pydfirram.core.base import Generic, OperatingSystem +from pydfirram.core.base import Generic, OperatingSystem,Context +from pydfirram.core.renderer import Renderer class Windows(Generic): @@ -60,4 +61,50 @@ def __init__(self, dumpfile): -------- >>> windows = Windows("path/to/dump.raw": Path) """ + self.dump_files = dumpfile super().__init__(OperatingSystem.WINDOWS, dumpfile) + + def _set_argument(self,context, prefix, kwargs): + for k, v in kwargs.items(): + print(k,v) + context.config[prefix+k] = v + return context + + def dumpfiles(self,**kwargs) -> None: + """ + Dump memory files based on provided parameters. + + This method utilizes the "dumpfiles" plugin to create memory dumps from a + Windows operating system context. The memory dumps can be filtered based + on the provided arguments. If no parameters are provided, the method will + dump the entire system by default. + + Parameters: + ----------- + physaddr : int, optional + The physical address offset for the memory dump. + virtaddr : int, optional + The virtual address offset for the memory dump. + pid : int, optional + The process ID for which the memory dump should be generated. + + Notes: + ------ + - The method sets up the context with the operating system and dump files. + - Automagic and context settings are configured before building the context. + - If additional keyword arguments are provided, they are added as arguments to the context. + - The resulting context is executed and rendered to a file using the Renderer class. + - If no parameters are provided, the method will dump the entire system by default. + + Returns: + -------- + None + """ + plugin = self.get_plugin("dumpfiles") + context = Context(OperatingSystem.WINDOWS, self.dump_files, plugin) # type: ignore + context.set_automagic() + context.set_context() + builded_context = context.build() + if kwargs: + runable_context = context.add_arguments(builded_context,kwargs) + Renderer(runable_context.run()).file_render() diff --git a/tests/config.py b/tests/config.py index 6b8b858..7f5f8df 100644 --- a/tests/config.py +++ b/tests/config.py @@ -1,3 +1,3 @@ from pathlib import Path -DUMP_FILE = Path("./data/dump.raw") \ No newline at end of file +DUMP_FILE = Path("/home/braguette/dataset_memory/ch2.dmp") \ No newline at end of file diff --git a/tests/test_volatility_windows_function.py b/tests/test_volatility_windows_function.py index ee4f905..277ba64 100644 --- a/tests/test_volatility_windows_function.py +++ b/tests/test_volatility_windows_function.py @@ -1,6 +1,7 @@ import pytest from pathlib import Path from pydfirram.core.base import Generic, OperatingSystem +from pydfirram.modules.windows import Windows from pydfirram.core.renderer import Renderer from loguru import logger from .config import DUMP_FILE @@ -17,6 +18,12 @@ def generic_instance() -> Generic: dumpfile = Path(DUMP_FILE) return Generic(os, dumpfile) +@pytest.fixture +def windows_instance() -> Windows : + dumpfile = Path(DUMP_FILE) + return Windows(dumpfile) + +@pytest.mark.pslist def test_volatility_pslist(generic_instance: Generic) -> None: """ Test the volatility PsList function @@ -30,6 +37,8 @@ def test_volatility_pslist(generic_instance: Generic) -> None: assert len(pslist_content) > 0, "Output list is empty" logger.success("TEST PASSED!") +@pytest.mark.pslist +@pytest.mark.pslist_pid def test_volatilty_pslist_with_args_pid(generic_instance : Generic) -> None : logger.opt(colors=True).info("pslist with args from volatility is running") output : Renderer = generic_instance.pslist(pid=[4]) @@ -39,12 +48,14 @@ def test_volatilty_pslist_with_args_pid(generic_instance : Generic) -> None : assert len(pslist_content) == 1 logger.success("TEST PASSED !") +@pytest.mark.banners def test_volatility_banners(generic_instance : Generic) -> None : logger.opt(colors=True).info("banners from volatility is running") output : Renderer = generic_instance.banners(pid=[4]) assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.cmdline def test_volatility_cmdline(generic_instance : Generic) -> None : logger.opt(colors=True).info("cmdline from volatility is running") output : Renderer = generic_instance.cmdline() @@ -54,6 +65,7 @@ def test_volatility_cmdline(generic_instance : Generic) -> None : assert isinstance(cmdline_content,list),"Not a list" assert len(cmdline_content) > 0 +@pytest.mark.dlllist def test_volatility_dlllist(generic_instance : Generic) -> None : logger.opt(colors=True).info("dlllist from volatility is running") output : Renderer = generic_instance.dlllist() @@ -63,60 +75,70 @@ def test_volatility_dlllist(generic_instance : Generic) -> None : assert len(dllist_content) > 0 logger.success("TEST PASSED !") +@pytest.mark.bigpools def test_bigpools(generic_instance : Generic) -> None : logger.opt(colors=True).info("bigpools from volatility is running") output : Renderer = generic_instance.bigpools() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.callbacks def test_callbacks(generic_instance : Generic) -> None : logger.opt(colors=True).info("callbacks from volatility is running") output : Renderer = generic_instance.callbacks() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.certificates def test_certificates(generic_instance : Generic) -> None : logger.opt(colors=True).info("certificate from volatility is running") output : Renderer = generic_instance.certificates() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.configwriter def test_configwriter(generic_instance : Generic) -> None : logger.opt(colors=True).info("configwriter from volatility is running") output : Renderer = generic_instance.configwriter() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.crashinfo def test_crashinfo(generic_instance : Generic) -> None : logger.opt(colors=True).info("crashinfo from volatility is running") with pytest.raises(Exception): generic_instance.crashinfo() logger.success("TEST PASSED !") +@pytest.mark.devicetree def test_devicetree(generic_instance : Generic) -> None : logger.opt(colors=True).info("devicetree from volatility is running") output : Renderer = generic_instance.devicetree() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.driverirp def test_driverirp(generic_instance : Generic) -> None : logger.opt(colors=True).info("driverirp from volatility is running") output : Renderer = generic_instance.driverirp() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.drivermodules def test_drivermodule(generic_instance : Generic) -> None : logger.opt(colors=True).info("drivermodule from volatility is running") output : Renderer = generic_instance.drivermodule() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.driverscan def test_driverscan(generic_instance : Generic) -> None : logger.opt(colors=True).info("driverscan from volatility is running") output : Renderer = generic_instance.driverscan() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.envars def test_envars(generic_instance : Generic) -> None : logger.opt(colors=True).info("envars from volatility is running") output : Renderer = generic_instance.envars() @@ -126,30 +148,35 @@ def test_envars(generic_instance : Generic) -> None : assert len(envars_content) > 0 logger.success("TEST PASSED !") +@pytest.mark.hivelist def test_hivelist(generic_instance : Generic) -> None : logger.opt(colors=True).info("hivelist from volatility is running") output : Renderer = generic_instance.hivelist() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.hivescan def test_hivescan(generic_instance : Generic) -> None : logger.opt(colors=True).info("hivescan from volatility is running") output : Renderer = generic_instance.hivescan() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.iat def test_iat(generic_instance : Generic) -> None : logger.opt(colors=True).info("iat from volatility is running") output : Renderer = generic_instance.iat() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.info def test_info(generic_instance : Generic) -> None : logger.opt(colors=True).info("info from volatility is running") output : Renderer = generic_instance.info() assert isinstance(output, Renderer), "Error during function execution" logger.success("TEST PASSED !") +@pytest.mark.pstree def test_pstree(generic_instance : Generic) -> None : logger.opt(colors=True).info("pstree from volatility is running") output : Renderer = generic_instance.pstree() @@ -158,3 +185,75 @@ def test_pstree(generic_instance : Generic) -> None : assert isinstance(cmdline_content,list),"Not a list" assert len(cmdline_content) > 0 logger.success("TEST PASSED !") + +@pytest.mark.dumpfile_pid +@pytest.mark.dumpfiles +def test_dumpfile_with_args_pid(windows_instance : Windows): + current_directory = Path.cwd() + initial_files = set(current_directory.glob("file.*")) + new_files = set() + + try: + result = windows_instance.dumpfiles(pid=4) + assert result is None, "The dumpfile method should return a non-null result" + new_files = set(current_directory.glob("file.*")) - initial_files + assert len(new_files) >= 1, f"Expected exactly one new file starting with 'file.', but found {len(new_files)}" + logger.opt(colors=True).info(f"number of file dumped {len(new_files)}") + except Exception as e: + pytest.fail(f"An exception should not be raised: {e}") + finally: + for new_file in new_files: + try: + new_file.unlink() + except Exception as cleanup_error: + print(f"Failed to delete {new_file}: {cleanup_error}") + +@pytest.mark.dumpfile_physaddr +@pytest.mark.dumpfiles +def test_dumpfile_with_args_physaddr(windows_instance : Windows): + current_directory = Path.cwd() + initial_files = set(current_directory.glob("file.*")) + new_files = set() + + try: + result = windows_instance.dumpfiles(physaddr=533517296) + assert result is None, "The dumpfile method should return a non-null result" + # Check if new files starting with 'file.' are created + new_files = set(current_directory.glob("file.*")) - initial_files + assert len(new_files) == 1, f"Expected exactly one new file starting with 'file.', but found {len(new_files)}" + except Exception as e: + pytest.fail(f"An exception should not be raised: {e}") + + finally: + # Clean up any new files created during the test + for new_file in new_files: + try: + new_file.unlink() + except Exception as cleanup_error: + print(f"Failed to delete {new_file}: {cleanup_error}") + +#Not able to test virtaddr locally +@pytest.mark.dumpfiles +@pytest.mark.dumpfile_virtaddr +def test_dumpfile_with_args_virtaddr(windows_instance : Windows): + current_directory = Path.cwd() + initial_files = set(current_directory.glob("file.*")) + new_files = set() + value = 2274855800 + try: + result = windows_instance.dumpfiles(virtaddr=value) + # Check if new files starting with 'file.' with the value in hex are created + file_created = "file." + hex(value) + new_files = set(current_directory.glob(file_created)) - initial_files + assert len(new_files) == 1, f"Expected exactly one new file starting with 'file.', but found {len(new_files)}" + + except Exception as e: + pytest.fail(f"An exception should not be raised: {e}") + + finally: + # Clean up any new files created during the test + for new_file in new_files: + try: + new_file.unlink() + except Exception as cleanup_error: + print(f"Failed to delete {new_file}: {cleanup_error}") \ No newline at end of file