diff --git a/climetlab/loaders/__init__.py b/climetlab/loaders/__init__.py index 2e9614e2..6b0e44b8 100644 --- a/climetlab/loaders/__init__.py +++ b/climetlab/loaders/__init__.py @@ -27,6 +27,20 @@ LOG = logging.getLogger(__name__) +def get_packages_versions(): + dic = {} + + import climetlab + + dic["climetlab"] = climetlab.__version__ + + import earthkit.meteo + + dic["earthkit.meteo"] = earthkit.meteo.__version__ + + return dic + + def _tidy(o): if isinstance(o, dict): return {k: _tidy(v) for k, v in o.items()} @@ -150,10 +164,13 @@ def __init__(self, *, loader, parts, **kwargs): i_chunk, n_chunks = int(i_chunk), int(n_chunks) total = len(self.loader.registry.get_flags()) + print(self.loader.registry.get_flags()) + print(total) assert i_chunk > 0, f"Chunk number {i_chunk} must be positive." - assert ( - i_chunk <= total - ), f"Chunk number {i_chunk} must be less than {total}+1." + if n_chunks <= total: + warnings.warn( + f"Number of chunks {n_chunks} is larger than the total number of chunks: {total}+1." + ) chunk_size = total / n_chunks parts = [ @@ -164,6 +181,8 @@ def __init__(self, *, loader, parts, **kwargs): parts = [int(_) for _ in parts] print(f"Running parts: {parts}") + if not parts: + warnings.warn(f"Nothing to do for chunk {i_chunk}.") self.parts = parts @@ -284,8 +303,13 @@ class ZarrBuiltRegistry: z = None def __init__(self, path): + import zarr + assert isinstance(path, str), path self.zarr_path = path + self.synchronizer = zarr.ProcessSynchronizer( + os.path.join(self.zarr_path, "registry.sync") + ) def get_slice_for(self, iloop): lengths = self.get_lengths() @@ -314,12 +338,12 @@ def set_flag(self, iloop, value=True): def _open_read(self): import zarr - return zarr.open(self.zarr_path, mode="r") + return zarr.open(self.zarr_path, mode="r", synchronizer=self.synchronizer) def _open_write(self): import zarr - return zarr.open(self.zarr_path, mode="r+") + return zarr.open(self.zarr_path, mode="r+", synchronizer=self.synchronizer) def create(self, lengths, overwrite=False): z = self._open_write() @@ -399,34 +423,47 @@ def initialise(self): """Create empty zarr from self.main_config and self.path""" import zarr - def get_one_element_config(): + from climetlab.utils.dates import to_datetime # avoid circular imports + + def get_first_and_last_element_configs(): + first = None for i, vars in enumerate(self.iter_loops()): keys = list(vars.keys()) assert len(vars) == 1, keys key = keys[0] - vars = {key: vars[key][0]} - return self.main_config.substitute(vars) - - config = get_one_element_config() - - cube, grid_points = self.config_to_data_cube(config, with_gridpoints=True) - - shape = list(cube.extended_user_shape) + if first is None: + first = self.main_config.substitute({key: vars[key][0]}) + last = self.main_config.substitute({key: vars[key][-1]}) + return first, last + + first, last = get_first_and_last_element_configs() + + first_cube, grid_points = self.config_to_data_cube(first, with_gridpoints=True) + last_cube, grid_points_ = self.config_to_data_cube(last, with_gridpoints=True) + for _ in zip(grid_points, grid_points_): + assert (_[0] == _[1]).all(), (grid_points_, grid_points) + + shape = list(first_cube.extended_user_shape) + assert shape == list(last_cube.extended_user_shape), ( + shape, + list(last_cube.extended_user_shape), + ) # Notice that shape[0] can be >1 # we are assuming that all data has the same shape one_element_length = shape[0] lengths = self._compute_lengths(one_element_length) - shape[0] = sum(lengths) + total_shape = [sum(lengths), *shape[1:]] - chunks = cube.chunking(self.main_config.output.chunking) + chunks = first_cube.chunking(self.main_config.output.chunking) dtype = self.main_config.output.dtype self.print( - f"Creating ZARR '{self.path}', with {shape=}, " f"{chunks=} and {dtype=}" + f"Creating ZARR '{self.path}', with {total_shape=}, " + f"{chunks=} and {dtype=}" ) self.z = zarr.open(self.path, mode="w") - self.z.create_dataset("data", shape=shape, chunks=chunks, dtype=dtype) + self.z.create_dataset("data", shape=total_shape, chunks=chunks, dtype=dtype) lat = self._add_dataset("latitude", grid_points[0]) lon = self._add_dataset("longitude", grid_points[1]) @@ -434,6 +471,23 @@ def get_one_element_config(): metadata = {} metadata["create_yaml_config"] = _tidy(self.main_config) + metadata["creation_timestamp"] = datetime.datetime.utcnow().isoformat() + + first_date = to_datetime(first_cube.user_coords["valid_datetime"][0]) + last_date = to_datetime(last_cube.user_coords["valid_datetime"][-1]) + metadata["start_date"] = first_date.isoformat() + metadata["end_date"] = last_date.isoformat() + + frequency = ( + (last_date - first_date).total_seconds() / 3600 / (total_shape[0] - 1) + ) + assert int(frequency) == frequency, frequency + frequency = int(frequency) + metadata["frequency"] = frequency + + metadata["resolution"] = first_cube.source[0].resolution + + metadata["versions"] = get_packages_versions() metadata["name_to_index"] = { name: i for i, name in enumerate( @@ -444,8 +498,6 @@ def get_one_element_config(): self.z.attrs[k] = v metadatastr = yaml.dump(metadata, sort_keys=False) - # metadatastr = json.dumps(metadata, sort_keys=False) - self.z.attrs["climetlab"] = metadata self.z.attrs["_climetlab"] = metadatastr self.z["data"].attrs["climetlab"] = metadata diff --git a/climetlab/readers/grib/codes.py b/climetlab/readers/grib/codes.py index 974b25de..dc799176 100644 --- a/climetlab/readers/grib/codes.py +++ b/climetlab/readers/grib/codes.py @@ -418,6 +418,21 @@ def field_metadata(self): m["shape"] = self.shape return m + @property + def resolution(self): + grid_type = self["gridType"] + + if grid_type == "reduced_gg": + return self["gridName"] + + if grid_type == "regular_ll": + x = self["DxInDegrees"] + y = self["DyInDegrees"] + assert x == y, (x, y) + return x + + raise ValueError(f"Unknown gridType={grid_type}") + def datetime(self): date = self.handle.get("date") time = self.handle.get("time") diff --git a/climetlab/readers/grib/output.py b/climetlab/readers/grib/output.py index 915578c9..9cb341be 100644 --- a/climetlab/readers/grib/output.py +++ b/climetlab/readers/grib/output.py @@ -21,12 +21,30 @@ ACCUMULATIONS = {("tp", 2): {"productDefinitionTemplateNumber": 8}} +_ORDER = ( + "edition", + "setLocalDefinition", + "typeOfGeneratingProcess", + "productDefinitionTemplateNumber", +) + +ORDER = {} +for i, k in enumerate(_ORDER): + ORDER[k] = i + + +def order(key): + ORDER.setdefault(key, len(ORDER)) + return ORDER[key] + + class Combined: def __init__(self, handle, metadata): self.handle = handle self.metadata = metadata def __contains__(self, key): + # return key in self.metadata or key in self.handle raise NotImplementedError() def __getitem__(self, key): @@ -89,13 +107,9 @@ def write( else: handle = template.handle.clone() + # print("->", metadata) self.update_metadata(handle, metadata, compulsary) - - other = {} - - for k, v in list(metadata.items()): - if not isinstance(v, (int, float, str)): - other[k] = metadata.pop(k) + # print("<-", metadata) if check_nans: import numpy as np @@ -107,11 +121,13 @@ def write( metadata["missingValue"] = missing_value metadata["bitmapPresent"] = 1 - LOG.debug("GribOutput.metadata %s, other %s", metadata, other) + metadata = { + k: v for k, v in sorted(metadata.items(), key=lambda x: order(x[0])) + } - handle.set_multiple(metadata) + LOG.debug("GribOutput.metadata %s", metadata) - for k, v in other.items(): + for k, v in metadata.items(): handle.set(k, v) handle.set_values(values) @@ -166,10 +182,9 @@ def update_metadata(self, handle, metadata, compulsary): if "number" in metadata: compulsary += ("numberOfForecastsInEnsemble",) productDefinitionTemplateNumber = {"tp": 11} - metadata.setdefault( - "productDefinitionTemplateNumber", - productDefinitionTemplateNumber.get(handle.get("shortName"), 1), - ) + metadata[ + "productDefinitionTemplateNumber" + ] = productDefinitionTemplateNumber.get(handle.get("shortName"), 1) if metadata.get("type") in ("pf", "cf"): metadata.setdefault("typeOfGeneratingProcess", 4) diff --git a/climetlab/scripts/create.py b/climetlab/scripts/create.py index ada6418a..b5f0b1aa 100644 --- a/climetlab/scripts/create.py +++ b/climetlab/scripts/create.py @@ -8,7 +8,9 @@ # import os +from contextlib import contextmanager +from climetlab import settings from climetlab.loaders import HDF5Loader, ZarrLoader from climetlab.utils.humanize import list_to_human @@ -24,49 +26,58 @@ class LoadersCmd: # " (default from config or 'dataset')" # ), # ), - format=( - "--format", - dict( - help="The format of the target storage into which to load the data" - " (default is inferred from target path extension)" - " only .zarr is currently supported." - ), - ), - config=( - "--config", - dict( - help="A yaml file that describes which data to use as input" - " and how to organise them in the target" - ), - ), path=( "--target", dict( - help="Where to store the data. " - "Currently only a path to a new ZARR or HDF5 file is supported." + help="Where to store the final data. " + "Currently only a path to a new ZARR is supported." ), ), init=( "--init", - dict(action="store_true", help="Initialise zarr"), + dict(action="store_true", help="Initialise zarr."), ), load=( "--load", - dict(action="store_true", help="Load data into zarr"), - ), - parts=( - "--parts", - dict(nargs="+", help="Use with --load. Part(s) of the data to process"), + dict(action="store_true", help="Load data into zarr."), ), statistics=( "--statistics", dict(action="store_true", help="Compute statistics."), ), + config=( + "--config", + dict( + help="Use with --init. A yaml file that describes which data to use as input" + " and how to organise them in the target." + ), + ), + parts=( + "--parts", + dict(nargs="+", help="Use with --load. Part(s) of the data to process."), + ), + cache_dir=( + "--cache-dir", + dict( + help="Use with --load. Location of cache directory for temporary data." + ), + ), + format=( + "--format", + dict( + help="The format of the target storage into which to load the data" + " (default is inferred from target path extension)" + " only .zarr is currently supported." + ), + ), ) def do_create(self, args): - if args.format is None: + format = args.format + + if format is None: _, ext = os.path.splitext(args.path) - args.format = ext[1:] + format = ext[1:] + assert format == "zarr", f"Unsupported format={format}" def no_callback(*args, **kwargs): print(*args, **kwargs) @@ -100,38 +111,49 @@ def callback(*msg): hdf5=HDF5Loader, hdf=HDF5Loader, ) - if args.format not in LOADERS: + if format not in LOADERS: lst = list_to_human(list(LOADERS.keys()), "or") - raise ValueError(f"Invalid format '{args.format}', must be one of {lst}.") + raise ValueError(f"Invalid format '{format}', must be one of {lst}.") kwargs = vars(args) kwargs["print"] = callback - loader_class = LOADERS[args.format] + loader_class = LOADERS[format] lst = [args.load, args.statistics, args.init] if sum(1 for x in lst if x) != 1: raise ValueError( "Too many options provided." - 'Must choose exactly one option in "--load", "--statistics", "--config"' + 'Must choose exactly one option in "--load", "--statistics", "--init"' ) if args.parts: assert args.load, "Use --parts only with --load" - if args.init: - assert args.config, "--init requires --config" - assert args.path, "--init requires --target" - loader = loader_class.from_config(**kwargs) - loader.initialise() - exit() - - if args.load: - assert args.config is None, "--load requires only a zarr target, no config." - loader = loader_class.from_zarr(**kwargs) - loader.load(**kwargs) - - if args.statistics: - assert ( - args.config is None - ), "--statistics requires only a zarr target, no config." - loader = loader_class.from_zarr(**kwargs) - loader.add_statistics() + @contextmanager + def dummy_context(): + yield + + context = dummy_context() + if kwargs["cache_dir"]: + context = settings.temporary("cache-directory", kwargs["cache_dir"]) + + with context: + if args.init: + assert args.config, "--init requires --config" + assert args.path, "--init requires --target" + loader = loader_class.from_config(**kwargs) + loader.initialise() + exit() + + if args.load: + assert ( + args.config is None + ), "--load requires only a --target, no --config." + loader = loader_class.from_zarr(**kwargs) + loader.load(**kwargs) + + if args.statistics: + assert ( + args.config is None + ), "--statistics requires only --target, no --config." + loader = loader_class.from_zarr(**kwargs) + loader.add_statistics() diff --git a/climetlab/sources/loader.py b/climetlab/sources/loader.py index 27ba3715..2c59a892 100644 --- a/climetlab/sources/loader.py +++ b/climetlab/sources/loader.py @@ -21,6 +21,7 @@ def execute(self, v, data, last, inherit): if not isinstance(v, list): v = [v] for one in v: + one = dict(**one) name = one.pop("name") if inherit: last.update(one) @@ -42,11 +43,6 @@ def load(self, *args, **kwargs): return load_dataset(*args, **kwargs) -class Inherit: - def execute(self, *args, **kwargs): - pass - - class LoadConstants(LoadSource): def execute(self, v, data, last, inherit): super().execute( @@ -62,7 +58,6 @@ def execute(self, v, data, last, inherit): ACTIONS = { - "inherit": Inherit, "source": LoadSource, "dataset": LoadDataset, "constants": LoadConstants, @@ -91,6 +86,8 @@ def __init__(self, config, **kwargs): config = config["input"] config = instanciate_values(config, kwargs) + assert isinstance(config, (list, tuple)), config + self.config = config def mutate(self): @@ -102,9 +99,15 @@ def mutate(self): data = [] inherit = False last = {} - for k, v in self.config.items(): + for input in self.config: + assert len(input) == 1, input + assert isinstance(input, dict), input + + k = list(input.keys())[0] + v = input[k] if k == "inherit": inherit = v + continue ACTIONS[k]().execute(v, data, last, inherit) diff --git a/climetlab/version b/climetlab/version index c5523bd0..7cca7711 100644 --- a/climetlab/version +++ b/climetlab/version @@ -1 +1 @@ -0.17.0 +0.17.1 diff --git a/tests/test_output.py b/tests/test_output.py index 82b55acf..4cbae086 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -174,7 +174,7 @@ def test_tp(): if __name__ == "__main__": - test_tp() + test_mars_labeling() # from climetlab.testing import main # main(__file__)