Skip to content

Commit aaf0cd8

Browse files
authored
Merge pull request #4 from dwest77a/functional
2024.9.0 Final Update
2 parents 099c21e + eb405a5 commit aaf0cd8

19 files changed

+152
-104
lines changed

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
# XarrayActive
2-
For use with the Xarray module as an additional backend.
2+
For use with the Xarray module as an additional backend. See the module[PyActiveStorage](https://github.com/NCAS-CMS/PyActiveStorage) for more details.
33

44
## Installation
55

66
```
77
pip install xarray==2024.6.0
8-
pip install -e .
8+
pip install XarrayActive==2024.9.0
99
```
1010

1111
## Usage
1212

1313
```
1414
import xarray as xr
1515
16-
ds = xr.open_dataset('cfa_file.nc', engine='Active')
16+
ds = xr.open_dataset('any_file.nc', engine='Active')
1717
# Plot data
1818
1919
```

XarrayActive/__init__.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .active_xarray import ActiveDataset # Used by CFAPyX
2-
from .active_dask import DaskActiveArray # Used by CFAPyX
3-
from .active_chunk import ActiveChunk # Used by CFAPyX
4-
from .backend import ActiveBackendEntrypoint
1+
from XarrayActive.active_xarray import ActiveDataset # Used by CFAPyX
2+
from XarrayActive.active_dask import DaskActiveArray # Used by CFAPyX
3+
from XarrayActive.active_chunk import ActiveChunk # Used by CFAPyX
4+
from XarrayActive.backend import ActiveBackendEntrypoint
376 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
3.88 KB
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

XarrayActive/active_chunk.py

+55-14
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
class ActiveOptionsContainer:
66
"""
7-
Container for ActiveOptions properties.
7+
Container for ActiveOptions properties. Only for use within XarrayActive.
88
"""
99
@property
1010
def active_options(self):
@@ -23,28 +23,40 @@ def active_options(self, value):
2323

2424
def _set_active_options(self, chunks={}, chunk_limits=True):
2525

26+
# Auto chunking is not currently supported - 23/08/24
2627
if chunks == {}:
2728
raise NotImplementedError(
2829
'Default chunking is not implemented, please provide a chunk scheme '
2930
' - active_options = {"chunks": {}}'
3031
)
32+
33+
if 'auto' in chunks.items():
34+
raise NotImplementedError(
35+
'Auto chunking is not implemented, please provide a chunk scheme '
36+
' - active_options = {"chunks": {}}'
37+
)
3138

3239
self._active_chunks = chunks
3340
self._chunk_limits = chunk_limits
3441

35-
# Holds all Active routines.
3642
class ActiveChunk:
43+
"""
44+
Container class for all Active-required methods to perform on each chunk.
45+
All active-per-chunk content should be found here.
46+
"""
3747

38-
description = "Container class for Active routines performed on each chunk. All active-per-chunk content can be found here."
48+
description = "Container class for Active routines performed on each chunk."
3949

4050
def _post_process_data(self, data):
41-
# Perform any post-processing steps on the data here
51+
"""
52+
Perform any post-processing steps on the data here.
53+
"""
4254
return data
4355

4456
def _standard_sum(self, axes=None, skipna=None, **kwargs):
4557
"""
46-
Standard Mean routine matches the normal routine for dask, required at this
47-
stage if Active mean not available.
58+
Standard sum routine matches the normal routine for dask, required at this
59+
stage if Active mean/sum not available.
4860
"""
4961

5062
arr = np.array(self)
@@ -55,12 +67,31 @@ def _standard_sum(self, axes=None, skipna=None, **kwargs):
5567
return total
5668

5769
def _standard_max(self, axes=None, skipna=None, **kwargs):
70+
"""
71+
Standard max routine if Active not available, warning will be given.
72+
Kwargs may be necessary to add here.
73+
"""
5874
return np.max(self, axis=axes)
5975

6076
def _standard_min(self, axes=None, skipna=None, **kwargs):
77+
"""
78+
Standard min routine if Active not available, warning will be given.
79+
Kwargs may be necessary to add here.
80+
"""
6181
return np.min(self, axis=axes)
6282

6383
def _numel(self, method, axes=None):
84+
"""
85+
Number of elements remaining after a reduction, to allow
86+
dask to combine reductions from all different chunks.
87+
Example:
88+
(2,3,4) chunk reduced along second dimension. Will
89+
give a (2,3) array where each value is 4 - for the
90+
length of the dimension along which a reduction
91+
took place.
92+
93+
"""
94+
# Applied reduction across all axes
6495
if not axes:
6596
return self.size
6697

@@ -91,41 +122,47 @@ def active_method(self, method, axis=None, skipna=None, **kwargs):
91122
'max' : self._standard_max,
92123
'min' : self._standard_min
93124
}
94-
ret = None
125+
partial = None
95126
n = self._numel(method, axes=axis)
96127

