Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Calculate hashes in memory #846

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 67 additions & 53 deletions src/fmu/dataio/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import os
import uuid
from io import BufferedIOBase, BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Final, Literal
Expand Down Expand Up @@ -85,20 +86,26 @@ def export_metadata_file(

def export_file(
obj: types.Inferrable,
filename: Path,
file: Path | BytesIO,
file_suffix: str | None = None,
fmt: str = "",
) -> str:
) -> None:
"""
Export a valid object to file. If xtgeo is in the fmt string, xtgeo
xyz-column names will be preserved for xtgeo.Points and xtgeo.Polygons
Export a valid object to file or memory buffer. If xtgeo is in the fmt string,
xtgeo xyz-column names will be preserved for xtgeo.Points and xtgeo.Polygons
"""

# create output folder if not existing
filename.parent.mkdir(parents=True, exist_ok=True)
if isinstance(file, Path):
# create output folder if not existing
file.parent.mkdir(parents=True, exist_ok=True)
file_suffix = file.suffix

if filename.suffix == ".gri" and isinstance(obj, xtgeo.RegularSurface):
obj.to_file(filename, fformat="irap_binary")
elif filename.suffix == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
elif not file_suffix:
raise ValueError("'suffix' must be provided when file is a BytesIO object")

if file_suffix == ".gri" and isinstance(obj, xtgeo.RegularSurface):
obj.to_file(file, fformat="irap_binary")
elif file_suffix == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
out = obj.copy() # to not modify incoming instance!
if "xtgeo" not in fmt:
out.xname = "X"
Expand All @@ -109,72 +116,79 @@ def export_file(
out.get_dataframe(copy=False).rename(
columns={out.pname: "ID"}, inplace=True
)
out.get_dataframe(copy=False).to_csv(filename, index=False)
elif filename.suffix == ".pol" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
obj.to_file(filename)
elif filename.suffix == ".segy" and isinstance(obj, xtgeo.Cube):
obj.to_file(filename, fformat="segy")
elif filename.suffix == ".roff" and isinstance(
obj, (xtgeo.Grid, xtgeo.GridProperty)
):
obj.to_file(filename, fformat="roff")
elif filename.suffix == ".csv" and isinstance(obj, pd.DataFrame):
out.get_dataframe(copy=False).to_csv(file, index=False)
elif file_suffix == ".pol" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
obj.to_file(file)
elif file_suffix == ".segy" and isinstance(obj, xtgeo.Cube):
obj.to_file(file, fformat="segy")
elif file_suffix == ".roff" and isinstance(obj, (xtgeo.Grid, xtgeo.GridProperty)):
obj.to_file(file, fformat="roff")
elif file_suffix == ".csv" and isinstance(obj, pd.DataFrame):
logger.info(
"Exporting dataframe to csv. Note: index columns will not be "
"preserved unless calling 'reset_index()' on the dataframe."
)
obj.to_csv(filename, index=False)
elif filename.suffix == ".parquet":
obj.to_csv(file, index=False)
elif file_suffix == ".parquet":
from pyarrow import Table

if isinstance(obj, Table):
from pyarrow import parquet
from pyarrow import output_stream, parquet

parquet.write_table(obj, where=output_stream(file))

parquet.write_table(obj, where=str(filename))
elif file_suffix == ".json":
if isinstance(obj, FaultRoomSurface):
serialized_json = json.dumps(obj.storage, indent=4)
else:
serialized_json = json.dumps(obj)

if isinstance(file, Path):
with open(file, "w") as stream:
stream.write(serialized_json)
else:
file.write(serialized_json.encode("utf-8"))

elif filename.suffix == ".json" and isinstance(obj, FaultRoomSurface):
with open(filename, "w") as stream:
json.dump(obj.storage, stream, indent=4)
elif filename.suffix == ".json":
with open(filename, "w") as stream:
json.dump(obj, stream)
else:
raise TypeError(f"Exporting {filename.suffix} for {type(obj)} is not supported")
raise TypeError(f"Exporting {file_suffix} for {type(obj)} is not supported")

return str(filename)

def md5sum(file: Path | BytesIO) -> str:
    """Return the MD5 hex digest of *file*, given as a path or in-memory buffer."""
    if not isinstance(file, Path):
        # Already an open buffer; hash it directly.
        return md5sum_stream(file)
    with open(file, "rb") as stream:
        return md5sum_stream(stream)


def md5sum_stream(stream: BufferedIOBase) -> str:
    """Calculate the MD5 checksum of a stream.

    The stream is rewound to the start before hashing, and is consumed in
    4 KiB chunks so large payloads never need to fit in a single read.
    """
    stream.seek(0)

    hash_md5 = hashlib.md5()
    # Read until EOF (read() returns empty bytes) without a manual break.
    while chunk := stream.read(4096):
        hash_md5.update(chunk)
    return hash_md5.hexdigest()


def export_file_compute_checksum_md5(
    obj: types.Inferrable,
    filename: Path,
    fmt: str = "",
) -> str:
    """Export *obj* to *filename*, then return the MD5 hex digest of the file.

    Args:
        obj: The exportable object (surface, polygons, dataframe, ...).
        filename: Destination path; forwarded to export_file.
        fmt: Optional format hint forwarded to export_file.

    Returns:
        The MD5 checksum of the written file as a hex string.
    """
    export_file(obj, filename, fmt=fmt)
    return md5sum(filename)
def compute_md5(obj: types.Inferrable, file_suffix: str, fmt: str = "") -> str:
    """Serialize *obj* into an in-memory buffer and return its MD5 hex digest."""
    buffer = BytesIO()
    export_file(obj, buffer, file_suffix, fmt=fmt)
    return md5sum(buffer)


def compute_md5_using_temp_file(
    obj: types.Inferrable, file_suffix: str, fmt: str = ""
) -> str:
    """Compute an MD5 sum for *obj* by round-tripping it through a temp file.

    Fallback for object types that cannot be serialized to an in-memory
    stream; the temporary file is removed when the context manager exits.
    """
    if not file_suffix.startswith("."):
        # NamedTemporaryFile appends the suffix verbatim, so a suffix without
        # a leading dot would silently produce a malformed filename.
        raise ValueError("An extension must start with '.'")

    with NamedTemporaryFile(buffering=0, suffix=file_suffix) as tf:
        logger.info("Compute MD5 sum for tmp file")
        tempfile = Path(tf.name)
        export_file(obj=obj, file=tempfile, fmt=fmt)
        return md5sum(tempfile)


def create_symlink(source: str, target: str) -> None:
Expand Down
11 changes: 10 additions & 1 deletion src/fmu/dataio/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,20 @@ def _set_metadata(

objdata = objectdata_provider_factory(obj=obj, dataio=etemp)

try:
checksum_md5 = _utils.compute_md5(obj, objdata.extension)
except Exception as e:
logger.debug(
f"Exception {e} occured when trying to compute md5 from memory stream "
f"for an object of type {type(obj)}. Will use tempfile instead."
)
checksum_md5 = _utils.compute_md5_using_temp_file(obj, objdata.extension)

template["tracklog"] = [fields.Tracklog.initialize()[0]]
template["file"] = {
"relative_path": str(relpath),
"absolute_path": str(abspath) if abspath else None,
"checksum_md5": _utils.compute_md5_using_temp_file(obj, objdata.extension),
"checksum_md5": checksum_md5,
}

# data section
Expand Down
3 changes: 2 additions & 1 deletion src/fmu/dataio/dataio.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,8 @@ def _export_without_metadata(self, obj: types.Inferrable) -> str:
).get_metadata()

assert filemeta.absolute_path is not None # for mypy
return export_file(obj, filename=filemeta.absolute_path, fmt=objdata.fmt)
export_file(obj, file=filemeta.absolute_path, fmt=objdata.fmt)
return str(filemeta.absolute_path)

# ==================================================================================
# Public methods:
Expand Down
23 changes: 15 additions & 8 deletions src/fmu/dataio/providers/_filedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

from fmu.dataio._logging import null_logger
from fmu.dataio._model import enums, fields
from fmu.dataio._utils import (
compute_md5_using_temp_file,
)
from fmu.dataio._utils import compute_md5, compute_md5_using_temp_file

from ._base import Provider

Expand Down Expand Up @@ -108,11 +106,20 @@ def _get_share_folders(self) -> Path:

def _compute_md5(self) -> str:
    """Compute the MD5 checksum of the exported object.

    Tries the fast in-memory route first and falls back to writing a
    temporary file for object types that cannot stream to a BytesIO buffer.
    """
    if self.obj is None:
        raise ValueError("Can't compute MD5 sum without an object.")
    try:
        return compute_md5(
            obj=self.obj,
            file_suffix=self.objdata.extension,
            fmt=self.objdata.fmt,
        )
    except Exception as e:
        logger.debug(
            f"Exception {e} occured when trying to compute md5 from memory stream "
            f"for an object of type {type(self.obj)}. Will use tempfile instead."
        )
        return compute_md5_using_temp_file(
            self.obj, self.objdata.extension, fmt=self.objdata.fmt
        )

def _add_filename_to_path(self, path: Path) -> Path:
stem = self._get_filestem()
Expand Down
Loading