Skip to content

Commit

Permalink
Fix dumpfiles (#46)
Browse files Browse the repository at this point in the history
* fix : Dumpfile issue with parameters

* test : Create test for dumpfiles

* fix context builder

* fix dumpfiles : dumpfiles can now be passed with arguments

* tests: add tests for each parameters of dumpfiles
fix: add markers on tests to easily execute a bunch of test instead of the complete file

* fix : kwargs value in set_arguments was setted to int directly

* add : Add test fonctions to test dumpfiles with a virtaddr but not able to test locally

* add : add pytest decorator markers to pslist_pid

---------

Co-authored-by: Br4guette <Br4guette@pm.me>
  • Loading branch information
Ston14 and Br4guette authored Jul 11, 2024
1 parent f51db37 commit 40aa52a
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 11 deletions.
27 changes: 18 additions & 9 deletions pydfirram/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,17 @@ def __init__(
self.dump_file = dump_file
self.context = contexts.Context()
self.plugin = plugin
self.automag : automagic

def set_context(self):
dump_file_location = self.get_dump_file_location()
self.context.config[self.KEY_STACKERS] = self.os_stackers()
self.context.config[self.KEY_SINGLE_LOCATION] = dump_file_location

def set_automagic(self):
self.automag = self.automagics()


def build(self) -> interfaces.plugins.PluginInterface:
"""Build a basic context for the provided plugin.
Expand All @@ -156,18 +166,16 @@ def build(self) -> interfaces.plugins.PluginInterface:
VolatilityExceptions.UnsatisfiedException: If the plugin cannot be built.
"""
plugin = self.plugin.interface
automagics = self.automagics()
dump_file_location = self.get_dump_file_location()
#automagics = self.automagics()
#self.set_context()
base_config_path = "plugins"
file_handler = create_file_handler(os.getcwd())
self.context.config[self.KEY_STACKERS] = self.os_stackers()
self.context.config[self.KEY_SINGLE_LOCATION] = dump_file_location
try:
# Construct the plugin, clever magic figures out how to
# fulfill each requirement that might not be fulfilled
constructed = construct_plugin(
self.context,
automagics,
self.automag,
plugin, # type: ignore
base_config_path,
None, # no progress callback for now
Expand Down Expand Up @@ -313,13 +321,14 @@ def run_plugin(self, plugin: PluginEntry, **kwargs: Any) -> Any:
ValueError: If the context is not built.
"""
self.context = Context(self.os, self.dump_file, plugin) # type: ignore
context = self.context.build() # type: ignore

self.context.set_automagic()
self.context.set_context()
builded_context = self.context.build() # type: ignore
if kwargs:
context = self.context.add_arguments(context,kwargs)
runable_context = self.context.add_arguments(builded_context,kwargs)
if self.context is None:
raise ValueError("Context not built.")
return context.run()
return runable_context.run()

def validate_dump_file(self, dump_file: Path) -> bool:
"""Validate dump file location.
Expand Down
18 changes: 18 additions & 0 deletions pydfirram/core/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,25 @@ def to_list(self) -> Dict:
except Exception as e:
logger.error("Impossible to render data in dictionary form.")
raise e
def file_render(self)-> None:
"""
Convert the data to a list format.
This method attempts to render the input data using the TreeGrid_to_json class,
and convert it to a dictionary.
Returns:
Dict: The rendered data in list format.
Raises:
Exception: If rendering the data fails.
"""
try:
formatted = TreeGrid_to_json().render(self.data)
except Exception as e:
logger.error("Impossible to render data in dictionary form.")
raise e

def to_json(self) -> str:
"""
Convert the data to a JSON string.
Expand Down
49 changes: 48 additions & 1 deletion pydfirram/modules/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
>>> print(plugin)
"""

from pydfirram.core.base import Generic, OperatingSystem
from pydfirram.core.base import Generic, OperatingSystem,Context
from pydfirram.core.renderer import Renderer


class Windows(Generic):
Expand Down Expand Up @@ -60,4 +61,50 @@ def __init__(self, dumpfile):
--------
>>> windows = Windows("path/to/dump.raw": Path)
"""
self.dump_files = dumpfile
super().__init__(OperatingSystem.WINDOWS, dumpfile)

def _set_argument(self,context, prefix, kwargs):
for k, v in kwargs.items():
print(k,v)
context.config[prefix+k] = v
return context

def dumpfiles(self,**kwargs) -> None:
"""
Dump memory files based on provided parameters.
This method utilizes the "dumpfiles" plugin to create memory dumps from a
Windows operating system context. The memory dumps can be filtered based
on the provided arguments. If no parameters are provided, the method will
dump the entire system by default.
Parameters:
-----------
physaddr : int, optional
The physical address offset for the memory dump.
virtaddr : int, optional
The virtual address offset for the memory dump.
pid : int, optional
The process ID for which the memory dump should be generated.
Notes:
------
- The method sets up the context with the operating system and dump files.
- Automagic and context settings are configured before building the context.
- If additional keyword arguments are provided, they are added as arguments to the context.
- The resulting context is executed and rendered to a file using the Renderer class.
- If no parameters are provided, the method will dump the entire system by default.
Returns:
--------
None
"""
plugin = self.get_plugin("dumpfiles")
context = Context(OperatingSystem.WINDOWS, self.dump_files, plugin) # type: ignore
context.set_automagic()
context.set_context()
builded_context = context.build()
if kwargs:
runable_context = context.add_arguments(builded_context,kwargs)
Renderer(runable_context.run()).file_render()
2 changes: 1 addition & 1 deletion tests/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from pathlib import Path

DUMP_FILE = Path("./data/dump.raw")
DUMP_FILE = Path("/home/braguette/dataset_memory/ch2.dmp")
99 changes: 99 additions & 0 deletions tests/test_volatility_windows_function.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from pathlib import Path
from pydfirram.core.base import Generic, OperatingSystem
from pydfirram.modules.windows import Windows
from pydfirram.core.renderer import Renderer
from loguru import logger
from .config import DUMP_FILE
Expand All @@ -17,6 +18,12 @@ def generic_instance() -> Generic:
dumpfile = Path(DUMP_FILE)
return Generic(os, dumpfile)

@pytest.fixture
def windows_instance() -> Windows :
dumpfile = Path(DUMP_FILE)
return Windows(dumpfile)

@pytest.mark.pslist
def test_volatility_pslist(generic_instance: Generic) -> None:
"""
Test the volatility PsList function
Expand All @@ -30,6 +37,8 @@ def test_volatility_pslist(generic_instance: Generic) -> None:
assert len(pslist_content) > 0, "Output list is empty"
logger.success("TEST PASSED!")

@pytest.mark.pslist
@pytest.mark.pslist_pid
def test_volatilty_pslist_with_args_pid(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>pslist</cyan></b> with args from volatility is running")
output : Renderer = generic_instance.pslist(pid=[4])
Expand All @@ -39,12 +48,14 @@ def test_volatilty_pslist_with_args_pid(generic_instance : Generic) -> None :
assert len(pslist_content) == 1
logger.success("TEST PASSED !")

@pytest.mark.banners
def test_volatility_banners(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>banners</cyan></b> from volatility is running")
output : Renderer = generic_instance.banners(pid=[4])
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.cmdline
def test_volatility_cmdline(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>cmdline</cyan></b> from volatility is running")
output : Renderer = generic_instance.cmdline()
Expand All @@ -54,6 +65,7 @@ def test_volatility_cmdline(generic_instance : Generic) -> None :
assert isinstance(cmdline_content,list),"Not a list"
assert len(cmdline_content) > 0

@pytest.mark.dlllist
def test_volatility_dlllist(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>dlllist</cyan></b> from volatility is running")
output : Renderer = generic_instance.dlllist()
Expand All @@ -63,60 +75,70 @@ def test_volatility_dlllist(generic_instance : Generic) -> None :
assert len(dllist_content) > 0
logger.success("TEST PASSED !")

@pytest.mark.bigpools
def test_bigpools(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>bigpools</cyan></b> from volatility is running")
output : Renderer = generic_instance.bigpools()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.callbacks
def test_callbacks(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>callbacks</cyan></b> from volatility is running")
output : Renderer = generic_instance.callbacks()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.certificates
def test_certificates(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>certificate</cyan></b> from volatility is running")
output : Renderer = generic_instance.certificates()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.configwriter
def test_configwriter(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>configwriter</cyan></b> from volatility is running")
output : Renderer = generic_instance.configwriter()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.crashinfo
def test_crashinfo(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>crashinfo</cyan></b> from volatility is running")
with pytest.raises(Exception):
generic_instance.crashinfo()
logger.success("TEST PASSED !")

@pytest.mark.devicetree
def test_devicetree(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>devicetree</cyan></b> from volatility is running")
output : Renderer = generic_instance.devicetree()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.driverirp
def test_driverirp(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>driverirp</cyan></b> from volatility is running")
output : Renderer = generic_instance.driverirp()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.drivermodules
def test_drivermodule(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>drivermodule</cyan></b> from volatility is running")
output : Renderer = generic_instance.drivermodule()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.driverscan
def test_driverscan(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>driverscan</cyan></b> from volatility is running")
output : Renderer = generic_instance.driverscan()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.envars
def test_envars(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>envars</cyan></b> from volatility is running")
output : Renderer = generic_instance.envars()
Expand All @@ -126,30 +148,35 @@ def test_envars(generic_instance : Generic) -> None :
assert len(envars_content) > 0
logger.success("TEST PASSED !")

@pytest.mark.hivelist
def test_hivelist(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>hivelist</cyan></b> from volatility is running")
output : Renderer = generic_instance.hivelist()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.hivescan
def test_hivescan(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>hivescan</cyan></b> from volatility is running")
output : Renderer = generic_instance.hivescan()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.iat
def test_iat(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>iat</cyan></b> from volatility is running")
output : Renderer = generic_instance.iat()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.info
def test_info(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>info</cyan></b> from volatility is running")
output : Renderer = generic_instance.info()
assert isinstance(output, Renderer), "Error during function execution"
logger.success("TEST PASSED !")

@pytest.mark.pstree
def test_pstree(generic_instance : Generic) -> None :
logger.opt(colors=True).info("<b><cyan>pstree</cyan></b> from volatility is running")
output : Renderer = generic_instance.pstree()
Expand All @@ -158,3 +185,75 @@ def test_pstree(generic_instance : Generic) -> None :
assert isinstance(cmdline_content,list),"Not a list"
assert len(cmdline_content) > 0
logger.success("TEST PASSED !")

@pytest.mark.dumpfile_pid
@pytest.mark.dumpfiles
def test_dumpfile_with_args_pid(windows_instance : Windows):
current_directory = Path.cwd()
initial_files = set(current_directory.glob("file.*"))
new_files = set()

try:
result = windows_instance.dumpfiles(pid=4)
assert result is None, "The dumpfile method should return a non-null result"
new_files = set(current_directory.glob("file.*")) - initial_files
assert len(new_files) >= 1, f"Expected exactly one new file starting with 'file.', but found {len(new_files)}"
logger.opt(colors=True).info(f"number of file dumped {len(new_files)}")
except Exception as e:
pytest.fail(f"An exception should not be raised: {e}")
finally:
for new_file in new_files:
try:
new_file.unlink()
except Exception as cleanup_error:
print(f"Failed to delete {new_file}: {cleanup_error}")

@pytest.mark.dumpfile_physaddr
@pytest.mark.dumpfiles
def test_dumpfile_with_args_physaddr(windows_instance : Windows):
current_directory = Path.cwd()
initial_files = set(current_directory.glob("file.*"))
new_files = set()

try:
result = windows_instance.dumpfiles(physaddr=533517296)
assert result is None, "The dumpfile method should return a non-null result"
# Check if new files starting with 'file.' are created
new_files = set(current_directory.glob("file.*")) - initial_files
assert len(new_files) == 1, f"Expected exactly one new file starting with 'file.', but found {len(new_files)}"
except Exception as e:
pytest.fail(f"An exception should not be raised: {e}")

finally:
# Clean up any new files created during the test
for new_file in new_files:
try:
new_file.unlink()
except Exception as cleanup_error:
print(f"Failed to delete {new_file}: {cleanup_error}")

#Not able to test virtaddr locally
@pytest.mark.dumpfiles
@pytest.mark.dumpfile_virtaddr
def test_dumpfile_with_args_virtaddr(windows_instance : Windows):
current_directory = Path.cwd()
initial_files = set(current_directory.glob("file.*"))
new_files = set()
value = 2274855800
try:
result = windows_instance.dumpfiles(virtaddr=value)
# Check if new files starting with 'file.' with the value in hex are created
file_created = "file." + hex(value)
new_files = set(current_directory.glob(file_created)) - initial_files
assert len(new_files) == 1, f"Expected exactly one new file starting with 'file.', but found {len(new_files)}"

except Exception as e:
pytest.fail(f"An exception should not be raised: {e}")

finally:
# Clean up any new files created during the test
for new_file in new_files:
try:
new_file.unlink()
except Exception as cleanup_error:
print(f"Failed to delete {new_file}: {cleanup_error}")

0 comments on commit 40aa52a

Please sign in to comment.