Skip to content

Commit

Permalink
Initial attempt at appending
Browse files Browse the repository at this point in the history
  • Loading branch information
abarciauskas-bgse committed Oct 25, 2024
1 parent fffdc2d commit d3a4048
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions virtualizarr/writers/icechunk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Optional, cast

import numpy as np
from xarray import Dataset
Expand All @@ -24,7 +24,9 @@
}


def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None:
def dataset_to_icechunk(
ds: Dataset, store: "IcechunkStore", append_dim: Optional[str] = None
) -> None:
"""
Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store.
Expand All @@ -51,7 +53,10 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None:

# TODO only supports writing to the root group currently
# TODO pass zarr_format kwarg?
root_group = Group.from_store(store=store)
if store.mode.str == "a":
root_group = Group.open(store=store, zarr_format=3)
else:
root_group = Group.from_store(store=store)

# TODO this is Frozen; the API for setting attributes must be something else
# root_group.attrs = ds.attrs
Expand All @@ -63,6 +68,7 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None:
ds.attrs,
store=store,
group=root_group,
append_dim=append_dim,
)


Expand All @@ -71,6 +77,7 @@ def write_variables_to_icechunk_group(
attrs,
store,
group,
append_dim: Optional[str] = None,
):
virtual_variables = {
name: var
Expand All @@ -96,6 +103,7 @@ def write_variables_to_icechunk_group(
group=group,
name=name,
var=var,
append_dim=append_dim,
)


Expand All @@ -104,6 +112,7 @@ def write_variable_to_icechunk(
group: "Group",
name: str,
var: Variable,
append_dim: Optional[str] = None,
) -> None:
"""Write a single (possibly virtual) variable into an icechunk store"""
if isinstance(var.data, ManifestArray):
Expand All @@ -112,6 +121,7 @@ def write_variable_to_icechunk(
group=group,
name=name,
var=var,
append_dim=append_dim,
)
else:
raise ValueError(
Expand All @@ -124,15 +134,37 @@ def write_virtual_variable_to_icechunk(
group: "Group",
name: str,
var: Variable,
append_dim: Optional[str] = None,
) -> None:
"""Write a single virtual variable into an icechunk store"""
ma = cast(ManifestArray, var.data)
zarray = ma.zarray
shape = zarray.shape
mode = store.mode.str

# Aimee: resize the array if it already exists
# TODO: assert that chunking and encoding are the same
existing_keys = tuple(group.array_keys())
append_axis, existing_num_chunks = None, None
if name in existing_keys and mode == "a":
# resize
dims = var.dims
if append_dim in dims:
append_axis = dims.index(append_dim)
existing_array = group[name]
existing_size = existing_array.shape[append_axis]
existing_num_chunks = int(
existing_size / existing_array.chunks[append_axis]
)
new_shape = list(existing_array.shape)
new_shape[append_axis] += var.shape[append_axis]
shape = tuple(new_shape)
existing_array.resize(new_shape)

# creates array if it doesn't already exist
arr = group.require_array(
name=name,
shape=zarray.shape,
shape=shape,
chunk_shape=zarray.chunks,
dtype=encode_dtype(zarray.dtype),
codecs=zarray._v3_codec_pipeline(),
Expand All @@ -142,6 +174,7 @@ def write_virtual_variable_to_icechunk(
)

# TODO it would be nice if we could assign directly to the .attrs property
# Aimee: assert that new attributes are the same as existing attributes
for k, v in var.attrs.items():
arr.attrs[k] = encode_zarr_attr_value(v)
arr.attrs["_ARRAY_DIMENSIONS"] = encode_zarr_attr_value(var.dims)
Expand All @@ -156,6 +189,8 @@ def write_virtual_variable_to_icechunk(
group=group,
arr_name=name,
manifest=ma.manifest,
append_axis=append_axis,
existing_num_chunks=existing_num_chunks,
)


Expand All @@ -164,6 +199,8 @@ def write_manifest_virtual_refs(
group: "Group",
arr_name: str,
manifest: ChunkManifest,
append_axis: Optional[int] = None,
existing_num_chunks: Optional[int] = None,
) -> None:
"""Write all the virtual references for one array manifest at once."""

Expand All @@ -181,8 +218,14 @@ def write_manifest_virtual_refs(
],
op_flags=[["readonly"]] * 3, # type: ignore
)

for path, offset, length in it:
index = it.multi_index
if append_axis is not None:
list_index = list(index)
# Offset by the number of existing chunks on the append axis
list_index[append_axis] += existing_num_chunks
index = tuple(list_index)
chunk_key = "/".join(str(i) for i in index)

# set each reference individually
Expand Down

0 comments on commit d3a4048

Please sign in to comment.