From 97626b0a69355db9dae96b01d2fd96b1c8cd453f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Therese=20Natter=C3=B8y?= <61694854+tnatt@users.noreply.github.com>
Date: Thu, 10 Oct 2024 09:49:04 +0200
Subject: [PATCH] ENH: Calculate hashes in memory if possible

---
 src/fmu/dataio/_utils.py              | 120 ++++++++-------
 src/fmu/dataio/aggregation.py         |  11 +-
 src/fmu/dataio/dataio.py              |   3 +-
 src/fmu/dataio/providers/_filedata.py |  23 ++-
 tests/test_units/test_checksum_md5.py | 204 ++++++++++++++++++++++++++
 5 files changed, 298 insertions(+), 63 deletions(-)
 create mode 100644 tests/test_units/test_checksum_md5.py

diff --git a/src/fmu/dataio/_utils.py b/src/fmu/dataio/_utils.py
index 75c28fb9d..0e8c20f85 100644
--- a/src/fmu/dataio/_utils.py
+++ b/src/fmu/dataio/_utils.py
@@ -7,6 +7,7 @@
 import json
 import os
 import uuid
+from io import BufferedIOBase, BytesIO
 from pathlib import Path
 from tempfile import NamedTemporaryFile
 from typing import Any, Final, Literal
@@ -85,20 +86,26 @@ def export_metadata_file(


 def export_file(
     obj: types.Inferrable,
-    filename: Path,
+    file: Path | BytesIO,
+    file_suffix: str | None = None,
     fmt: str = "",
-) -> str:
+) -> None:
     """
-    Export a valid object to file. If xtgeo is in the fmt string, xtgeo
-    xyz-column names will be preserved for xtgeo.Points and xtgeo.Polygons
+    Export a valid object to file or memory buffer. If xtgeo is in the fmt string,
+    xtgeo xyz-column names will be preserved for xtgeo.Points and xtgeo.Polygons
     """
-    # create output folder if not existing
-    filename.parent.mkdir(parents=True, exist_ok=True)
+    if isinstance(file, Path):
+        # create output folder if not existing
+        file.parent.mkdir(parents=True, exist_ok=True)
+        file_suffix = file.suffix

-    if filename.suffix == ".gri" and isinstance(obj, xtgeo.RegularSurface):
-        obj.to_file(filename, fformat="irap_binary")
-    elif filename.suffix == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
+    elif not file_suffix:
+        raise ValueError("'suffix' must be provided when file is a BytesIO object")
+
+    if file_suffix == ".gri" and isinstance(obj, xtgeo.RegularSurface):
+        obj.to_file(file, fformat="irap_binary")
+    elif file_suffix == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
         out = obj.copy()  # to not modify incoming instance!
         if "xtgeo" not in fmt:
             out.xname = "X"
@@ -109,72 +116,79 @@ def export_file(
                 out.get_dataframe(copy=False).rename(
                     columns={out.pname: "ID"}, inplace=True
                 )
-        out.get_dataframe(copy=False).to_csv(filename, index=False)
-    elif filename.suffix == ".pol" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
-        obj.to_file(filename)
-    elif filename.suffix == ".segy" and isinstance(obj, xtgeo.Cube):
-        obj.to_file(filename, fformat="segy")
-    elif filename.suffix == ".roff" and isinstance(
-        obj, (xtgeo.Grid, xtgeo.GridProperty)
-    ):
-        obj.to_file(filename, fformat="roff")
-    elif filename.suffix == ".csv" and isinstance(obj, pd.DataFrame):
+        out.get_dataframe(copy=False).to_csv(file, index=False)
+    elif file_suffix == ".pol" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
+        obj.to_file(file)
+    elif file_suffix == ".segy" and isinstance(obj, xtgeo.Cube):
+        obj.to_file(file, fformat="segy")
+    elif file_suffix == ".roff" and isinstance(obj, (xtgeo.Grid, xtgeo.GridProperty)):
+        obj.to_file(file, fformat="roff")
+    elif file_suffix == ".csv" and isinstance(obj, pd.DataFrame):
         logger.info(
             "Exporting dataframe to csv. Note: index columns will not be "
             "preserved unless calling 'reset_index()' on the dataframe."
         )
-        obj.to_csv(filename, index=False)
-    elif filename.suffix == ".parquet":
+        obj.to_csv(file, index=False)
+    elif file_suffix == ".parquet":
         from pyarrow import Table

         if isinstance(obj, Table):
-            from pyarrow import parquet
+            from pyarrow import output_stream, parquet
+
+            parquet.write_table(obj, where=output_stream(file))

-            parquet.write_table(obj, where=str(filename))
+    elif file_suffix == ".json":
+        if isinstance(obj, FaultRoomSurface):
+            serialized_json = json.dumps(obj.storage, indent=4)
+        else:
+            serialized_json = json.dumps(obj)
+
+        if isinstance(file, Path):
+            with open(file, "w") as stream:
+                stream.write(serialized_json)
+        else:
+            file.write(serialized_json.encode("utf-8"))

-    elif filename.suffix == ".json" and isinstance(obj, FaultRoomSurface):
-        with open(filename, "w") as stream:
-            json.dump(obj.storage, stream, indent=4)
-    elif filename.suffix == ".json":
-        with open(filename, "w") as stream:
-            json.dump(obj, stream)
     else:
-        raise TypeError(f"Exporting {filename.suffix} for {type(obj)} is not supported")
+        raise TypeError(f"Exporting {file_suffix} for {type(obj)} is not supported")

-    return str(filename)

+def md5sum(file: Path | BytesIO) -> str:
+    if isinstance(file, Path):
+        with open(file, "rb") as stream:
+            return md5sum_stream(stream)
+    return md5sum_stream(file)
+
+
+def md5sum_stream(stream: BufferedIOBase) -> str:
+    """Calculate the MD5 checksum of a stream."""
+    stream.seek(0)

-def md5sum(fname: Path) -> str:
-    """Calculate the MD5 checksum of a file."""
     hash_md5 = hashlib.md5()
-    with open(fname, "rb") as fil:
-        for chunk in iter(lambda: fil.read(4096), b""):
-            hash_md5.update(chunk)
+    while True:
+        chunk = stream.read(4096)
+        if not chunk:
+            break
+        hash_md5.update(chunk)
     return hash_md5.hexdigest()


-def export_file_compute_checksum_md5(
-    obj: types.Inferrable,
-    filename: Path,
-    fmt: str = "",
-) -> str:
-    """Export and compute checksum"""
-    export_file(obj, filename, fmt=fmt)
-    return md5sum(filename)
+def compute_md5(obj: types.Inferrable, file_suffix: str, fmt: str = "") -> str:
+    """Compute an MD5 sum for an object."""
+    memory_stream = BytesIO()
+    export_file(obj, memory_stream, file_suffix, fmt=fmt)
+    return md5sum(memory_stream)


 def compute_md5_using_temp_file(
-    obj: types.Inferrable, extension: str, fmt: str = ""
+    obj: types.Inferrable, file_suffix: str, fmt: str = ""
 ) -> str:
     """Compute an MD5 sum using a temporary file."""
-    if not extension.startswith("."):
-        raise ValueError("An extension must start with '.'")
-
-    with NamedTemporaryFile(buffering=0, suffix=extension) as tf:
-        logger.info("Compute MD5 sum for tmp file...: %s", tf.name)
-        return export_file_compute_checksum_md5(
-            obj=obj, filename=Path(tf.name), fmt=fmt
-        )
+    with NamedTemporaryFile(buffering=0, suffix=file_suffix) as tf:
+        logger.info("Compute MD5 sum for tmp file")
+        tempfile = Path(tf.name)
+        export_file(obj=obj, file=tempfile, fmt=fmt)
+        return md5sum(tempfile)


 def create_symlink(source: str, target: str) -> None:
diff --git a/src/fmu/dataio/aggregation.py b/src/fmu/dataio/aggregation.py
index 5b668b52d..afc757456 100644
--- a/src/fmu/dataio/aggregation.py
+++ b/src/fmu/dataio/aggregation.py
@@ -262,11 +262,20 @@ def _set_metadata(

         objdata = objectdata_provider_factory(obj=obj, dataio=etemp)

+        try:
+            checksum_md5 = _utils.compute_md5(obj, objdata.extension)
+        except Exception as e:
+            logger.debug(
+                f"Exception {e} occurred when trying to compute md5 from memory stream "
+                f"for an object of type {type(obj)}. Will use tempfile instead."
+            )
+            checksum_md5 = _utils.compute_md5_using_temp_file(obj, objdata.extension)
+
         template["tracklog"] = [fields.Tracklog.initialize()[0]]
         template["file"] = {
             "relative_path": str(relpath),
             "absolute_path": str(abspath) if abspath else None,
-            "checksum_md5": _utils.compute_md5_using_temp_file(obj, objdata.extension),
+            "checksum_md5": checksum_md5,
         }

         # data section
diff --git a/src/fmu/dataio/dataio.py b/src/fmu/dataio/dataio.py
index 29ed01475..cb9aca795 100644
--- a/src/fmu/dataio/dataio.py
+++ b/src/fmu/dataio/dataio.py
@@ -821,7 +821,8 @@ def _export_without_metadata(self, obj: types.Inferrable) -> str:
         ).get_metadata()

         assert filemeta.absolute_path is not None  # for mypy
-        return export_file(obj, filename=filemeta.absolute_path, fmt=objdata.fmt)
+        export_file(obj, file=filemeta.absolute_path, fmt=objdata.fmt)
+        return str(filemeta.absolute_path)

     # ==================================================================================
     # Public methods:
diff --git a/src/fmu/dataio/providers/_filedata.py b/src/fmu/dataio/providers/_filedata.py
index ccaaeace7..242cd717b 100644
--- a/src/fmu/dataio/providers/_filedata.py
+++ b/src/fmu/dataio/providers/_filedata.py
@@ -15,9 +15,7 @@

 from fmu.dataio._logging import null_logger
 from fmu.dataio._model import enums, fields
-from fmu.dataio._utils import (
-    compute_md5_using_temp_file,
-)
+from fmu.dataio._utils import compute_md5, compute_md5_using_temp_file

 from ._base import Provider

@@ -108,11 +106,20 @@ def _get_share_folders(self) -> Path:

     def _compute_md5(self) -> str:
         """Compute an MD5 sum using a temporary file."""
-        if self.obj is None:
-            raise ValueError("Can't compute MD5 sum without an object.")
-        return compute_md5_using_temp_file(
-            self.obj, self.objdata.extension, fmt=self.objdata.fmt
-        )
+        try:
+            return compute_md5(
+                obj=self.obj,
+                file_suffix=self.objdata.extension,
+                fmt=self.objdata.fmt,
+            )
+        except Exception as e:
+            logger.debug(
+                f"Exception {e} occurred when trying to compute md5 from memory stream "
+                f"for an object of type {type(self.obj)}. Will use tempfile instead."
+            )
+            return compute_md5_using_temp_file(
+                self.obj, self.objdata.extension, fmt=self.objdata.fmt
+            )

     def _add_filename_to_path(self, path: Path) -> Path:
         stem = self._get_filestem()
diff --git a/tests/test_units/test_checksum_md5.py b/tests/test_units/test_checksum_md5.py
new file mode 100644
index 000000000..f06fdf27c
--- /dev/null
+++ b/tests/test_units/test_checksum_md5.py
@@ -0,0 +1,204 @@
+from pathlib import Path
+
+import fmu.dataio.readers as readers
+from fmu.dataio._utils import md5sum
+from fmu.dataio.dataio import ExportData, read_metadata
+
+
+def test_checksum_md5_for_regsurf(monkeypatch, tmp_path, globalconfig1, regsurf):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for an xtgeo.RegularSurface
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(regsurf)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_gridproperty(
+    monkeypatch, tmp_path, globalconfig1, gridproperty
+):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for an xtgeo.GridProperty
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(gridproperty)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_grid(monkeypatch, tmp_path, globalconfig1, grid):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for an xtgeo.Grid
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(grid)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_points(monkeypatch, tmp_path, globalconfig1, points):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for an xtgeo.Points
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(points)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_polygons(monkeypatch, tmp_path, globalconfig1, polygons):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for an xtgeo.Polygons
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(polygons)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_cube(monkeypatch, tmp_path, globalconfig1, cube):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for an xtgeo.Cube
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(cube)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_dataframe(monkeypatch, tmp_path, globalconfig1, dataframe):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for a pandas.DataFrame
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(dataframe)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_arrowtable(monkeypatch, tmp_path, globalconfig1, arrowtable):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for a pyarrow.Table
+    """
+    monkeypatch.chdir(tmp_path)
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(arrowtable)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_dictionary(monkeypatch, tmp_path, globalconfig1):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for a dictionary
+    """
+    monkeypatch.chdir(tmp_path)
+
+    mydict = {"test": 3, "test2": "string", "test3": {"test4": 100.89}}
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(mydict)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
+
+
+def test_checksum_md5_for_faultroom(monkeypatch, tmp_path, globalconfig1, rootpath):
+    """
+    Test that the MD5 hash in the metadata is equal to one computed for
+    the exported file for a FaultRoomSurface
+    """
+    monkeypatch.chdir(tmp_path)
+
+    faultroom_files = (rootpath / "tests/data/drogon/rms/output/faultroom").glob(
+        "*.json"
+    )
+    fault_room_surface = readers.read_faultroom_file(next(faultroom_files))
+
+    export_path = Path(
+        ExportData(
+            config=globalconfig1,
+            content="depth",
+            name="myname",
+        ).export(fault_room_surface)
+    )
+
+    meta = read_metadata(export_path)
+    assert meta["file"]["checksum_md5"] == md5sum(export_path)
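
Reviewer note, not part of the patch: the sketch below shows how the new helpers added to src/fmu/dataio/_utils.py are meant to be combined. It mirrors the try/except fallback introduced in providers/_filedata.py and aggregation.py: serialize the object into an in-memory BytesIO buffer and hash that, falling back to a temporary file only when an exporter cannot write to a stream. The wrapper name checksum_md5_for is hypothetical and only for illustration; a minimal sketch assuming the module layout from this diff.

    from fmu.dataio import _utils


    def checksum_md5_for(obj, extension: str, fmt: str = "") -> str:
        # Hypothetical wrapper, not part of the patch.
        try:
            # Hash the object from an in-memory BytesIO buffer (no disk I/O).
            return _utils.compute_md5(obj, file_suffix=extension, fmt=fmt)
        except Exception:
            # Some exporters require a real file path; fall back to a temp file.
            return _utils.compute_md5_using_temp_file(obj, extension, fmt=fmt)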