Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rechunk method for uncompressed arrays #199

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies:
- pytest
- pooch
- scipy
- dask
- fsspec
- s3fs
- fastparquet
Expand Down
11 changes: 11 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ def netcdf4_files(tmpdir):
return filepath1, filepath2


@pytest.fixture
def netcdf3_file(tmpdir):
    """Write the xarray "air_temperature" tutorial dataset to disk as an
    uncompressed NETCDF3_CLASSIC file.

    Returns
    -------
    str
        Path to the written netCDF3 file inside ``tmpdir``.
    """
    # Set up example xarray dataset
    ds = xr.tutorial.open_dataset("air_temperature")

    # Save it to disk as netCDF (in temporary directory)
    filepath = f"{tmpdir}/air.nc"
    ds.to_netcdf(filepath, format="NETCDF3_CLASSIC")
    ds.close()

    # Fix: the fixture previously never returned the path, so tests that
    # requested `netcdf3_file` received None instead of a usable filepath.
    return filepath


@pytest.fixture
def hdf5_empty(tmpdir):
filepath = f"{tmpdir}/empty.nc"
Expand Down
59 changes: 58 additions & 1 deletion virtualizarr/manifests/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ..types.kerchunk import KerchunkArrRefs
from ..zarr import ZArray
from .array_api import MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS, _isnan
from .manifest import ChunkManifest
from .manifest import ChunkManifest, subchunk


class ManifestArray:
Expand Down Expand Up @@ -229,6 +229,63 @@ def __getitem__(
else:
raise NotImplementedError(f"Doesn't support slicing with {indexer}")

def rechunk(self: "ManifestArray", chunks: tuple[int, ...]) -> "ManifestArray":
    """
    Return a new ManifestArray whose chunks have been split into smaller ones.

    Only valid for manifestarrays pointing to uncompressed data files, because
    only then can a chunk's byte range be subdivided directly.

    Parameters
    ----------
    chunks: tuple[int, ...]
        Chunk shape to split the existing chunks into.
        Becomes the `.chunks` attribute of the returned ManifestArray.
        Each element must evenly divide the corresponding existing chunk
        length (and therefore also the corresponding array length, i.e.
        the result has regular-length chunks).

    Returns
    -------
    manifestarray: ManifestArray
        New ManifestArray covering the same bytes on disk, addressed via the
        requested smaller chunks.

    See Also
    --------
    kerchunk.utils.subchunk
    """
    # Compressed chunks cannot be split by byte range, so refuse up front.
    if self.zarray.compressor is not None:
        raise ValueError(
            f"Cannot rechunk a ManifestArray which points to compressed data on disk, but compressor={self.zarray.compressor}"
        )

    # Build the new zarray early so its validation (if any) runs before we
    # touch the manifest.
    new_zarray = self.zarray.replace(chunks=chunks)

    chunk_pairs = list(zip(chunks, self.chunks))

    if any(requested > existing for requested, existing in chunk_pairs):
        # TODO we could theoretically handle this case if we checked that adjacent entries in the manifest had the same filepath and contiguous byte ranges
        # but it's a lot of effort to implement something that doesn't have a clear use case.
        raise NotImplementedError(
            f"Can only rechunk into smaller chunks, but existing chunk shape = {self.chunks} and requested new chunk shape = {chunks}"
        )

    # A divisor of the old chunk length is automatically a divisor of the
    # old array length, so one divisibility check suffices.
    if any(existing % requested != 0 for requested, existing in chunk_pairs):
        raise ValueError(
            f"New chunk lengths must be integer divisor of old chunk lengths, but existing chunk shape = {self.chunks} and requested new chunk shape = {chunks}"
        )

    # Per-dimension count of how many new chunks tile each old chunk.
    splits: tuple[int, ...] = tuple(
        existing // requested for requested, existing in chunk_pairs
    )

    return ManifestArray(
        chunkmanifest=subchunk(self.manifest, subchunks=splits),
        zarray=new_zarray,
    )

def rename_paths(
self,
new: str | Callable[[str], str],
Expand Down
74 changes: 74 additions & 0 deletions virtualizarr/manifests/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,77 @@ def get_chunk_grid_shape(chunk_keys: Iterable[ChunkKey]) -> tuple[int, ...]:
max(indices_along_one_dim) + 1 for indices_along_one_dim in zipped_indices
)
return chunk_grid_shape


def subchunk(manifest: ChunkManifest, subchunks: tuple[int, ...]) -> ChunkManifest:
    """
    Split every chunk in the manifest into a regular grid of smaller subchunks.

    Only valid for manifests pointing to uncompressed data files, where a
    chunk's byte range can be subdivided directly.

    Parameters
    ----------
    manifest
        Manifest whose chunks are to be split.
    subchunks
        Per-dimension count of how many new chunks tile each existing chunk.

    Returns
    -------
    ChunkManifest
        New manifest whose chunk grid is the old grid scaled up by
        ``subchunks`` along each dimension.

    Raises
    ------
    ValueError
        If any chunk's byte length is not evenly divisible by
        ``prod(subchunks)`` — silently truncating would corrupt byte ranges.
    """
    original_paths = manifest._paths
    original_offsets = manifest._offsets
    original_lengths = manifest._lengths
    original_shape = original_paths.shape

    # Total number of subchunks each existing chunk is split into.
    n_subchunks = int(np.prod(subchunks, dtype=np.int64))

    # Replace every chunk in the manifest with a sub-array of new chunks.
    # np.ndindex gives general N-dimensional support.
    paths_subarrays = np.empty(original_shape, dtype=object)
    offsets_subarrays = np.empty(original_shape, dtype=object)
    lengths_subarrays = np.empty(original_shape, dtype=object)
    for idx in np.ndindex(original_shape):
        offset = int(original_offsets[idx])
        length = int(original_lengths[idx])

        if length % n_subchunks != 0:
            raise ValueError(
                f"Chunk with byte length {length} cannot be split evenly into {n_subchunks} subchunks"
            )
        # Note: assumes all subchunks of one chunk have identical byte length;
        # this would no longer hold if Zarr gains variable-length chunks.
        sublength = length // n_subchunks

        # Consecutive, non-overlapping byte ranges within the parent chunk.
        # Pure integer arithmetic: np.linspace goes via float64, which loses
        # precision for byte offsets above 2**53 and silently truncates when
        # the length is not evenly divisible.
        offsets_subarrays[idx] = (
            (offset + sublength * np.arange(n_subchunks, dtype=np.uint64))
            .astype(np.uint64)
            .reshape(subchunks)
        )
        lengths_subarrays[idx] = np.full(
            subchunks, sublength, dtype=np.dtype("uint64")
        )
        # Paths are just repeated as each subchunk points to the same file.
        paths_subarrays[idx] = (
            np.repeat(original_paths[idx], repeats=n_subchunks)
            .reshape(subchunks)
            .astype(np.dtypes.StringDType)  # type: ignore[attr-defined]
        )

    # Use np.block to assemble the final rechunked arrays.
    new_paths_array = np.block(paths_subarrays.tolist())
    new_offsets_array = np.block(offsets_subarrays.tolist())
    new_lengths_array = np.block(lengths_subarrays.tolist())

    return ChunkManifest.from_arrays(
        paths=new_paths_array,
        offsets=new_offsets_array,
        lengths=new_lengths_array,
    )
1 change: 1 addition & 0 deletions virtualizarr/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def _importorskip(


has_astropy, requires_astropy = _importorskip("astropy")
has_dask, requires_dask = _importorskip("dask")
has_s3fs, requires_s3fs = _importorskip("s3fs")
has_tifffile, requires_tifffile = _importorskip("tifffile")

Expand Down
33 changes: 31 additions & 2 deletions virtualizarr/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import xarray.testing as xrt

from virtualizarr import open_virtual_dataset
from virtualizarr.manifests.array import ManifestArray
from virtualizarr.manifests.manifest import ChunkManifest
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.tests import requires_dask
from virtualizarr.zarr import ZArray


Expand Down Expand Up @@ -222,3 +222,32 @@ def test_open_scalar_variable(tmpdir):

vds = open_virtual_dataset(f"{tmpdir}/scalar.nc", indexes={})
assert vds["a"].shape == ()


@requires_dask
def test_rechunk_via_manifest_vs_xarray(netcdf3_file, tmpdir):
    """Rechunking virtually then reading via kerchunk must match dask rechunking."""
    # the fixture provides an uncompressed netCDF3 file on disk
    new_chunks = {"time": 1460}

    # virtualizarr path: open and split into two time chunks
    virtual_ds = open_virtual_dataset(netcdf3_file, indexes={}).chunk(**new_chunks)

    # rechunking must not have silently produced a dask array
    assert isinstance(virtual_ds["air"].data, ManifestArray)

    # serialize the rechunked references to disk
    refs_path = f"{tmpdir}/refs.json"
    virtual_ds.virtualize.to_kerchunk(refs_path, format="json")

    # read the chunked references back through the kerchunk engine
    roundtrip_ds = xr.open_dataset(
        refs_path,
        engine="kerchunk",
        chunks={},
    )

    # reference path: open the original file with dask and rechunk there
    expected_ds = xr.open_dataset(netcdf3_file, chunks={}).chunk(**new_chunks)

    # both routes must yield identical datasets
    xrt.assert_identical(roundtrip_ds, expected_ds)
145 changes: 145 additions & 0 deletions virtualizarr/tests/test_manifests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,148 @@ def test_refuse_combine():
for func in [np.concatenate, np.stack]:
with pytest.raises(ValueError, match="inconsistent dtypes"):
func([marr1, marr2], axis=0)


class TestRechunk:
    """Unit tests for ManifestArray.rechunk (splitting manifest chunks)."""

    @pytest.mark.parametrize(
        "shape, chunks_dict, new_chunks, expected_chunks_dict",
        [
            # Split one (2, 3) chunk in half along axis 0:
            # byte range 100-220 becomes two consecutive 60-byte ranges.
            (
                (2, 3),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 120},
                },
                (1, 3),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 60},
                    "1.0": {"path": "foo.nc", "offset": 160, "length": 60},
                },
            ),
            # Split one chunk into thirds along axis 1 (three 40-byte ranges).
            (
                (2, 3),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 120},
                },
                (2, 1),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 40},
                    "0.1": {"path": "foo.nc", "offset": 140, "length": 40},
                    "0.2": {"path": "foo.nc", "offset": 180, "length": 40},
                },
            ),
            # Split along both axes at once: one chunk becomes a 2x3 grid of
            # 20-byte subchunks.
            (
                (2, 3),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 120},
                },
                (1, 1),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 20},
                    "0.1": {"path": "foo.nc", "offset": 120, "length": 20},
                    "0.2": {"path": "foo.nc", "offset": 140, "length": 20},
                    "1.0": {"path": "foo.nc", "offset": 160, "length": 20},
                    "1.1": {"path": "foo.nc", "offset": 180, "length": 20},
                    "1.2": {"path": "foo.nc", "offset": 200, "length": 20},
                },
            ),
            # Manifest with multiple pre-existing chunks: each of the two
            # chunks is split in half along axis 0 independently.
            (
                (4, 3),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 60},
                    "1.0": {"path": "foo.nc", "offset": 160, "length": 60},
                },
                (1, 3),
                {
                    "0.0": {"path": "foo.nc", "offset": 100, "length": 30},
                    "1.0": {"path": "foo.nc", "offset": 130, "length": 30},
                    "2.0": {"path": "foo.nc", "offset": 160, "length": 30},
                    "3.0": {"path": "foo.nc", "offset": 190, "length": 30},
                },
            ),
        ],
    )
    def test_rechunk(self, shape, chunks_dict, new_chunks, expected_chunks_dict):
        """Rechunking yields the expected manifest entries and updated zarray."""
        # Uncompressed array (compressor=None) — the only case rechunk supports.
        zarray = ZArray(
            chunks=(2, 3),
            compressor=None,
            dtype=np.dtype("int32"),
            fill_value=0.0,
            filters=None,
            order="C",
            shape=shape,
            zarr_format=2,
        )

        manifest = ChunkManifest(entries=chunks_dict)
        marr = ManifestArray(zarray=zarray, chunkmanifest=manifest)

        rechunked = marr.rechunk(chunks=new_chunks)

        # Only the chunks attribute of the zarray should have changed.
        expected_zarray = zarray.replace(chunks=new_chunks)

        assert rechunked.manifest.dict() == expected_chunks_dict
        assert rechunked.zarray == expected_zarray

    def test_rechunk_compressed(self):
        """Rechunking compressed data is impossible and must raise ValueError."""
        zarray = ZArray(
            chunks=(2, 3),
            compressor="zlib",
            dtype=np.dtype("int32"),
            fill_value=0.0,
            filters=None,
            order="C",
            shape=(2, 3),
            zarr_format=2,
        )

        chunks_dict = {
            "0.0": {"path": "foo.nc", "offset": 100, "length": 120},
        }
        manifest = ChunkManifest(entries=chunks_dict)
        marr = ManifestArray(zarray=zarray, chunkmanifest=manifest)

        with pytest.raises(ValueError, match="compressed"):
            marr.rechunk(chunks=(1, 3))

    def test_rechunk_non_integer_subchunks(self):
        """New chunk lengths that don't evenly divide the old ones must raise."""
        zarray = ZArray(
            chunks=(2, 3),
            compressor=None,
            dtype=np.dtype("int32"),
            fill_value=0.0,
            filters=None,
            order="C",
            shape=(2, 3),
            zarr_format=2,
        )

        chunks_dict = {
            "0.0": {"path": "foo.nc", "offset": 100, "length": 120},
        }
        manifest = ChunkManifest(entries=chunks_dict)
        marr = ManifestArray(zarray=zarray, chunkmanifest=manifest)

        # 2 does not divide 3 along the second axis.
        with pytest.raises(ValueError, match="integer divisor"):
            marr.rechunk(chunks=(1, 2))

    def test_rechunk_larger_chunks(self):
        """Merging chunks (rechunking to larger chunks) is not implemented."""
        zarray = ZArray(
            chunks=(1, 3),
            compressor=None,
            dtype=np.dtype("int32"),
            fill_value=0.0,
            filters=None,
            order="C",
            shape=(2, 3),
            zarr_format=2,
        )

        chunks_dict = {
            "0.0": {"path": "foo.nc", "offset": 100, "length": 60},
            "1.0": {"path": "foo.nc", "offset": 160, "length": 60},
        }
        manifest = ChunkManifest(entries=chunks_dict)
        marr = ManifestArray(zarray=zarray, chunkmanifest=manifest)

        # Requested chunk (2, 3) is larger than the existing (1, 3) chunks.
        with pytest.raises(NotImplementedError, match="smaller chunks"):
            marr.rechunk(chunks=(2, 3))
10 changes: 10 additions & 0 deletions virtualizarr/tests/test_xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ def test_combine_by_coords_keeping_manifestarrays(self, netcdf4_files):
assert isinstance(combined_vds["lon"].data, ManifestArray)


def test_rechunk(netcdf3_file):
    """Dataset.chunk on a virtual dataset must keep ManifestArrays, not create dask arrays."""
    # the fixture provides an uncompressed netCDF3 file on disk;
    # splitting into two time chunks should go through ManifestArray.rechunk
    virtual_ds = open_virtual_dataset(netcdf3_file, indexes={}).chunk(time=1460)

    # verify no dask array was accidentally created by the chunk call
    assert isinstance(virtual_ds["air"].data, ManifestArray)


class TestRenamePaths:
def test_rename_to_str(self, netcdf4_file):
vds = open_virtual_dataset(netcdf4_file, indexes={})
Expand Down
Loading