From 378376dde56965b45f8b7f29413c7c75958041a0 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 17 Sep 2024 09:08:49 +0200 Subject: [PATCH] Add migrations --- src/ert/config/ert_config.py | 2 +- src/ert/storage/migration/to7.py | 104 +++++++++++ .../observations/FWPR | 2 - .../observations/GEN | 2 - .../observations/gen_data | 2 + .../observations/summary | 2 + .../storage/test_storage_migration.py | 170 +++++++++--------- 7 files changed, 195 insertions(+), 89 deletions(-) delete mode 100644 tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/FWPR delete mode 100644 tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/GEN create mode 100644 tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/gen_data create mode 100644 tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/summary diff --git a/src/ert/config/ert_config.py b/src/ert/config/ert_config.py index 7dd7f15b245..23ac82a6595 100644 --- a/src/ert/config/ert_config.py +++ b/src/ert/config/ert_config.py @@ -124,7 +124,7 @@ def __post_init__(self) -> None: def write_observations_to_folder(self, dest: Path) -> None: for name, dataset in self.observations.items(): - dataset.to_netcdf(dest / f"{name}", engine="scipy") + dataset.write_parquet(dest / name) @staticmethod def with_plugins( diff --git a/src/ert/storage/migration/to7.py b/src/ert/storage/migration/to7.py index 122648c1b04..0d66e113bd7 100644 --- a/src/ert/storage/migration/to7.py +++ b/src/ert/storage/migration/to7.py @@ -1,8 +1,10 @@ +import dataclasses import json import os from pathlib import Path from typing import List, Optional +import polars import xarray as xr info = "Standardize response configs" @@ -133,6 +135,108 @@ def _migrate_response_datasets(path: Path) -> None: os.remove(p) +@dataclasses.dataclass +class ObservationDatasetInfo: + polars_df: 
polars.DataFrame + response_type: str + original_ds_path: Path + + @classmethod + def from_path(cls, path: Path): + observation_key = os.path.basename(path) + ds = xr.open_dataset(path, engine="scipy") + response_key = ds.attrs["response"] + response_type = "summary" if response_key == "summary" else "gen_data" + + df = polars.from_pandas(ds.to_dataframe().dropna().reset_index()) + df = df.with_columns(observation_key=polars.lit(observation_key)) + + primary_key = ( + ["time"] if response_type == "summary" else ["report_step", "index"] + ) + if response_type == "summary": + df = df.rename({"name": "response_key"}) + df = df.with_columns(polars.col("time").dt.cast_time_unit("ms")) + + if response_type == "gen_data": + df = df.with_columns( + polars.col("report_step").cast(polars.UInt16), + polars.col("index").cast(polars.UInt16), + response_key=polars.lit(response_key), + ) + + df = df.with_columns( + [ + polars.col("std").cast(polars.Float32), + polars.col("observations").cast(polars.Float32), + ] + ) + + df = df[ + ["response_key", "observation_key", *primary_key, "observations", "std"] + ] + + return cls(df, response_type, path) + + +def _migrate_observations_to_grouped_parquet(path: Path): + for experiment in path.glob("experiments/*"): + _obs_keys = os.listdir(os.path.join(experiment, "observations")) + + if len(set(_obs_keys) - {"summary", "gen_data"}) == 0: + # Observations are already migrated, likely from .to4 migrations + continue + + obs_ds_infos = [ + ObservationDatasetInfo.from_path(experiment / "observations" / p) + for p in _obs_keys + ] + + for response_type in ["gen_data", "summary"]: + infos = [ + _info for _info in obs_ds_infos if _info.response_type == response_type + ] + if len(infos) > 0: + concatd_df = polars.concat([_info.polars_df for _info in infos]) + concatd_df.write_parquet(experiment / "observations" / response_type) + + for _info in infos: + os.remove(_info.original_ds_path) + + +def _migrate_responses_from_netcdf_to_parquet(path: 
Path): + for experiment in path.glob("experiments/*"): + ensembles = path.glob("ensembles/*") + + experiment_id = None + with open(experiment / "index.json", encoding="utf-8") as f: + exp_index = json.load(f) + experiment_id = exp_index["id"] + + for ens in ensembles: + with open(ens / "index.json", encoding="utf-8") as f: + ens_file = json.load(f) + if ens_file["experiment_id"] != experiment_id: + continue + + real_dirs = [*ens.glob("realization-*")] + + for real_dir in real_dirs: + for ds_name in ["gen_data", "summary"]: + if (real_dir / f"{ds_name}.nc").exists(): + gen_data_ds = xr.open_dataset( + real_dir / f"{ds_name}.nc", engine="scipy" + ) + + pandas_df = gen_data_ds.to_dataframe().dropna() + polars_df = polars.from_pandas(pandas_df.reset_index()) + polars_df.write_parquet(real_dir / f"{ds_name}.parquet") + + os.remove(real_dir / f"{ds_name}.nc") + + def migrate(path: Path) -> None: _migrate_response_datasets(path) _migrate_response_configs(path) + _migrate_responses_from_netcdf_to_parquet(path) + _migrate_observations_to_grouped_parquet(path) diff --git a/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/FWPR b/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/FWPR deleted file mode 100644 index 9ab94b51f6d..00000000000 --- a/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/FWPR +++ /dev/null @@ -1,2 +0,0 @@ -name,time,observations,std -RWPR,1996-01-02,0.1,0.05 diff --git a/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/GEN b/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/GEN deleted file mode 100644 index 3dfa01048df..00000000000 --- a/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/GEN +++ /dev/null @@ -1,2 +0,0 @@ -report_step,index,observations,std -1,0,0.0,0.1 diff 
--git a/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/gen_data b/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/gen_data new file mode 100644 index 00000000000..2ed3a72563b --- /dev/null +++ b/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/gen_data @@ -0,0 +1,2 @@ +,response_key,observation_key,report_step,index,observations,std +0,GEN,GEN,1,0,0.0,0.1 diff --git a/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/summary b/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/summary new file mode 100644 index 00000000000..18d0db311c2 --- /dev/null +++ b/tests/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/observations/summary @@ -0,0 +1,2 @@ +,response_key,observation_key,time,observations,std +0,RWPR,FWPR,1996-01-02,0.1,0.05 diff --git a/tests/unit_tests/storage/test_storage_migration.py b/tests/unit_tests/storage/test_storage_migration.py index 456164e6f7a..8fcd79138f7 100644 --- a/tests/unit_tests/storage/test_storage_migration.py +++ b/tests/unit_tests/storage/test_storage_migration.py @@ -92,17 +92,32 @@ def copy_shared(tmp_path, block_storage_path): "5.0.0", ], ) -def test_that_storage_matches( +def test_that_storage_works_with_missing_parameters_and_responses( tmp_path, block_storage_path, snapshot, monkeypatch, ert_version, ): + storage_path = tmp_path / "all_data_types" / f"storage-{ert_version}" shutil.copytree( block_storage_path / f"all_data_types/storage-{ert_version}", - tmp_path / "all_data_types" / f"storage-{ert_version}", + storage_path, ) + [ensemble_id] = os.listdir(storage_path / "ensembles") + + ensemble_path = storage_path / "ensembles" / ensemble_id + + # Remove all realization-*/TOP.nc, and only some realization-*/BPR.nc + for i, real_dir in enumerate( + (storage_path / 
"ensembles" / ensemble_id).glob("realization-*") + ): + os.remove(real_dir / "TOP.nc") + if i % 2 == 0: + os.remove(real_dir / "BPR.nc") + + os.remove(real_dir / "GEN.nc") + monkeypatch.chdir(tmp_path / "all_data_types") ert_config = ErtConfig.with_plugins().from_file("config.ert") local_storage_set_ert_config(ert_config) @@ -114,67 +129,16 @@ def test_that_storage_matches( experiment = experiments[0] ensembles = list(experiment.ensembles) assert len(ensembles) == 1 - ensemble = ensembles[0] - response_config = experiment.response_configuration - response_config["summary"].refcase = {} - - with open( - experiment._path / experiment._responses_file, "w", encoding="utf-8" - ) as f: - json.dump( - {k: v.to_dict() for k, v in response_config.items()}, - f, - default=str, - indent=2, - ) + ens_dir_contents = set(os.listdir(ensemble_path)) + assert { + "index.json", + }.issubset(ens_dir_contents) - # We need to normalize some irrelevant details: - experiment.parameter_configuration["PORO"].mask_file = "" - if version.parse(ert_version).major == 5: - # In this version we were not saving the full parameter - # configuration, so it had to be recreated by what was - # in ErtConfig at the time of migration, hence the new - # path - experiment.parameter_configuration[ - "BPR" - ].template_file = experiment.parameter_configuration[ - "BPR" - ].template_file.replace( - str(tmp_path), "/home/eivind/Projects/ert/test-data" - ) - snapshot.assert_match( - str(dict(sorted(experiment.parameter_configuration.items()))) + "\n", - "parameters", - ) - snapshot.assert_match( - str( - { - k: experiment.response_configuration[k] - for k in sorted(experiment.response_configuration.keys()) - } - ) - + "\n", - "responses", - ) + assert "TOP.nc" not in ens_dir_contents - summary_data = ensemble.load_responses( - "summary", - tuple(ensemble.get_realization_list_with_responses("summary")), - ) - snapshot.assert_match( - summary_data.to_dataframe(dim_order=["time", "name", "realization"]) - 
.transform(np.sort) - .to_csv(), - "summary_data", - ) - snapshot.assert_match_dir( - { - key: value.to_dataframe().to_csv() - for key, value in experiment.observations.items() - }, - "observations", - ) + with pytest.raises(KeyError): + ensembles[0].load_responses("GEN", (0,)) @pytest.mark.integration_test @@ -244,32 +208,17 @@ def test_that_storage_matches( "5.0.0", ], ) -def test_that_storage_works_with_missing_parameters_and_responses( +def test_that_storage_matches( tmp_path, block_storage_path, snapshot, monkeypatch, ert_version, ): - storage_path = tmp_path / "all_data_types" / f"storage-{ert_version}" shutil.copytree( block_storage_path / f"all_data_types/storage-{ert_version}", - storage_path, + tmp_path / "all_data_types" / f"storage-{ert_version}", ) - [ensemble_id] = os.listdir(storage_path / "ensembles") - - ensemble_path = storage_path / "ensembles" / ensemble_id - - # Remove all realization-*/TOP.nc, and only some realization-*/BPC.nc - for i, real_dir in enumerate( - (storage_path / "ensembles" / ensemble_id).glob("realization-*") - ): - os.remove(real_dir / "TOP.nc") - if i % 2 == 0: - os.remove(real_dir / "BPR.nc") - - os.remove(real_dir / "GEN.nc") - monkeypatch.chdir(tmp_path / "all_data_types") ert_config = ErtConfig.with_plugins().from_file("config.ert") local_storage_set_ert_config(ert_config) @@ -281,13 +230,66 @@ def test_that_storage_works_with_missing_parameters_and_responses( experiment = experiments[0] ensembles = list(experiment.ensembles) assert len(ensembles) == 1 + ensemble = ensembles[0] - ens_dir_contents = set(os.listdir(ensemble_path)) - assert { - "index.json", - }.issubset(ens_dir_contents) + response_config = experiment.response_configuration + response_config["summary"].refcase = {} - assert "TOP.nc" not in ens_dir_contents + with open( + experiment._path / experiment._responses_file, "w", encoding="utf-8" + ) as f: + json.dump( + {k: v.to_dict() for k, v in response_config.items()}, + f, + default=str, + indent=2, + ) - 
with pytest.raises(KeyError): - ensembles[0].load_responses("GEN", (0,)) + # We need to normalize some irrelevant details: + experiment.parameter_configuration["PORO"].mask_file = "" + if version.parse(ert_version).major == 5: + # In this version we were not saving the full parameter + # configuration, so it had to be recreated by what was + # in ErtConfig at the time of migration, hence the new + # path + experiment.parameter_configuration[ + "BPR" + ].template_file = experiment.parameter_configuration[ + "BPR" + ].template_file.replace( + str(tmp_path), "/home/eivind/Projects/ert/test-data" + ) + snapshot.assert_match( + str(dict(sorted(experiment.parameter_configuration.items()))) + "\n", + "parameters", + ) + snapshot.assert_match( + str( + { + k: experiment.response_configuration[k] + for k in sorted(experiment.response_configuration.keys()) + } + ) + + "\n", + "responses", + ) + + summary_data = ensemble.load_responses( + "summary", + tuple(ensemble.get_realization_list_with_responses("summary")), + ) + snapshot.assert_match( + summary_data.sort("time", "name", "realization") + .to_pandas() + .set_index(["time", "name", "realization"]) + .transform(np.sort) + .to_csv(), + "summary_data", + ) + snapshot.assert_match_dir( + { + key: value.to_pandas().to_csv() + for key, value in experiment.observations.items() + }, + "observations", + )