diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..04b965f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,32 @@ +name: Automatic Test +# Specify which GitHub events will trigger a CI build + +on: push +# Define a single job, build + +jobs: + build: + # Specify an OS for the runner + runs-on: ubuntu-latest + + #Define steps + steps: + + # Firstly, checkout repo + - name: Checkout repository + uses: actions/checkout@v2 + # Set up Python env + - name: Setup Python + uses: actions/setup-python@v2 + with: + python-version: 3.11 + # Install dependencies + - name: Install Python dependencies + run: | + python3 -m pip install --upgrade pip + pip3 install -r requirements.txt + pip3 install -e . + # Test with pytest + - name: Run pytest + run: | + pytest \ No newline at end of file diff --git a/XarrayActive/active_chunk.py b/XarrayActive/active_chunk.py index 8f03f7a..e20b975 100644 --- a/XarrayActive/active_chunk.py +++ b/XarrayActive/active_chunk.py @@ -1,95 +1,158 @@ import numpy as np +from itertools import product -# Holds all CFA-specific Active routines. +class ActiveOptionsContainer: + """ + Container for ActiveOptions properties. + """ + @property + def active_options(self): + """ + Property of the datastore that relates private option variables to the standard + ``active_options`` parameter. + """ + return { + 'chunks': self._active_chunks, + 'chunk_limits': self._chunk_limits, + } + + @active_options.setter + def active_options(self, value): + self._set_active_options(**value) + + def _set_active_options(self, chunks={}, chunk_limits=True): + + if chunks == {}: + raise NotImplementedError( + 'Default chunking is not implemented, please provide a chunk scheme ' + ' - active_options = {"chunks": {}}' + ) + + self._active_chunks = chunks + self._chunk_limits = chunk_limits + +# Holds all Active routines. class ActiveChunk: description = "Container class for Active routines performed on each chunk. All active-per-chunk content can be found here." - - def __init__(self, *args, **kwargs): - raise NotImplementedError def _post_process_data(self, data): # Perform any post-processing steps on the data here return data - def _standard_mean(self, axis=None, skipna=None, **kwargs): + def _standard_sum(self, axes=None, skipna=None, **kwargs): """ Standard Mean routine matches the normal routine for dask, required at this stage if Active mean not available. """ - size = 1 - for i in axis: - size *= self.shape[i] arr = np.array(self) if skipna: - total = np.nanmean(arr, axis=axis, **kwargs) *size + total = np.nansum(arr, axis=axes, **kwargs) else: - total = np.mean(arr, axis=axis, **kwargs) *size - return {'n': self._numel(arr, axis=axis), 'total': total} + total = np.sum(arr, axis=axes, **kwargs) + return total + + def _standard_max(self, axes=None, skipna=None, **kwargs): + return np.max(self, axis=axes) + + def _standard_min(self, axes=None, skipna=None, **kwargs): + return np.min(self, axis=axes) - def _numel(self, axis=None): - if not axis: + def _numel(self, method, axes=None): + if not axes: return self.size size = 1 - for i in axis: + for i in axes: size *= self.shape[i] newshape = list(self.shape) - newshape[axis] = 1 + for ax in axes: + newshape[ax] = 1 return np.full(newshape, size) - def active_mean(self, axis=None, skipna=None, **kwargs): + def active_method(self, method, axis=None, skipna=None, **kwargs): """ Use PyActiveStorage package functionality to perform mean of this Fragment. 
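+
+        The ``method`` argument names the reduction to perform ('mean', 'sum',
+        'max' or 'min'); if the ``activestorage`` package cannot be imported,
+        the equivalent numpy-based ``_standard_*`` routine is used as a fallback.
+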
- :param axis: (int) The axis over which to perform the active_mean operation. + :param axis: (int) The axes over which to perform the active_mean operation. :param skipna: (bool) Skip NaN values when calculating the mean. :returns: A ``duck array`` (numpy-like) with the reduced array or scalar value, - as specified by the axis parameter. + as specified by the axes parameter. """ + + standard_methods = { + 'mean': self._standard_sum, + 'sum' : self._standard_sum, + 'max' : self._standard_max, + 'min' : self._standard_min + } + ret = None + n = self._numel(method, axes=axis) + try: from activestorage.active import Active except ImportError: # Unable to import Active package. Default to using normal mean. print("ActiveWarning: Unable to import active module - defaulting to standard method.") - return self._standard_mean(axis=axis, skipna=skipna, **kwargs) - - active = Active(self.filename, self.address) - active.method = "mean" - extent = self.get_extent() - - if not axis is None: - return { - 'n': self._numel(axis=axis), - 'total': self._post_process_data(active[extent]) + ret = { + 'n': n, + 'total': standard_methods[method](axes=axis, skipna=skipna, **kwargs) } - # Experimental Recursive requesting to get each 1D column along the axis being requested. - range_recursives = [] - for dim in range(self.ndim): - if dim != axis: - range_recursives.append(range(extent[dim].start, extent[dim].stop+1)) - else: - range_recursives.append(extent[dim]) - results = np.array(self._get_elements(active, range_recursives, hyperslab=[])) + if not ret: + + active = Active(self.filename, self.address) + active.method = method + extent = tuple(self.get_extent()) + + if axis == None: + axis = tuple([i for i in range(self.ndim)]) + + n = self._numel(method, axes=axis) + + if len(axis) == self.ndim: + data = active[extent] + t = self._post_process_data(data) * n + + ret = { + 'n': n, + 'total': t + } + + if not ret: + # Experimental Recursive requesting to get each 1D column along the axes being requested. + range_recursives = [] + for dim in range(self.ndim): + if dim not in axis: + range_recursives.append(range(extent[dim].start, extent[dim].stop)) + else: + range_recursives.append(extent[dim]) + results = np.array(self._get_elements(active, range_recursives, hyperslab=[])) + + t = self._post_process_data(results) * n + ret = { + 'n': n, + 'total': t + } - return { - 'n': self._numel(axis=axis), - 'total': self._post_process_data(results) - } + if method == 'mean': + return ret + else: + return ret['total']/ret['n'] def _get_elements(self, active, recursives, hyperslab=[]): dimarray = [] - current = recursives[0] - if not len(recursives) > 1: + if not len(recursives) > 0: # Perform active slicing and meaning here. 
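+            # Base case: every dimension is now fixed by the accumulated hyperslab,
+            # so a single Active request yields one reduced value for this column.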
- return active[hyperslab] + return active[tuple(hyperslab)].flatten()[0] + + current = recursives[0] if type(current) == slice: newslab = hyperslab + [current] diff --git a/XarrayActive/active_dask.py b/XarrayActive/active_dask.py index 09fb185..bd0a196 100644 --- a/XarrayActive/active_dask.py +++ b/XarrayActive/active_dask.py @@ -1,13 +1,44 @@ import dask.array as da -from dask.array.reductions import mean_agg +from dask.array.reductions import mean_agg, mean_combine, nanmax, nanmin +from dask.utils import deepmap +from dask.array.core import _concatenate2 +import numpy as np - -def block_active_mean(arr, *args, **kwargs): - if hasattr(arr,'active_mean'): - return arr.active_mean(*args, **kwargs) +def partition_mean(arr, *args, **kwargs): + return partition_method(arr, 'mean', *args, **kwargs) + +def partition_max(arr, *args, **kwargs): + return partition_method(arr, 'max', *args, **kwargs) + +def partition_min(arr, *args, **kwargs): + return partition_method(arr, 'min', *args, **kwargs) + +def partition_sum(arr, *args, **kwargs): + return partition_method(arr, 'sum', *args, **kwargs) + +def partition_method(arr, method, *args, **kwargs): + if hasattr(arr,'active_method'): + return arr.active_method(method,*args, **kwargs) else: - # Here's where barebones Xarray might fall over - may need a non-CFA custom class. - raise NotImplementedError + # Additional handling for 'meta' calculations in dask. + # Not currently implemented, bypassed using None + if arr.size == 0: + return None + return None + +def general_combine(pairs, axis=None): + if not isinstance(pairs, list): + pairs = [pairs] + return _concatenate2(pairs, axes=axis) + +def max_agg(pairs, axis=None, **kwargs): + return general_combine(pairs, axis=axis).max(axis=axis, **kwargs) + +def min_agg(pairs, axis=None, **kwargs): + return general_combine(pairs, axis=axis).min(axis=axis, **kwargs) + +def sum_agg(pairs, axis=None, **kwargs): + return general_combine(pairs, axis=axis).sum(axis=axis, **kwargs) class DaskActiveArray(da.Array): @@ -17,11 +48,12 @@ class DaskActiveArray(da.Array): def is_active(self): return True - def copy(self): - """ - Create a new DaskActiveArray instance with all the same parameters as the current instance. - """ - return DaskActiveArray(self.dask, self.name, self.chunks, meta=self) + #def copy(self): + # """ + # Create a new DaskActiveArray instance with all the same parameters as the current instance. + # """ + # copy_arr = DaskActiveArray(self.dask, self.name, self.chunks, meta=self) + # return copy_arr def __getitem__(self, index): """ @@ -49,10 +81,86 @@ def active_mean(self, axis=None, skipna=None): newarr = da.reduction( self, - block_active_mean, + partition_mean, mean_agg, + combine=mean_combine, + axis=axis, + dtype=self.dtype, + ) + + return newarr + + def active_max(self, axis=None, skipna=None): + """ + Perform ``dask delayed`` active mean for each ``dask block`` which corresponds to a single ``chunk``. + Combines the results of the dask delayed ``active_max`` operations on each block into a single dask Array, + which is then mapped to a new DaskActiveArray object. + + :param axis: (int) The index of the axis on which to perform the active max. + + :param skipna: (bool) Skip NaN values when calculating the max. + + :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using + the concatenations of active_means from each chunk. 
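+
+        A rough usage sketch (variable names here are illustrative)::
+
+            darr = DaskActiveArray(dsk, name, chunks=chunks, dtype=dtype)
+            lazy_max = darr.active_max(axis=0)   # builds the dask reduction graph
+            result = lazy_max.compute()          # Active max is evaluated per chunk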
+ """ + + newarr = da.reduction( + self, + partition_max, + max_agg, + combine=max_agg, + axis=axis, + dtype=self.dtype, + ) + + return newarr + + def active_min(self, axis=None, skipna=None): + """ + Perform ``dask delayed`` active mean for each ``dask block`` which corresponds to a single ``chunk``. + Combines the results of the dask delayed ``active_min`` operations on each block into a single dask Array, + which is then mapped to a new DaskActiveArray object. + + :param axis: (int) The index of the axis on which to perform the active min. + + :param skipna: (bool) Skip NaN values when calculating the min. + + :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using + the concatenations of active_means from each chunk. + """ + + newarr = da.reduction( + self, + partition_min, + min_agg, + combine=min_agg, + axis=axis, + dtype=self.dtype, + ) + + return newarr + + def active_sum(self, axis=None, skipna=None): + """ + Perform ``dask delayed`` active mean for each ``dask block`` which corresponds to a single ``chunk``. + Combines the results of the dask delayed ``active_sum`` operations on each block into a single dask Array, + which is then mapped to a new DaskActiveArray object. + + :param axis: (int) The index of the axis on which to perform the active sum. + + :param skipna: (bool) Skip NaN values when calculating the sum. + + :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using + the concatenations of active_means from each chunk. + """ + + newarr = da.reduction( + self, + partition_sum, + sum_agg, + combine=sum_agg, axis=axis, - dtype=self.dtype + dtype=self.dtype, ) - return DaskActiveArray(newarr.dask, newarr.name, newarr.chunks, meta=newarr) + return newarr \ No newline at end of file diff --git a/XarrayActive/active_xarray.py b/XarrayActive/active_xarray.py index e3561d2..abebf6c 100644 --- a/XarrayActive/active_xarray.py +++ b/XarrayActive/active_xarray.py @@ -8,6 +8,7 @@ from xarray.core.dataarray import DataArray from .active_dask import DaskActiveArray +from xarray.core import duck_array_ops class ActiveDataArray(DataArray): # No additional properties @@ -15,19 +16,68 @@ class ActiveDataArray(DataArray): def mean( self, - dim, + *args, + **kwargs, + ): + + return self._active_op( + dataarray_active_mean, + *args, + **kwargs, + ) + + def max( + self, + *args, + **kwargs, + ): + + return self._active_op( + dataarray_active_max,#duck_array_ops.max, + *args, + **kwargs, + ) + + def min( + self, + *args, + **kwargs, + ): + + return self._active_op( + dataarray_active_min, + *args, + **kwargs, + ) + + def sum( + self, + *args, + **kwargs, + ): + + return self._active_op( + dataarray_active_sum, + *args, + **kwargs, + ) + + def _active_op( + self, + op = None, + dim = None, *, - skipna = None, - keep_attrs = None, + skipna: bool | None = None, + keep_attrs: bool | None = None, **kwargs, ): """ - Reduce this DataArray's data by applying ``mean`` along some dimension(s). + Reduce this DataArray's data by applying an operation along some dimension(s). Parameters ---------- dim : str, Iterable of Hashable, "..." or None, default: None - Name of dimension[s] along which to apply ``mean``. For e.g. ``dim="x"`` + Name of dimension[s] along which to apply the operation`. For e.g. ``dim="x"`` or ``dim=["x", "y"]``. If "..." or None, will reduce over all dimensions. skipna : bool or None, optional If True, skip missing values (as marked by NaN). 
By default, only @@ -40,24 +90,28 @@ def mean( returned without attributes. **kwargs : Any Additional keyword arguments passed on to the appropriate array - function for calculating ``mean`` on this object's data. + function for calculating the operation on this object's data. These could include dask-specific kwargs like ``split_every``. Returns ------- reduced : DataArray - New DataArray with ``mean`` applied to its data and the + New DataArray with ``max`` applied to its data and the indicated dimension(s) removed + See Also + -------- + numpy.max + dask.array.max """ return self.reduce( - dataarray_active_mean, # from duck_array_ops.mean + op, dim=dim, skipna=skipna, keep_attrs=keep_attrs, **kwargs, ) - + class ActiveDataset(Dataset): # No additional properties @@ -68,12 +122,12 @@ def _construct_dataarray(self, name): darr = super()._construct_dataarray(name) - is_active_variable = True # Convert variable to DaskActiveArray if not already defined as that type. # CFAPyX - FragmentArrayWrapper returns a DaskActiveArray upon indexing. variable = darr.variable + # If the active parts have been lost at this point. if not isinstance(variable.data, DaskActiveArray) and is_active_variable: variable.data = DaskActiveArray( variable.data.dask, @@ -96,7 +150,19 @@ def _construct_dataarray(self, name): fastpath=True ) -def dataarray_active_mean(array: DaskActiveArray, axis=None, skipna=None, **kwargs): +def dataarray_active_mean(array, *args, **kwargs): + return dataarray_active_method(array, 'mean', *args, **kwargs) + +def dataarray_active_max(array, *args, **kwargs): + return dataarray_active_method(array, 'max', *args, **kwargs) + +def dataarray_active_min(array, *args, **kwargs): + return dataarray_active_method(array, 'min', *args, **kwargs) + +def dataarray_active_sum(array, *args, **kwargs): + return dataarray_active_method(array, 'sum', *args, **kwargs) + +def dataarray_active_method(array: DaskActiveArray, method: str, axis=None, skipna=None, **kwargs): """ Function provided to dask reduction, activates the ``active_mean`` method of the ``DaskActiveArray``. @@ -109,10 +175,25 @@ def dataarray_active_mean(array: DaskActiveArray, axis=None, skipna=None, **kwar :returns: The result from performing the ``DaskActiveArray.active_mean`` method, which gives a new ``DaskActiveArray`` object. 
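+
+    If the array has already been loaded into memory (so the ``active_*`` methods
+    are unavailable), the matching ``duck_array_ops`` routine is used as a fallback.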
""" + from xarray.core import duck_array_ops + arr_methods = { + 'mean': array.active_mean, + 'max': array.active_max, + 'min': array.active_min, + 'sum': array.active_sum + } + + duck_methods = { + 'mean': duck_array_ops.mean, + 'max': duck_array_ops.max, + 'min': duck_array_ops.min, + 'sum': duck_array_ops.sum + } + from xarray.core import duck_array_ops try: - return array.active_mean(axis, skipna=skipna, **kwargs) + return arr_methods[method](axis, skipna=skipna, **kwargs) except AttributeError: print("ActiveWarning: Unable to compute active mean - array has already been loaded.") print("NetCDF file size may prohibit lazy loading and thus Active methods.") - return duck_array_ops.mean(array, axis=axis, skipna=skipna, **kwargs) + return duck_methods[method](array, axis=axis, skipna=skipna, **kwargs) diff --git a/XarrayActive/backend.py b/XarrayActive/backend.py index ce1d45c..da107b7 100644 --- a/XarrayActive/backend.py +++ b/XarrayActive/backend.py @@ -8,6 +8,7 @@ ) from .active_xarray import ActiveDataset +from .datastore import ActiveDataStore def open_active_dataset( filename_or_obj, @@ -18,6 +19,7 @@ def open_active_dataset( decode_coords=None, use_cftime=None, decode_timedelta=None, + active_options={}, group=None, ): """ @@ -31,7 +33,9 @@ def open_active_dataset( """ # Load the normal datastore from the provided file (object not supported). - store = NetCDF4DataStore.open(filename_or_obj, group=group) + store = ActiveDataStore.open(filename_or_obj, group=group) + + store.active_options = active_options # Xarray makes use of StoreBackendEntrypoints to provide the Dataset 'ds' store_entrypoint = ActiveStoreBackendEntrypoint() @@ -64,6 +68,7 @@ def open_dataset( decode_coords=None, use_cftime=None, decode_timedelta=None, + active_options={}, group=None, # backend specific keyword arguments # do not use 'chunks' or 'cache' here @@ -82,6 +87,7 @@ def open_dataset( decode_coords=decode_coords, use_cftime=use_cftime, decode_timedelta=decode_timedelta, + active_options=active_options, group=group) class ActiveStoreBackendEntrypoint(StoreBackendEntrypoint): diff --git a/XarrayActive/datastore.py b/XarrayActive/datastore.py index 171180c..16c46de 100644 --- a/XarrayActive/datastore.py +++ b/XarrayActive/datastore.py @@ -1,33 +1,57 @@ from xarray.backends import NetCDF4DataStore -from xarray.backends.common import ( - BackendArray, - robust_getitem -) +from xarray.core.utils import FrozenDict from xarray.coding.variables import pop_to -from xarray.coding.strings import create_vlen_dtype from xarray.core import indexing from xarray.core.variable import Variable -from dask.utils import SerializableLock -from dask.array.core import getter -from dask.base import tokenize - from contextlib import suppress -import functools -import operator import numpy as np -from .active_dask import DaskActiveArray +from .active_chunk import ( + ActiveOptionsContainer, +) + +from .wrappers import ActiveArrayWrapper + +class ActiveDataStore(NetCDF4DataStore, ActiveOptionsContainer): -class ActiveDataStore(NetCDF4DataStore): - def open_store_variable(self, name: str, var): + def get_variables(self): + """ + """ + return FrozenDict( + (k, self.open_variable(k, v)) for k, v in self.ds.variables.items() + ) + + def open_variable(self, name: str, var): + if name in self.ds.dimensions or not self._active_chunks: + return self.open_store_variable(name, var) + else: + return self.open_active_variable(name, var) + + def open_active_variable(self, name: str, var): import netCDF4 dimensions = var.dimensions + + units = '' 
+ if hasattr(var, 'units'): + units = getattr(var, 'units') + attributes = {k: var.getncattr(k) for k in var.ncattrs()} - data = indexing.LazilyIndexedArray(ActiveSubarrayWrapper(name, self)) + data = indexing.LazilyIndexedArray( + ActiveArrayWrapper( + self._filename, + var, + var.shape, + units, + var.dtype, + named_dims=dimensions, + active_options=self.active_options + ) + ) + encoding = {} if isinstance(var.datatype, netCDF4.EnumType): @@ -66,100 +90,3 @@ def open_store_variable(self, name: str, var): encoding["original_shape"] = data.shape return Variable(dimensions, data, attributes, encoding) - -class ActiveSubarrayWrapper(BackendArray, SuperLazyArrayLike): - - def __init__(self, variable_name, datastore, chunks=None, extent=None): - self.datastore = datastore - self.variable_name = variable_name - - self._chunks = chunks - self._extent = extent - self._lock = SerializableLock() - - self._variable = self._get_variable() - self.shape = self._variable.shape - self.ndim = len(self.shape) - - dtype = self._variable.dtype - if dtype is str: - # use object dtype (with additional vlen string metadata) because that's - # the only way in numpy to represent variable length strings and to - # check vlen string dtype in further steps - # it also prevents automatic string concatenation via - # conventions.decode_cf_variable - dtype = create_vlen_dtype(str) - self.dtype = dtype - - self.__array_function__ = self.get_array - - def _get_variable(self, needs_lock=True): - ds = self.datastore._acquire(needs_lock) - variable = ds.variables[self.variable_name] - variable.set_auto_maskandscale(False) - # only added in netCDF4-python v1.2.8 - with suppress(AttributeError): - variable.set_auto_chartostring(False) - return variable - - def __array__(self): - - if not self._chunks: - # get_array should just get the whole array if that's what we're trying to do. - # indexing should just be added to the instance of this class, and then the - # built-in mean from _ActiveFragment should take care of things. - return self._variable - - - # for every dask chunk return a smaller object with the right extent. - # Create a chunk_shape tuple from chunks and _variable (figure out which chunk and which axis, divide etc.) - # Define a subarray for each chunk, with appropriate index. - - chunks = None # Need to find out what this needs to be. - - name = (f"{self.__class__.__name__}-{tokenize(self)}",) - dsk = {} - for pos in positions: - - subarray = ArrayPartition( - filename, - address, - dtype=, - shape=, - position=pos, - ) - - key = f"{subarray.__class__.__name__}-{tokenize(subarray)}" - dsk[key] = subarray - dsk[name + f_index] = ( - getter, # Dask default should be enough with the new indexing routine. - key, - extent, - False, - getattr(subarray,"_lock",False) - ) - - return DaskActiveArray(dsk, name, chunks=chunks, dtype=self.dtype) - - - def _getitem(self, key): - if self.datastore.is_remote: # pragma: no cover - getitem = functools.partial(robust_getitem, catch=RuntimeError) - else: - getitem = operator.getitem - - try: - with self.datastore.lock: - original_array = self.get_array(needs_lock=False) - array = getitem(original_array, key) - except IndexError: - # Catch IndexError in netCDF4 and return a more informative - # error message. This is most often called when an unsorted - # indexer is used before the data is loaded from disk. - msg = ( - "The indexing operation you are attempting to perform " - "is not valid on netCDF4.Variable object. Try loading " - "your data into memory first by calling .load()." 
- ) - raise IndexError(msg) - return array diff --git a/XarrayActive/partition.py b/XarrayActive/partition.py deleted file mode 100644 index 4986f0f..0000000 --- a/XarrayActive/partition.py +++ /dev/null @@ -1,414 +0,0 @@ -__author__ = "Daniel Westwood" -__contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" - -# Chunk wrapper is common to both CFAPyX and XarrayActive -VERSION = 1.0 - -import numpy as np -import netCDF4 - -from itertools import product -from dask.utils import SerializableLock - -try: - from XarrayActive import ActiveChunk -except: - class ActiveChunk: - pass - -class ArrayLike: - """ - Container class for Array-like behaviour - """ - description = 'Container class for Array-Like behaviour' - - def __init__(self, shape, units=None, dtype=None): - - # Standard parameters to store for array-like behaviour - self.shape = shape - self.units = units - self.dtype = dtype - - # Shape-based properties (Lazy loading means this may change in some cases) - @property - def size(self): - """ - Size property is derived from the current shape. In an ``ArrayLike`` - instance the shape is fixed, but other classes may alter the shape - at runtime. - """ - return product(self.shape) - - @property - def ndim(self): - """ - ndim property is derived from the current shape. In an ``ArrayLike`` - instance the shape is fixed, but other classes may alter the shape - at runtime. - """ - return len(self.shape) - - def copy(self, **kwargs): - """ - Return a new basic ArrayLike instance. Ignores provided kwargs - this class does not require, but other inheritors may.""" - return ArrayLike( - self.shape, - **self.get_kwargs() - ) - - def get_kwargs(self): - """ - Get the kwargs provided to this class initially - for creating a copy.""" - return { - 'units':self.units, - 'dtype':self.dtype - } - -class SuperLazyArrayLike(ArrayLike): - """ - Container class for SuperLazy Array-Like behaviour. ``SuperLazy`` behaviour is - defined as Lazy-Slicing behaviour for objects that are below the 'Dask Surface', - i.e for object that serve as Dask Chunks.""" - - description = "Container class for SuperLazy Array-Like behaviour" - - def __init__(self, shape, **kwargs): - """ - Adds an ``extent`` variable derived from the initial shape, - this can be altered by performing slices, which are not applied - 'Super-Lazily' to the data. - """ - - self._extent = [ - slice(0, i) for i in shape - ] - - super().__init__(shape, **kwargs) - - def __getitem__(self, selection): - """ - SuperLazy behaviour supported by saving index information to be applied when fetching the array. - This is considered ``SuperLazy`` because Dask already loads dask chunks lazily, but a further lazy - approach is required when applying Active methods. - """ - newextent = self._combine_slices(selection) - return self.copy(newextent) - - @property - def shape(self): - """ - Apply the current ``extent`` slices to determine the current array shape, - given all current slicing operations. This replaces shape as a simple - attribute in ``ArrayLike``, on instantiation the ``_shape`` private attribute - is defined, and subsequent attempts to retrieve the ``shape`` will depend on - the current ``extent``. 
- """ - current_shape = [] - if not self._extent: - return self._shape - for d, e in enumerate(self._extent): - start = e.start or 0 - stop = e.stop or self.shape[d] - step = e.step or 1 - current_shape.append(int((stop - start)/step)) - return tuple(current_shape) - - @shape.setter - def shape(self, value): - self._shape = value - - def _combine_slices(self, newslice): - """ - Combine existing ``extent`` attribute with a new set of slices. - - :param newslice: (tuple) A set of slices to apply to the data - 'Super-Lazily', i.e the slices will be combined with existing information - and applied later in the process. - - :returns: The combined set of slices. - """ - - if len(newslice) != len(self.shape): - if hasattr(self, 'active'): - # Mean has already been computed. Raise an error here - # since we should be down to dealing with numpy arrays now. - raise ValueError( - "Active chain broken - mean has already been accomplished." - ) - else: - self._array = np.array(self)[newslice] - return None - - def combine_sliced_dim(old, new, dim): - - ostart = old.start or 0 - ostop = old.stop or self._shape[dim] - ostep = old.step or 1 - - nstart = new.start or 0 - nstop = new.stop or self._shape[dim] - nstep = new.step or 1 - - start = ostart + ostep*nstart - step = ostep * nstep - stop = start + step * (nstop - nstart) - return slice(start, stop, step) - - - if not self._extent: - return newslice - else: - extent = self._extent - for dim in range(len(newslice)): - extent[dim] = combine_sliced_dim(extent[dim], newslice[dim], dim) - return extent - - def get_extent(self): - return self._extent - - def copy(self, newextent=None): - """ - Create a new instance of this class with all attributes of the current instance, but - with a new initial extent made by combining the current instance extent with the ``newextent``. - Each ArrayLike class must overwrite this class to get the best performance with multiple - slicing operations. - """ - kwargs = self.get_kwargs() - if newextent: - kwargs['extent'] = self._combine_slices(newextent) - - new_instance = SuperLazyArrayLike( - **kwargs - ) - return new_instance - -class ArrayPartition(ActiveChunk, SuperLazyArrayLike): - """ - Complete Array-like object with all proper methods for data retrieval. - May include methods from ``XarrayActive.ActiveChunk`` if installed.""" - - description = "Complete Array-like object with all proper methods for data retrieval." - - def __init__(self, - filename, - address, - dtype=None, - units=None, - shape=None, - position=None, - extent=None, - format=None, - ): - - """ - Wrapper object for the 'array' section of a fragment or chunk. Contains some metadata to ensure the - correct fragment/chunk is selected, but generally just serves the array to dask when required. - - :param filename: (str) The path to the data file from which this fragment or chunk is - derived, if known. Not used in this class other than to support a ``.copy`` mechanism of - higher-level classes like ``CFAPartition``. - - - - :param address: (str) The variable name/address within the underlying data file which this class represents. - - :param dtype: (obj) The datatype of the values represented in this Array-like class. - - :param units: (obj) The units of the values represented in this Array-like class. - - :param shape: (tuple) The shape of the partition represented by this class. - - - - :param position: (tuple) The position in ``index space`` into which this chunk belongs, this could be - ``fragment space`` or ``chunk space`` if using Active chunks. 
- - :param extent: (tuple) Initial set of slices to apply to this chunk. Further slices may be applied which - are concatenated to the original slice defined here, if present. For fragments this will be - the extent of the whole array, but when using Active chunks the fragment copies may only - cover a partition of the fragment. - - :param format: (str) The format type of the underlying data file from which this fragment or chunk is - derived, if known. Not used in this class other than to support a ``.copy`` mechanism of - higher-level classes like ``CFAPartition``. - """ - - self.__array_function__ = self.__array__ - - self.filename = filename - self.address = address - - self.format = format - self.position = position - - self._extent = extent - self._lock = SerializableLock() - - super().__init__(shape, dtype=dtype, units=units) - - def __array__(self, *args, **kwargs): - """ - Retrieves the array of data for this variable chunk, casted into a Numpy array. Use of this method - breaks the ``Active chain`` by retrieving all the data before any methods can be applied. - - :returns: A numpy array of the data for the correct variable with correctly applied selections - defined by the ``extent`` parameter. - """ - - # Unexplained xarray behaviour: - # If using xarray indexing, __array__ should not have a positional 'dtype' option. - # If casting DataArray to numpy, __array__ requires a positional 'dtype' option. - dtype = None - if args: - dtype = args[0] - - if dtype != self.dtype: - raise ValueError( - 'Requested datatype does not match this chunk' - ) - - ds = self.open() - - if '/' in self.address: - # Assume we're dealing with groups but we just need the data for this variable. - - addr = self.address.split('/') - group = '/'.join(addr[1:-1]) - varname = addr[-1] - - ds = ds.groups[group] - - else: - varname = self.address - - try: - array = ds.variables[varname] - except KeyError: - raise ValueError( - f"Dask Chunk at '{self.position}' does not contain " - f"the variable '{varname}'." - ) - - try: - var = np.array(array[tuple(self._extent)]) - except IndexError: - raise ValueError( - f"Unable to select required 'extent' of {self.extent} " - f"from fragment {self.position} with shape {array.shape}" - ) - - return self._post_process_data(var) - - def _post_process_data(self, data): - """ - Perform any post-processing steps on the data here. - - unit correction - - calendar correction - """ - return data - - def _try_openers(self, filename): - """ - Attempt to open the dataset using all possible methods. Currently only NetCDF is supported. - """ - for open in [ - self._open_netcdf, - self._open_pp, - self._open_um - ]: - try: - ds = open(filename) - except: - pass - if not ds: - raise FileNotFoundError( - 'No file type provided and opening failed with all known types.' - ) - return ds - - def _open_pp(self, filename): - raise NotImplementedError - - def _open_um(self, filename): - raise NotImplementedError - - def _open_netcdf(self, filename): - """ - Open a NetCDF file using the netCDF4 python package.""" - return netCDF4.Dataset(filename, mode='r') - - def get_kwargs(self): - """ - Return all the initial kwargs from instantiation, to support ``.copy()`` mechanisms by higher classes. 
- """ - return { - 'dtype': self.dtype, - 'units': self.units, - 'shape': self.shape, - 'position': self.position, - 'extent': self._extent, - 'format': self.format - } - - def copy(self, newextent=None): - """ - Create a new instance of this class with all attributes of the current instance, but - with a new initial extent made by combining the current instance extent with the ``newextent``. - Each ArrayLike class must overwrite this class to get the best performance with multiple - slicing operations. - """ - kwargs = self.get_kwargs() - if newextent: - kwargs['extent'] = self._combine_slices(newextent) - - new_instance = SuperLazyArrayLike( - self.filename, - self.address, - **kwargs, - ) - return new_instance - - def open(self): - """ - Open the source file for this chunk to extract data. Multiple file locations may be provided - for this object, in which case there is a priority for 'remote' sources first, followed by - 'local' sources - otherwise the order is as given in the fragment array variable ``location``. - """ - - filenames = self.filename - - if type(filenames) == str: - filenames = [filenames] - - # Tidy code - never going to be many filenames - local = [l for l in filenames if '://' not in l] - remote = [r for r in filenames if '://' in r] - relative = [d for d in filenames if d[:5] not in ('https','s3://','file:')] - - # Prioritise relative then remote options first if any are present. - filenames = relative + remote + local - - for filename in filenames: - try: - if not self.format: - # guess opening format. - return self._try_openers(filename) - - if self.format == 'nc': - return self._open_netcdf(filename) - else: - raise ValueError( - f"Unrecognised format '{self.format}'" - ) - except ValueError as err: - raise err - except: - pass - - raise FileNotFoundError( - f'None of the location options for chunk "{self.position}" could be accessed.' - f'Locations tried: {filenames}.' - ) - diff --git a/XarrayActive/subarray.py b/XarrayActive/subarray.py deleted file mode 100644 index fcc6dc0..0000000 --- a/XarrayActive/subarray.py +++ /dev/null @@ -1,117 +0,0 @@ -from XarrayActive.partition import ArrayPartition, ArrayLike - -class ActiveNetCDF4Wrapper(ArrayLike): - - def __init__( - self, - variable_name, - datastore, - chunks=None, - extent=None - ): - self.datastore = datastore - self.variable_name = variable_name - - self._chunks = chunks - self._extent = extent - self._lock = SerializableLock() - - self._variable = self.get_variable() - self.shape = self._variable.shape - self.ndim = len(self.shape) - - dtype = self._variable.dtype - if dtype is str: - # use object dtype (with additional vlen string metadata) because that's - # the only way in numpy to represent variable length strings and to - # check vlen string dtype in further steps - # it also prevents automatic string concatenation via - # conventions.decode_cf_variable - dtype = create_vlen_dtype(str) - - self.__array_function__ = self.get_array - - super().__init__(shape, units=units, dtype=dtype) - - def get_variable(self, needs_lock=True): - ds = self.datastore._acquire(needs_lock) - variable = ds.variables[self.variable_name] - variable.set_auto_maskandscale(False) - # only added in netCDF4-python v1.2.8 - with suppress(AttributeError): - variable.set_auto_chartostring(False) - return variable - - def get_array(self): - - if not self._chunks: - # get_array should just get the whole array if that's what we're trying to do. 
- # indexing should just be added to the instance of this class, and then the - # built-in mean from _ActiveFragment should take care of things. - return self._variable - else: - - # for every dask chunk return a smaller object with the right extent. - # Create a chunk_shape tuple from chunks and _variable (figure out which chunk and which axis, divide etc.) - # Define a subarray for each chunk, with appropriate index. - - f_indices = None # from chunks - chunks = None # Need to find out what this needs to be. - - name = (f"{self.__class__.__name__}-{tokenize(self)}",) - dsk = {} - for f_index in f_indices: - - position = None - extent = None - cformat = None - - subarray = ArrayPartition( - self.filename, - self.variable_name, - dtype=self.dtype, - units=self.units, - shape=self.shape, - position=position, - extent=extent, - format=cformat - ) - - key = f"{subarray.__class__.__name__}-{tokenize(subarray)}" - dsk[key] = subarray - dsk[name + f_index] = ( - getter, # Dask default should be enough with the new indexing routine. - key, - f_indices, - False, - getattr(subarray,"_lock",False) - ) - - return DaskActiveArray(dsk, name, chunks=chunks, dtype=self.dtype) - - - def __getitem__(self, index): - self._extent = self._combine_slices(index) - return self - - def _getitem(self, key): - if self.datastore.is_remote: # pragma: no cover - getitem = functools.partial(robust_getitem, catch=RuntimeError) - else: - getitem = operator.getitem - - try: - with self.datastore.lock: - original_array = self.get_array(needs_lock=False) - array = getitem(original_array, key) - except IndexError: - # Catch IndexError in netCDF4 and return a more informative - # error message. This is most often called when an unsorted - # indexer is used before the data is loaded from disk. - msg = ( - "The indexing operation you are attempting to perform " - "is not valid on netCDF4.Variable object. Try loading " - "your data into memory first by calling .load()." - ) - raise IndexError(msg) - return array \ No newline at end of file diff --git a/XarrayActive/wrappers.py b/XarrayActive/wrappers.py new file mode 100644 index 0000000..8b598c1 --- /dev/null +++ b/XarrayActive/wrappers.py @@ -0,0 +1,144 @@ +from arraypartition import ( + ArrayPartition, + ArrayLike, + get_chunk_space, + get_chunk_shape, + get_chunk_positions, + get_chunk_extent, + get_dask_chunks, + combine_slices +) +from .active_chunk import ( + ActiveChunk, + ActiveOptionsContainer +) + +from .active_dask import DaskActiveArray + +from dask.array.core import getter +from dask.base import tokenize + +from itertools import product + +class ActivePartition(ArrayPartition): + """ + Combines ActiveChunk - active methods, and ArrayPartition - array methods + into a single ChunkWrapper class. + """ + def copy(self, extent=None): + + kwargs = self.get_kwargs() + if extent: + kwargs['extent'] = combine_slices(self.shape, list(self.get_extent()), extent) + ap = ActivePartition( + self.filename, + self.address, + **kwargs + ) + return ap + +class ActiveArrayWrapper(ArrayLike, ActiveOptionsContainer): + """ + ActiveArrayWrapper behaves like an Array that can be indexed or referenced to + return a Dask-like array object. This class is essentially a constructor for the + partitions that feed into the returned Dask-like array into Xarray. 
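+
+    Indexing or casting the wrapper builds one ``ActivePartition`` per chunk and
+    assembles them into a ``DaskActiveArray``. A rough construction sketch
+    (argument values are illustrative)::
+
+        wrapper = ActiveArrayWrapper(
+            filename, var, var.shape,
+            units=units, dtype=var.dtype,
+            named_dims=var.dimensions,
+            active_options={'chunks': {'time': 2}},
+        )
+        darr = wrapper[:]   # indexes the DaskActiveArray built from the chunk scheme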
+ """ + def __init__( + self, + filename, + var, + shape, + units=None, + dtype=None, + named_dims=None, + active_options={}, + ): + + self._variable = var + + self.filename = filename + self.name = var.name + self.active_options = active_options + + self.named_dims = named_dims + + super().__init__(shape, units=units, dtype=dtype) + + self.chunk_shape = get_chunk_shape( + self._active_chunks, + self.shape, + self.named_dims, + chunk_limits=self._chunk_limits + ) + + self.chunk_space = get_chunk_space( + self.chunk_shape, + self.shape + ) + + self.__array_function__ = self.__array__ + + def __getitem__(self, selection): + """ + Non-lazy retrieval of the dask array when this object is indexed. + """ + arr = self.__array__() + return arr[selection] + + def __array__(self, *args, **kwargs): + + if not self._active_chunks: + # get_array should just get the whole array if that's what we're trying to do. + # indexing should just be added to the instance of this class, and then the + # built-in mean from _ActiveFragment should take care of things. + return self._variable + else: + + # for every dask chunk return a smaller object with the right extent. + # Create a chunk_shape tuple from chunks and _variable (figure out which chunk and which axis, divide etc.) + # Define a subarray for each chunk, with appropriate index. + + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + dsk = {} + positions = get_chunk_positions(self.chunk_space) + request = get_chunk_extent(positions[0], self.shape, self.chunk_space) + + global_extent = {} + + for position in positions: + position = tuple(position) + + extent = get_chunk_extent(position, self.shape, self.chunk_space) + cformat = None + global_extent[position] = extent + + chunk = ActivePartition( + self.filename, + self.name, + dtype=self.dtype, + units=self.units, + shape=self.chunk_shape, + position=position, + extent=extent, + format=cformat + ) + + c_identifier = f"{chunk.__class__.__name__}-{tokenize(chunk)}" + dsk[c_identifier] = chunk + dsk[array_name + position] = ( + getter, # Dask default should be enough with the new indexing routine. + c_identifier, + request, + False, + getattr(chunk,"_lock",False) + ) + + dask_chunks = get_dask_chunks( + self.shape, + self.chunk_space, + extent=global_extent, + dtype=self.dtype, + explicit_shapes=None + ) + + return DaskActiveArray(dsk, array_name[0], chunks=dask_chunks, dtype=self.dtype) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3ec3a50..fc4e55c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ h5py==3.11.0 dask==2024.7.0 cftime==1.6.4 cfunits==3.3.7 -pytest==7.2.0 \ No newline at end of file +pytest==7.2.0 +ArrayPartition==1.0 \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/rain_test.nc b/tests/rain_test.nc new file mode 100644 index 0000000..40e0d74 Binary files /dev/null and b/tests/rain_test.nc differ diff --git a/tests/test_active.py b/tests/test_active.py new file mode 100644 index 0000000..d8ffafc --- /dev/null +++ b/tests/test_active.py @@ -0,0 +1,102 @@ +# All routines for testing CFA general methods. 
+import xarray as xr +import numpy as np + +def test_active(): + + path_to_active = f'tests/rain_test.nc' + + try: + ds = xr.open_dataset( + path_to_active, + engine='Active', + active_options={})#{'chunks':{'time':2}}) + except Exception as err: + assert isinstance(err, NotImplementedError) + + ds = xr.open_dataset( + path_to_active, + engine='Active', + active_options={'chunks':{'time':2}}) + + assert 'p' in ds + assert ds['p'].shape == (20, 180, 360) + + p_sel = ds['p'].isel(time=slice(0,3),latitude=slice(140,145), longitude=slice(90,100)) + + assert p_sel.shape == (3, 5, 10) + + p_value = p_sel.mean() + + assert p_value.shape == () + assert (p_value.to_numpy() - 0.53279) < 0.01 + +def test_active_recursive(): + + path_to_active = f'tests/rain_test.nc' + + try: + ds = xr.open_dataset( + path_to_active, + engine='Active', + active_options={})#{'chunks':{'time':2}}) + except Exception as err: + assert isinstance(err, NotImplementedError) + + ds = xr.open_dataset( + path_to_active, + engine='Active', + active_options={'chunks':{'time':2}}) + + assert 'p' in ds + assert ds['p'].shape == (20, 180, 360) + + p_sel = ds['p'].isel(time=slice(0,3),latitude=slice(140,145), longitude=slice(90,100)) + + assert p_sel.shape == (3, 5, 10) + + p_mean = p_sel.mean(dim='time') + + assert p_mean.shape == (5, 10) + assert (p_mean[0][0].to_numpy() - 0.683402) < 0.01 + +def test_active_methods(): + + path_to_active = f'tests/rain_test.nc' + + try: + ds = xr.open_dataset( + path_to_active, + engine='Active', + active_options={})#{'chunks':{'time':2}}) + except Exception as err: + assert isinstance(err, NotImplementedError) + + ds = xr.open_dataset( + path_to_active, + engine='Active', + active_options={'chunks':{'time':2}}) + + assert 'p' in ds + assert ds['p'].shape == (20, 180, 360) + + p_sel = ds['p'].isel(latitude=slice(140,145), longitude=slice(90,100)) + + assert p_sel.shape == (20, 5, 10) + + p_value = p_sel.isel(time=slice(0,3)).max() + assert p_value.shape == () + assert (p_value.to_numpy() - 0.9978273) < 0.01 + + p_value = p_sel.isel(time=slice(0,3)).min() + assert p_value.shape == () + assert (p_value.to_numpy() - 0.0014456) < 0.01 + + p_value = p_sel.isel(time=slice(0,3)).sum() + assert p_value.shape == () + assert (p_value.to_numpy() - 76.7931739) < 0.01 + +if __name__ == '__main__': + test_active() + test_active_recursive() + test_active_methods() \ No newline at end of file
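
Usage sketch (mirrors the tests above; the file path, engine name and chunking
scheme are taken from this changeset):

    import xarray as xr

    ds = xr.open_dataset(
        'tests/rain_test.nc',
        engine='Active',
        active_options={'chunks': {'time': 2}},
    )
    p_mean = ds['p'].isel(time=slice(0, 3)).mean()   # reduction routed through DaskActiveArray
    print(float(p_mean.to_numpy()))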