From d96cf469c3beca0ac28df23d2f96ec831d169069 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Sat, 26 Oct 2024 16:42:03 -0400 Subject: [PATCH] Add first pass at grib zarr 3 codec --- kerchunk/codecs.py | 87 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 3 deletions(-) diff --git a/kerchunk/codecs.py b/kerchunk/codecs.py index 852076e..4804423 100644 --- a/kerchunk/codecs.py +++ b/kerchunk/codecs.py @@ -1,11 +1,22 @@ import ast +from dataclasses import dataclass import io +from typing import TYPE_CHECKING import numcodecs from numcodecs.abc import Codec import numpy as np import threading import zlib +from zarr.abc.codec import ArrayBytesCodec +from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer +from zarr.core.common import JSON, parse_enum, parse_named_configuration +from zarr.registry import register_codec + +if TYPE_CHECKING: + from typing import Self + + from zarr.core.array_spec import ArraySpec class FillStringsCodec(Codec): @@ -115,6 +126,78 @@ def decode(self, buf, out=None): numcodecs.register_codec(GRIBCodec, "grib") +@dataclass(frozen=True) +class GRIBZarrCodec(ArrayBytesCodec): + eclock = threading.RLock() + + var: str + dtype: np.dtype + + def __init__(self, *, var: str, dtype: np.dtype) -> None: + object.__setattr__(self, "var", var) + object.__setattr__(self, "dtype", dtype) + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration( + data, "bytes", require_configuration=True + ) + configuration_parsed = configuration_parsed or {} + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> dict[str, JSON]: + if self.endian is None: + return {"name": "grib"} + else: + return { + "name": "grib", + "configuration": {"var": self.var, "dtype": self.dtype}, + } + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + assert isinstance(chunk_bytes, Buffer) + import eccodes + + if self.var in ["latitude", "longitude"]: + var = self.var + "s" + dt = self.dtype or "float64" + else: + var = "values" + dt = self.dtype or "float32" + + with self.eclock: + mid = eccodes.codes_new_from_message(chunk_bytes.to_bytes()) + try: + data = eccodes.codes_get_array(mid, var) + missingValue = eccodes.codes_get_string(mid, "missingValue") + if var == "values" and missingValue: + data[data == float(missingValue)] = np.nan + return data.astype(dt, copy=False) + + finally: + eccodes.codes_release(mid) + + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + # This is a one way codec + raise NotImplementedError + + def compute_encoded_size( + self, input_byte_length: int, _chunk_spec: ArraySpec + ) -> int: + raise NotImplementedError + + +register_codec("grib", GRIBZarrCodec) + + class AsciiTableCodec(numcodecs.abc.Codec): """Decodes ASCII-TABLE extensions in FITS files""" @@ -166,7 +249,6 @@ def decode(self, buf, out=None): arr2 = np.empty((self.nrow,), dtype=dt_out) heap = buf[arr.nbytes :] for name in dt_out.names: - if dt_out[name] == "O": dt = np.dtype(self.ftypes[self.types[name]]) counts = arr[name][:, 0] @@ -244,8 +326,7 @@ def encode(self, buf): class ZlibCodec(Codec): codec_id = "zlib" - def __init__(self): - ... + def __init__(self): ... def decode(self, data, out=None): if out: