Simpler Dask array construction, w/ optional batching #462

Draft: wants to merge 6 commits into base: main

Changes from all commits
85 changes: 41 additions & 44 deletions dkist/io/dask_utils.py
@@ -1,60 +1,57 @@
 from functools import partial
 
-import dask.array as da
+import dask
 import numpy as np
 
 __all__ = ["stack_loader_array"]
 
 
-def stack_loader_array(loader_array, chunksize):
+def stack_loader_array(loader_array, output_shape, chunksize=None):
"""
Stack a loader array along each of its dimensions.
Converts an array of loaders to a dask array that loads a chunk from each loader

This results in a dask array with the correct chunks and dimensions.

Parameters
----------
loader_array : `dkist.io.reference_collections.BaseFITSArrayContainer`
loader_array : `dkist.io.loaders.BaseFITSLoader`
An array of loader objects
output_shape : tuple[int]
The intended shape of the final array
chunksize : tuple[int]
Can be used to set a chunk size. If not provided, each batch is one chunk

Returns
-------
array : `dask.array.Array`
"""
-    # If the chunksize isn't specified then use the whole array shape
-    chunksize = chunksize or loader_array.flat[0].shape
-
-    if loader_array.size == 1:
-        return tuple(loader_to_dask(loader_array, chunksize))[0]
-    if len(loader_array.shape) == 1:
-        return da.stack(loader_to_dask(loader_array, chunksize))
-    stacks = []
-    for i in range(loader_array.shape[0]):
-        stacks.append(stack_loader_array(loader_array[i], chunksize))
-    return da.stack(stacks)
-
-
-def _partial_to_array(loader, *, meta, chunks):
-    # Set the name of the array to the filename, that should be unique within the array
-    return da.from_array(loader, meta=meta, chunks=chunks, name=loader.fileuri)
-
-
-def loader_to_dask(loader_array, chunksize):
-    """
-    Map a call to `dask.array.from_array` onto all the elements in ``loader_array``.
-
-    This is done so that an explicit ``meta=`` argument can be provided to
-    prevent loading data from disk.
-    """
-    if loader_array.size != 1 and len(loader_array.shape) != 1:
-        raise ValueError("Can only be used on one dimensional arrays")
-
-    loader_array = np.atleast_1d(loader_array)
-
-    # The meta argument to from array is used to determine properties of the
-    # array, such as dtype. We explicitly specify it here to prevent dask
-    # trying to auto calculate it by reading from the actual array on disk.
-    meta = np.zeros((0,), dtype=loader_array[0].dtype)
-
-    to_array = partial(_partial_to_array, meta=meta, chunks=chunksize)
-
-    return map(to_array, loader_array)
+    file_shape = loader_array.flat[0].shape
+
+    tasks = {}
+    for i, loader in enumerate(loader_array.flat):
+        # The key identifies this chunk's position in the (partially-flattened) final data cube
+        key = ("load_files", i)
+        key += (0,) * len(file_shape)
+        # Each task is a call to _call_loader with the loader as its argument
+        tasks[key] = (_call_loader, loader)
+
+    dsk = dask.highlevelgraph.HighLevelGraph.from_collections("load_files", tasks, dependencies=())
+    # Each chunk occupies one pixel in the first dimension, and all the pixels in the others
+    chunks = ((1,) * loader_array.size,) + tuple((s,) for s in file_shape)
Comment on lines +37 to +38

Member:
I think this is only true for some arrays? i.e. VISP and not VBI?

Contributor (author):
I think at this stage the array is just (n_chunks, *chunk_size), and a few lines down the actual data cube shape is imposed. That approach seemed easier than figuring out how to assemble the loaders into the actual data cube shape from the beginning.
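For concreteness, a toy numpy sketch of that two-stage shape handling (all shapes here are hypothetical, chosen purely for illustration):

```python
import numpy as np

# Hypothetical case: 6 files, each holding a (2, 3) array.
# Stage 1: the intermediate cube is just (n_chunks, *chunk_size).
intermediate = np.zeros((6, 2, 3))

# Stage 2: the real dataset shape is only imposed afterwards by reshape,
# e.g. a 2x3 raster of (2, 3) frames.
final = intermediate.reshape((2, 3, 2, 3))
print(final.shape)  # (2, 3, 2, 3)
```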

name="load_files",
chunks=chunks,
dtype=loader_array.flat[0].dtype)
# Now impose the higher dimensions on the data cube
array = array.reshape(output_shape)
if chunksize is not None:
# If requested, re-chunk the array. Not sure this is optimal
new_chunks = (1,) * (array.ndim - len(chunksize)) + chunksize
array = array.rechunk(new_chunks)
return array


def _call_loader(loader):
data = loader.data
# The data needs an extra dimension for the leading index of the intermediate data cube, which has a leading
# index for file number
data = np.expand_dims(data, 0)
return data
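For readers unfamiliar with the pattern used above, here is a minimal, self-contained sketch of the same graph-construction technique. It uses a stand-in loader class; `FakeLoader` and the surrounding names are illustrative only, not the dkist API:

```python
import dask.array as da
import numpy as np
from dask.highlevelgraph import HighLevelGraph


class FakeLoader:
    """Stand-in for a lazy FITS loader: shape/dtype are known without reading data."""
    shape = (2, 3)
    dtype = np.dtype(np.float64)

    def __init__(self, value):
        self.value = value

    @property
    def data(self):
        # The real loader would read a FITS file from disk here.
        return np.full(self.shape, self.value, dtype=self.dtype)


def _call_loader(loader):
    # Add the leading file-number axis, as in the PR.
    return np.expand_dims(loader.data, 0)


loaders = [FakeLoader(v) for v in range(4)]
file_shape = loaders[0].shape

# One task per file; each key encodes the chunk's position in the array.
tasks = {("load_files", i) + (0,) * len(file_shape): (_call_loader, loader)
         for i, loader in enumerate(loaders)}
graph = HighLevelGraph.from_collections("load_files", tasks, dependencies=())

# One file per chunk on the leading axis, the whole file on the trailing axes.
chunks = ((1,) * len(loaders),) + tuple((s,) for s in file_shape)
stacked = da.Array(graph, name="load_files", chunks=chunks, dtype=loaders[0].dtype)

print(stacked.shape)               # (4, 2, 3)
print(stacked[1].compute()[0, 0])  # 1.0, read lazily via FakeLoader.data
```

Because the graph is built by hand, nothing touches the loaders' data until compute time, which is what the explicit ``meta=`` argument was guarding against in the old implementation.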
2 changes: 1 addition & 1 deletion dkist/io/file_manager.py
@@ -77,7 +77,7 @@ def _generate_array(self) -> dask.array.Array:
         still have a reference to this `~.FileManager` object, meaning changes
         to this object will be reflected in the data loaded by the array.
         """
-        return stack_loader_array(self.loader_array, self.chunksize).reshape(self.output_shape)
+        return stack_loader_array(self.loader_array, self.output_shape, self.chunksize)


class StripedExternalArray(BaseStripedExternalArray):
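A note on the ``chunksize`` forwarded here: inside ``stack_loader_array`` it is left-padded with ones, so the leading dimensions end up with single-element chunks and the requested sizes apply to the trailing (per-file) dimensions. A quick sketch of the arithmetic, with hypothetical values:

```python
# Hypothetical values, purely to illustrate the padding used by
# stack_loader_array when a chunksize is given.
array_ndim = 4          # e.g. a (2, 3, 100, 200) output array
chunksize = (50, 50)    # requested chunking for the per-file dimensions

new_chunks = (1,) * (array_ndim - len(chunksize)) + chunksize
print(new_chunks)       # (1, 1, 50, 50)
```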