Add migrations
Yngve S. Kristiansen committed Sep 17, 2024
1 parent f77b61f commit 378376d
Showing 7 changed files with 195 additions and 89 deletions.
2 changes: 1 addition & 1 deletion src/ert/config/ert_config.py
@@ -124,7 +124,7 @@ def __post_init__(self) -> None:

def write_observations_to_folder(self, dest: Path) -> None:
for name, dataset in self.observations.items():
-            dataset.to_netcdf(dest / f"{name}", engine="scipy")
+            dataset.write_parquet(dest / name)

@staticmethod
def with_plugins(
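As an aside, a minimal sketch (not part of the commit) contrasting the two serialization paths; the toy dataset and the dest and name values are invented for illustration:

import polars
import xarray as xr
from pathlib import Path

dest = Path("observations")
dest.mkdir(exist_ok=True)
name = "FOPR"  # hypothetical observation key

# Toy stand-in for an observation dataset
ds = xr.Dataset({"observations": ("time", [0.1, 0.2])})

# Old path: one NetCDF file per observation, via xarray's scipy engine
ds.to_netcdf(dest / name, engine="scipy")

# New path: one Parquet file per observation, via polars
df = polars.from_pandas(ds.to_dataframe().reset_index())
df.write_parquet(dest / name)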
104 changes: 104 additions & 0 deletions src/ert/storage/migration/to7.py
@@ -1,8 +1,10 @@
import dataclasses
import json
import os
from pathlib import Path
from typing import List, Optional

import polars
import xarray as xr

info = "Standardize response configs"
@@ -133,6 +135,108 @@ def _migrate_response_datasets(path: Path) -> None:
os.remove(p)


@dataclasses.dataclass
class ObservationDatasetInfo:
polars_df: polars.DataFrame
response_type: str
original_ds_path: Path

@classmethod
    def from_path(cls, path: Path) -> "ObservationDatasetInfo":
observation_key = os.path.basename(path)
ds = xr.open_dataset(path, engine="scipy")
response_key = ds.attrs["response"]
response_type = "summary" if response_key == "summary" else "gen_data"

df = polars.from_pandas(ds.to_dataframe().dropna().reset_index())
df = df.with_columns(observation_key=polars.lit(observation_key))

primary_key = (
["time"] if response_type == "summary" else ["report_step", "index"]
)
if response_type == "summary":
df = df.rename({"name": "response_key"})
df = df.with_columns(polars.col("time").dt.cast_time_unit("ms"))

if response_type == "gen_data":
df = df.with_columns(
polars.col("report_step").cast(polars.UInt16),
polars.col("index").cast(polars.UInt16),
response_key=polars.lit(response_key),
)

df = df.with_columns(
[
polars.col("std").cast(polars.Float32),
polars.col("observations").cast(polars.Float32),
]
)

df = df[
["response_key", "observation_key", *primary_key, "observations", "std"]
]

return cls(df, response_type, path)

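For reference, the long format that from_path produces for a gen_data observation, rebuilt by hand with the same column order and dtypes; the values come from the test snapshot further down, but the construction itself is illustrative:

import polars

df = polars.DataFrame(
    {
        "response_key": ["GEN"],
        "observation_key": ["GEN"],
        "report_step": polars.Series([1], dtype=polars.UInt16),
        "index": polars.Series([0], dtype=polars.UInt16),
        "observations": polars.Series([0.0], dtype=polars.Float32),
        "std": polars.Series([0.1], dtype=polars.Float32),
    }
)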

def _migrate_observations_to_grouped_parquet(path: Path) -> None:
for experiment in path.glob("experiments/*"):
_obs_keys = os.listdir(os.path.join(experiment, "observations"))

if len(set(_obs_keys) - {"summary", "gen_data"}) == 0:
            # Observations are already migrated, likely from the to4 migration
continue

obs_ds_infos = [
ObservationDatasetInfo.from_path(experiment / "observations" / p)
for p in _obs_keys
]

for response_type in ["gen_data", "summary"]:
infos = [
_info for _info in obs_ds_infos if _info.response_type == response_type
]
if len(infos) > 0:
concatd_df = polars.concat([_info.polars_df for _info in infos])
concatd_df.write_parquet(experiment / "observations" / response_type)

for _info in infos:
os.remove(_info.original_ds_path)

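After grouping, every observation for a response type lives in a single Parquet file named after that type; a sketch of reading one back (the experiment path here is hypothetical):

import polars
from pathlib import Path

experiment_path = Path("experiments") / "some-experiment-id"  # hypothetical
obs = polars.read_parquet(experiment_path / "observations" / "gen_data")
# Filter a single observation back out by its key
gen_obs = obs.filter(polars.col("observation_key") == "GEN")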

def _migrate_responses_from_netcdf_to_parquet(path: Path) -> None:
for experiment in path.glob("experiments/*"):
ensembles = path.glob("ensembles/*")

        with open(experiment / "index.json", encoding="utf-8") as f:
            exp_index = json.load(f)
        experiment_id = exp_index["id"]

for ens in ensembles:
with open(ens / "index.json", encoding="utf-8") as f:
ens_file = json.load(f)
if ens_file["experiment_id"] != experiment_id:
continue

real_dirs = [*ens.glob("realization-*")]

for real_dir in real_dirs:
for ds_name in ["gen_data", "summary"]:
if (real_dir / f"{ds_name}.nc").exists():
                    response_ds = xr.open_dataset(
                        real_dir / f"{ds_name}.nc", engine="scipy"
                    )

                    pandas_df = response_ds.to_dataframe().dropna()
polars_df = polars.from_pandas(pandas_df.reset_index())
polars_df.write_parquet(real_dir / f"{ds_name}.parquet")

os.remove(real_dir / f"{ds_name}.nc")


def migrate(path: Path) -> None:
_migrate_response_datasets(path)
_migrate_response_configs(path)
_migrate_responses_from_netcdf_to_parquet(path)
_migrate_observations_to_grouped_parquet(path)
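A hedged usage sketch of the whole migration; the storage root is a placeholder, and the before/after layout is inferred from the globs above:

from pathlib import Path

# Before: experiments/<id>/observations/<key> as NetCDF, and
#         ensembles/<id>/realization-*/{gen_data,summary}.nc
# After:  experiments/<id>/observations/{gen_data,summary} as Parquet, and
#         ensembles/<id>/realization-*/{gen_data,summary}.parquet
migrate(Path("storage"))  # "storage" is a placeholder path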

This file was deleted.

This file was deleted.

@@ -0,0 +1,2 @@
,response_key,observation_key,report_step,index,observations,std
0,GEN,GEN,1,0,0.0,0.1
@@ -0,0 +1,2 @@
,response_key,observation_key,time,observations,std
0,RWPR,FWPR,1996-01-02,0.1,0.05
170 changes: 86 additions & 84 deletions tests/unit_tests/storage/test_storage_migration.py
@@ -92,17 +92,32 @@ def copy_shared(tmp_path, block_storage_path):
"5.0.0",
],
)
-def test_that_storage_matches(
+def test_that_storage_works_with_missing_parameters_and_responses(
tmp_path,
block_storage_path,
snapshot,
monkeypatch,
ert_version,
):
storage_path = tmp_path / "all_data_types" / f"storage-{ert_version}"
shutil.copytree(
block_storage_path / f"all_data_types/storage-{ert_version}",
tmp_path / "all_data_types" / f"storage-{ert_version}",
storage_path,
)
[ensemble_id] = os.listdir(storage_path / "ensembles")

ensemble_path = storage_path / "ensembles" / ensemble_id

    # Remove all realization-*/TOP.nc and GEN.nc, and only some realization-*/BPR.nc
for i, real_dir in enumerate(
(storage_path / "ensembles" / ensemble_id).glob("realization-*")
):
os.remove(real_dir / "TOP.nc")
if i % 2 == 0:
os.remove(real_dir / "BPR.nc")

os.remove(real_dir / "GEN.nc")

monkeypatch.chdir(tmp_path / "all_data_types")
ert_config = ErtConfig.with_plugins().from_file("config.ert")
local_storage_set_ert_config(ert_config)
@@ -114,67 +129,16 @@ def test_that_storage_matches(
experiment = experiments[0]
ensembles = list(experiment.ensembles)
assert len(ensembles) == 1
ensemble = ensembles[0]

response_config = experiment.response_configuration
response_config["summary"].refcase = {}

with open(
experiment._path / experiment._responses_file, "w", encoding="utf-8"
) as f:
json.dump(
{k: v.to_dict() for k, v in response_config.items()},
f,
default=str,
indent=2,
)
ens_dir_contents = set(os.listdir(ensemble_path))
assert {
"index.json",
}.issubset(ens_dir_contents)

# We need to normalize some irrelevant details:
experiment.parameter_configuration["PORO"].mask_file = ""
if version.parse(ert_version).major == 5:
# In this version we were not saving the full parameter
        # configuration, so it had to be recreated from what was
# in ErtConfig at the time of migration, hence the new
# path
experiment.parameter_configuration[
"BPR"
].template_file = experiment.parameter_configuration[
"BPR"
].template_file.replace(
str(tmp_path), "/home/eivind/Projects/ert/test-data"
)
snapshot.assert_match(
str(dict(sorted(experiment.parameter_configuration.items()))) + "\n",
"parameters",
)
snapshot.assert_match(
str(
{
k: experiment.response_configuration[k]
for k in sorted(experiment.response_configuration.keys())
}
)
+ "\n",
"responses",
)
assert "TOP.nc" not in ens_dir_contents

summary_data = ensemble.load_responses(
"summary",
tuple(ensemble.get_realization_list_with_responses("summary")),
)
snapshot.assert_match(
summary_data.to_dataframe(dim_order=["time", "name", "realization"])
.transform(np.sort)
.to_csv(),
"summary_data",
)
snapshot.assert_match_dir(
{
key: value.to_dataframe().to_csv()
for key, value in experiment.observations.items()
},
"observations",
)
with pytest.raises(KeyError):
ensembles[0].load_responses("GEN", (0,))


@pytest.mark.integration_test
@@ -244,32 +208,17 @@ def test_that_storage_matches(
"5.0.0",
],
)
-def test_that_storage_works_with_missing_parameters_and_responses(
+def test_that_storage_matches(
tmp_path,
block_storage_path,
snapshot,
monkeypatch,
ert_version,
):
storage_path = tmp_path / "all_data_types" / f"storage-{ert_version}"
shutil.copytree(
block_storage_path / f"all_data_types/storage-{ert_version}",
-        storage_path,
+        tmp_path / "all_data_types" / f"storage-{ert_version}",
)
[ensemble_id] = os.listdir(storage_path / "ensembles")

ensemble_path = storage_path / "ensembles" / ensemble_id

    # Remove all realization-*/TOP.nc and GEN.nc, and only some realization-*/BPR.nc
for i, real_dir in enumerate(
(storage_path / "ensembles" / ensemble_id).glob("realization-*")
):
os.remove(real_dir / "TOP.nc")
if i % 2 == 0:
os.remove(real_dir / "BPR.nc")

os.remove(real_dir / "GEN.nc")

monkeypatch.chdir(tmp_path / "all_data_types")
ert_config = ErtConfig.with_plugins().from_file("config.ert")
local_storage_set_ert_config(ert_config)
@@ -281,13 +230,66 @@ def test_that_storage_works_with_missing_parameters_and_responses(
experiment = experiments[0]
ensembles = list(experiment.ensembles)
assert len(ensembles) == 1
ensemble = ensembles[0]

ens_dir_contents = set(os.listdir(ensemble_path))
assert {
"index.json",
}.issubset(ens_dir_contents)
response_config = experiment.response_configuration
response_config["summary"].refcase = {}

assert "TOP.nc" not in ens_dir_contents
with open(
experiment._path / experiment._responses_file, "w", encoding="utf-8"
) as f:
json.dump(
{k: v.to_dict() for k, v in response_config.items()},
f,
default=str,
indent=2,
)

with pytest.raises(KeyError):
ensembles[0].load_responses("GEN", (0,))
# We need to normalize some irrelevant details:
experiment.parameter_configuration["PORO"].mask_file = ""
if version.parse(ert_version).major == 5:
# In this version we were not saving the full parameter
        # configuration, so it had to be recreated from what was
# in ErtConfig at the time of migration, hence the new
# path
experiment.parameter_configuration[
"BPR"
].template_file = experiment.parameter_configuration[
"BPR"
].template_file.replace(
str(tmp_path), "/home/eivind/Projects/ert/test-data"
)
snapshot.assert_match(
str(dict(sorted(experiment.parameter_configuration.items()))) + "\n",
"parameters",
)
snapshot.assert_match(
str(
{
k: experiment.response_configuration[k]
for k in sorted(experiment.response_configuration.keys())
}
)
+ "\n",
"responses",
)

summary_data = ensemble.load_responses(
"summary",
tuple(ensemble.get_realization_list_with_responses("summary")),
)
snapshot.assert_match(
summary_data.sort("time", "name", "realization")
.to_pandas()
.set_index(["time", "name", "realization"])
.transform(np.sort)
.to_csv(),
"summary_data",
)
snapshot.assert_match_dir(
{
key: value.to_pandas().to_csv()
for key, value in experiment.observations.items()
},
"observations",
)