97128
try:
98129
from activestorage.active import Active
99130
except ImportError:
100131
# Unable to import Active package. Default to using normal mean.
101132
print("ActiveWarning: Unable to import active module - defaulting to standard method.")
102-
ret = {
133+
partial = {
103134
'n': n,
104135
'total': standard_methods[method](axes=axis, skipna=skipna, **kwargs)
105136
}
106137

107-
if not ret:
138+
if not partial:
108139

140+
# Create Active client
109141
active = Active(self.filename, self.address)
110142
active.method = method
143+
144+
# Fetch extent for this chunk instance.
111145
extent = tuple(self.get_extent())
112146

147+
# Properly format the 'axis' kwarg.
113148
if axis == None:
114149
axis = tuple([i for i in range(self.ndim)])
115150

151+
# Determine reduction parameter for combining chunk results for dask.
116152
n = self._numel(method, axes=axis)
117153

118154
if len(axis) == self.ndim:
119155
data = active[extent]
120156
t = self._post_process_data(data) * n
121157

122-
ret = {
158+
partial = {
123159
'n': n,
124160
'total': t
125161
}
126162

127-
if not ret:
163+
if not partial:
128164
# Experimental Recursive requesting to get each 1D column along the axes being requested.
165+
# - May be very bad performance due to many requests for (1,1,X) shapes
129166
range_recursives = []
130167
for dim in range(self.ndim):
131168
if dim not in axis:
@@ -135,17 +172,21 @@ def active_method(self, method, axis=None, skipna=None, **kwargs):
135172
results = np.array(self._get_elements(active, range_recursives, hyperslab=[]))
136173

137174
t = self._post_process_data(results) * n
138-
ret = {
175+
partial = {
139176
'n': n,
140177
'total': t
141178
}
142179

143180
if method == 'mean':
144-
return ret
181+
return partial
145182
else:
146-
return ret['total']/ret['n']
183+
return partial['total']/partial['n']
147184

148185
def _get_elements(self, active, recursives, hyperslab=[]):
186+
"""
187+
Recursive function to fetch and arrange the appropriate column slices
188+
from Active.
189+
"""
149190
dimarray = []
150191
if not len(recursives) > 0:
151192

XarrayActive/active_dask.py

+34-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
from dask.array.core import _concatenate2
55
import numpy as np
66

7+
from .active_chunk import ActiveChunk
8+
9+
## Partition Methods are the first step in the Dask Reductions.
10+
711
def partition_mean(arr, *args, **kwargs):
812
return partition_method(arr, 'mean', *args, **kwargs)
913

@@ -18,14 +22,19 @@ def partition_sum(arr, *args, **kwargs):
1822

1923
def partition_method(arr, method, *args, **kwargs):
2024
if hasattr(arr,'active_method'):
25+
# Active method for each array partition
2126
return arr.active_method(method,*args, **kwargs)
27+
elif arr.size != 0:
28+
print('ActiveWarning: Using standard mean given non-active array partition')
29+
return arr.mean(*args, **kwargs)
2230
else:
23-
# Additional handling for 'meta' calculations in dask.
24-
# Not currently implemented, bypassed using None
25-
if arr.size == 0:
26-
return None
31+
# Computing meta - dask operation not fully utilised.
2732
return None
2833

34+
## Combining results from Partition methods
35+
# - Dask built-in mean-agg and mean-combine for mean.
36+
# - Min/Max/Sum require simple functions for combine/aggregation.
37+
2938
def general_combine(pairs, axis=None):
3039
if not isinstance(pairs, list):
3140
pairs = [pairs]
@@ -46,14 +55,15 @@ class DaskActiveArray(da.Array):
4655

4756
@property
4857
def is_active(self):
58+
# Quick way of distinguishing from Dask Array
4959
return True
5060

51-
#def copy(self):
52-
# """
53-
# Create a new DaskActiveArray instance with all the same parameters as the current instance.
54-
# """
55-
# copy_arr = DaskActiveArray(self.dask, self.name, self.chunks, meta=self)
56-
# return copy_arr
61+
def copy(self):
62+
"""
63+
Create a new DaskActiveArray instance with all the same parameters as the current instance.
64+
"""
65+
copy_arr = DaskActiveArray(self.dask, self.name, self.chunks, meta=self)
66+
return copy_arr
5767

5868
def __getitem__(self, index):
5969
"""
@@ -78,7 +88,6 @@ def active_mean(self, axis=None, skipna=None):
7888
:returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using
7989
the concatenations of active_means from each chunk.
8090
"""
81-
8291
newarr = da.reduction(
8392
self,
8493
partition_mean,
@@ -163,4 +172,17 @@ def active_sum(self, axis=None, skipna=None):
163172
dtype=self.dtype,
164173
)
165174

166-
return newarr
175+
return newarr
176+
177+
def active_method(self, method, axis=None, skipna=None, **kwargs):
178+
"""
179+
Pointer to the active methods of the DaskActiveArray, for use
180+
in the nested Dask setup with CFA Dask-AnySize Chunks.
181+
"""
182+
methods = {
183+
'mean':self.active_mean,
184+
'max':self.active_max,
185+
'min':self.active_min,
186+
'sum':self.active_sum
187+
}
188+
return methods[method](axis=axis, skipna=skipna)

0 commit comments

Comments
 (0)