diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 6dadbc0..6ca97da 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Optional, cast import numpy as np from xarray import Dataset @@ -24,7 +24,9 @@ } -def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: +def dataset_to_icechunk( + ds: Dataset, store: "IcechunkStore", append_dim: Optional[str] = None +) -> None: """ Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store. @@ -51,7 +53,10 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: # TODO only supports writing to the root group currently # TODO pass zarr_format kwarg? - root_group = Group.from_store(store=store) + if store.mode.str == "a": + root_group = Group.open(store=store, zarr_format=3) + else: + root_group = Group.from_store(store=store) # TODO this is Frozen, the API for setting attributes must be something else # root_group.attrs = ds.attrs @@ -63,6 +68,7 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: ds.attrs, store=store, group=root_group, + append_dim=append_dim, ) @@ -71,6 +77,7 @@ def write_variables_to_icechunk_group( attrs, store, group, + append_dim: Optional[str] = None, ): virtual_variables = { name: var @@ -96,6 +103,7 @@ def write_variables_to_icechunk_group( group=group, name=name, var=var, + append_dim=append_dim, ) @@ -104,6 +112,7 @@ def write_variable_to_icechunk( group: "Group", name: str, var: Variable, + append_dim: Optional[str] = None, ) -> None: """Write a single (possibly virtual) variable into an icechunk store""" if isinstance(var.data, ManifestArray): @@ -112,6 +121,7 @@ def write_variable_to_icechunk( group=group, name=name, var=var, + append_dim=append_dim, ) else: raise ValueError( @@ -124,15 +134,37 @@ def write_virtual_variable_to_icechunk( group: "Group", name: str, var: Variable, + append_dim: Optional[str] = None, ) -> None: """Write a single virtual variable into an icechunk store""" ma = cast(ManifestArray, var.data) zarray = ma.zarray + shape = zarray.shape + mode = store.mode.str + + # Aimee: resize the array if it already exists + # TODO: assert chunking and encoding is the same + existing_keys = tuple(group.array_keys()) + append_axis, existing_num_chunks = None, None + if name in existing_keys and mode == "a": + # resize + dims = var.dims + if append_dim in dims: + append_axis = dims.index(append_dim) + existing_array = group[name] + existing_size = existing_array.shape[append_axis] + existing_num_chunks = int( + existing_size / existing_array.chunks[append_axis] + ) + new_shape = list(existing_array.shape) + new_shape[append_axis] += var.shape[append_axis] + shape = tuple(new_shape) + existing_array.resize(new_shape) # creates array if it doesn't already exist arr = group.require_array( name=name, - shape=zarray.shape, + shape=shape, chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), codecs=zarray._v3_codec_pipeline(), @@ -142,6 +174,7 @@ def write_virtual_variable_to_icechunk( ) # TODO it would be nice if we could assign directly to the .attrs property + # Aimee: assert that new attributes are the same as existing attributes for k, v in var.attrs.items(): arr.attrs[k] = encode_zarr_attr_value(v) arr.attrs["_ARRAY_DIMENSIONS"] = encode_zarr_attr_value(var.dims) @@ -156,6 +189,8 @@ def write_virtual_variable_to_icechunk( group=group, arr_name=name, manifest=ma.manifest, + append_axis=append_axis, + existing_num_chunks=existing_num_chunks, ) @@ -164,6 +199,8 @@ def write_manifest_virtual_refs( group: "Group", arr_name: str, manifest: ChunkManifest, + append_axis: Optional[int] = None, + existing_num_chunks: Optional[int] = None, ) -> None: """Write all the virtual references for one array manifest at once.""" @@ -181,8 +218,14 @@ def write_manifest_virtual_refs( ], op_flags=[["readonly"]] * 3, # type: ignore ) + for path, offset, length in it: index = it.multi_index + if append_axis is not None: + list_index = list(index) + # Offset by the number of existing chunks on the append axis + list_index[append_axis] += existing_num_chunks + index = tuple(list_index) chunk_key = "/".join(str(i) for i in index) # set each reference individually