
Commit 099c21e

Merge pull request #1 from dwest77a/functional
Version 2024.8.16: Functional XarrayActive
2 parents 0f15d5b + 3b5fb94 commit 099c21e

13 files changed (+649 -716 lines)

.github/workflows/ci.yml (+32)

@@ -0,0 +1,32 @@
+name: Automatic Test
+# Specify which GitHub events will trigger a CI build
+
+on: push
+# Define a single job, build
+
+jobs:
+  build:
+    # Specify an OS for the runner
+    runs-on: ubuntu-latest
+
+    # Define steps
+    steps:
+
+      # Firstly, checkout repo
+      - name: Checkout repository
+        uses: actions/checkout@v2
+      # Set up Python env
+      - name: Setup Python
+        uses: actions/setup-python@v2
+        with:
+          python-version: 3.11
+      # Install dependencies
+      - name: Install Python dependencies
+        run: |
+          python3 -m pip install --upgrade pip
+          pip3 install -r requirements.txt
+          pip3 install -e .
+      # Test with pytest
+      - name: Run pytest
+        run: |
+          pytest

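The workflow above checks out the repository, installs it in editable mode, and runs pytest with no arguments, so every test collected from the repository runs on each push. For illustration only, a smoke test of the kind this job would collect might look like the following (the file name and test are hypothetical, not taken from the repository's test suite):

# test_smoke.py -- hypothetical example; the name and contents are illustrative.

def test_package_imports():
    # `pip3 install -e .` in the workflow makes the package importable in the
    # runner, so a bare import is enough to catch basic packaging errors.
    import XarrayActive  # noqa: F401
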
XarrayActive/active_chunk.py (+106 -43)

@@ -1,95 +1,158 @@
 import numpy as np
+from itertools import product
 
 
-# Holds all CFA-specific Active routines.
+class ActiveOptionsContainer:
+    """
+    Container for ActiveOptions properties.
+    """
+    @property
+    def active_options(self):
+        """
+        Property of the datastore that relates private option variables to the standard
+        ``active_options`` parameter.
+        """
+        return {
+            'chunks': self._active_chunks,
+            'chunk_limits': self._chunk_limits,
+        }
+
+    @active_options.setter
+    def active_options(self, value):
+        self._set_active_options(**value)
+
+    def _set_active_options(self, chunks={}, chunk_limits=True):
+
+        if chunks == {}:
+            raise NotImplementedError(
+                'Default chunking is not implemented, please provide a chunk scheme '
+                ' - active_options = {"chunks": {}}'
+            )
+
+        self._active_chunks = chunks
+        self._chunk_limits = chunk_limits
+
+# Holds all Active routines.
 class ActiveChunk:
 
     description = "Container class for Active routines performed on each chunk. All active-per-chunk content can be found here."
-
-    def __init__(self, *args, **kwargs):
-        raise NotImplementedError
 
     def _post_process_data(self, data):
         # Perform any post-processing steps on the data here
         return data
 
-    def _standard_mean(self, axis=None, skipna=None, **kwargs):
+    def _standard_sum(self, axes=None, skipna=None, **kwargs):
         """
         Standard Mean routine matches the normal routine for dask, required at this
         stage if Active mean not available.
         """
-        size = 1
-        for i in axis:
-            size *= self.shape[i]
 
         arr = np.array(self)
         if skipna:
-            total = np.nanmean(arr, axis=axis, **kwargs) *size
+            total = np.nansum(arr, axis=axes, **kwargs)
         else:
-            total = np.mean(arr, axis=axis, **kwargs) *size
-        return {'n': self._numel(arr, axis=axis), 'total': total}
+            total = np.sum(arr, axis=axes, **kwargs)
+        return total
+
+    def _standard_max(self, axes=None, skipna=None, **kwargs):
+        return np.max(self, axis=axes)
+
+    def _standard_min(self, axes=None, skipna=None, **kwargs):
+        return np.min(self, axis=axes)
 
-    def _numel(self, axis=None):
-        if not axis:
+    def _numel(self, method, axes=None):
+        if not axes:
            return self.size
 
         size = 1
-        for i in axis:
+        for i in axes:
            size *= self.shape[i]
        newshape = list(self.shape)
-        newshape[axis] = 1
+        for ax in axes:
+            newshape[ax] = 1
 
        return np.full(newshape, size)
 
-    def active_mean(self, axis=None, skipna=None, **kwargs):
+    def active_method(self, method, axis=None, skipna=None, **kwargs):
        """
        Use PyActiveStorage package functionality to perform mean of this Fragment.
 
-        :param axis: (int) The axis over which to perform the active_mean operation.
+        :param axis: (int) The axes over which to perform the active_mean operation.
 
        :param skipna: (bool) Skip NaN values when calculating the mean.
 
        :returns: A ``duck array`` (numpy-like) with the reduced array or scalar value,
-            as specified by the axis parameter.
+            as specified by the axes parameter.
        """
+
+        standard_methods = {
+            'mean': self._standard_sum,
+            'sum' : self._standard_sum,
+            'max' : self._standard_max,
+            'min' : self._standard_min
+        }
+        ret = None
+        n = self._numel(method, axes=axis)
+
        try:
            from activestorage.active import Active
        except ImportError:
            # Unable to import Active package. Default to using normal mean.
            print("ActiveWarning: Unable to import active module - defaulting to standard method.")
-            return self._standard_mean(axis=axis, skipna=skipna, **kwargs)
-
-        active = Active(self.filename, self.address)
-        active.method = "mean"
-        extent = self.get_extent()
-
-        if not axis is None:
-            return {
-                'n': self._numel(axis=axis),
-                'total': self._post_process_data(active[extent])
+            ret = {
+                'n': n,
+                'total': standard_methods[method](axes=axis, skipna=skipna, **kwargs)
            }
 
-        # Experimental Recursive requesting to get each 1D column along the axis being requested.
-        range_recursives = []
-        for dim in range(self.ndim):
-            if dim != axis:
-                range_recursives.append(range(extent[dim].start, extent[dim].stop+1))
-            else:
-                range_recursives.append(extent[dim])
-        results = np.array(self._get_elements(active, range_recursives, hyperslab=[]))
+        if not ret:
+
+            active = Active(self.filename, self.address)
+            active.method = method
+            extent = tuple(self.get_extent())
+
+            if axis == None:
+                axis = tuple([i for i in range(self.ndim)])
+
+            n = self._numel(method, axes=axis)
+
+            if len(axis) == self.ndim:
+                data = active[extent]
+                t = self._post_process_data(data) * n
+
+                ret = {
+                    'n': n,
+                    'total': t
+                }
+
+        if not ret:
+            # Experimental Recursive requesting to get each 1D column along the axes being requested.
+            range_recursives = []
+            for dim in range(self.ndim):
+                if dim not in axis:
+                    range_recursives.append(range(extent[dim].start, extent[dim].stop))
+                else:
+                    range_recursives.append(extent[dim])
+            results = np.array(self._get_elements(active, range_recursives, hyperslab=[]))
+
+            t = self._post_process_data(results) * n
+            ret = {
+                'n': n,
+                'total': t
+            }
 
-        return {
-            'n': self._numel(axis=axis),
-            'total': self._post_process_data(results)
-        }
+        if method == 'mean':
+            return ret
+        else:
+            return ret['total']/ret['n']
 
    def _get_elements(self, active, recursives, hyperslab=[]):
        dimarray = []
-        current = recursives[0]
-        if not len(recursives) > 1:
+        if not len(recursives) > 0:
 
            # Perform active slicing and meaning here.
-            return active[hyperslab]
+            return active[tuple(hyperslab)].flatten()[0]
+
+        current = recursives[0]
 
        if type(current) == slice:
            newslab = hyperslab + [current]

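With this change, active_method replaces the old active_mean and, for method='mean', returns a pair of partial results ('n', the element count from _numel, and 'total', the partial sum) rather than a finished value, so per-chunk results can be combined later. A minimal sketch of that combination step with invented numbers (plain numpy, not the package's own aggregation code):

import numpy as np

# Two per-chunk partial results in the {'n': ..., 'total': ...} form returned
# by active_method for method='mean'. The values are made up for illustration.
chunk_results = [
    {'n': np.full((1, 1), 4), 'total': np.array([[10.0]])},
    {'n': np.full((1, 1), 4), 'total': np.array([[26.0]])},
]

# Summing the partial totals and counts reproduces the global mean; this is
# the kind of combination dask's mean_agg performs downstream.
total = sum(r['total'] for r in chunk_results)  # [[36.]]
count = sum(r['n'] for r in chunk_results)      # [[8]]
print(total / count)                            # [[4.5]]
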
XarrayActive/active_dask.py (+123 -15)

@@ -1,13 +1,44 @@
 import dask.array as da
-from dask.array.reductions import mean_agg
+from dask.array.reductions import mean_agg, mean_combine, nanmax, nanmin
+from dask.utils import deepmap
+from dask.array.core import _concatenate2
+import numpy as np
 
-
-def block_active_mean(arr, *args, **kwargs):
-    if hasattr(arr,'active_mean'):
-        return arr.active_mean(*args, **kwargs)
+def partition_mean(arr, *args, **kwargs):
+    return partition_method(arr, 'mean', *args, **kwargs)
+
+def partition_max(arr, *args, **kwargs):
+    return partition_method(arr, 'max', *args, **kwargs)
+
+def partition_min(arr, *args, **kwargs):
+    return partition_method(arr, 'min', *args, **kwargs)
+
+def partition_sum(arr, *args, **kwargs):
+    return partition_method(arr, 'sum', *args, **kwargs)
+
+def partition_method(arr, method, *args, **kwargs):
+    if hasattr(arr,'active_method'):
+        return arr.active_method(method,*args, **kwargs)
     else:
-        # Here's where barebones Xarray might fall over - may need a non-CFA custom class.
-        raise NotImplementedError
+        # Additional handling for 'meta' calculations in dask.
+        # Not currently implemented, bypassed using None
+        if arr.size == 0:
+            return None
+        return None
+
+def general_combine(pairs, axis=None):
+    if not isinstance(pairs, list):
+        pairs = [pairs]
+    return _concatenate2(pairs, axes=axis)
+
+def max_agg(pairs, axis=None, **kwargs):
+    return general_combine(pairs, axis=axis).max(axis=axis, **kwargs)
+
+def min_agg(pairs, axis=None, **kwargs):
+    return general_combine(pairs, axis=axis).min(axis=axis, **kwargs)
+
+def sum_agg(pairs, axis=None, **kwargs):
+    return general_combine(pairs, axis=axis).sum(axis=axis, **kwargs)
 
 class DaskActiveArray(da.Array):
 
@@ -17,11 +48,12 @@ class DaskActiveArray(da.Array):
     def is_active(self):
         return True
 
-    def copy(self):
-        """
-        Create a new DaskActiveArray instance with all the same parameters as the current instance.
-        """
-        return DaskActiveArray(self.dask, self.name, self.chunks, meta=self)
+    #def copy(self):
+    #    """
+    #    Create a new DaskActiveArray instance with all the same parameters as the current instance.
+    #    """
+    #    copy_arr = DaskActiveArray(self.dask, self.name, self.chunks, meta=self)
+    #    return copy_arr
 
     def __getitem__(self, index):
         """
@@ -49,10 +81,86 @@ def active_mean(self, axis=None, skipna=None):
 
         newarr = da.reduction(
             self,
-            block_active_mean,
+            partition_mean,
             mean_agg,
+            combine=mean_combine,
+            axis=axis,
+            dtype=self.dtype,
+        )
+
+        return newarr
+
+    def active_max(self, axis=None, skipna=None):
+        """
+        Perform ``dask delayed`` active max for each ``dask block`` which corresponds to a single ``chunk``.
+        Combines the results of the dask delayed ``active_max`` operations on each block into a single dask Array,
+        which is then mapped to a new DaskActiveArray object.
+
+        :param axis: (int) The index of the axis on which to perform the active max.
+
+        :param skipna: (bool) Skip NaN values when calculating the max.
+
+        :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using
+            the concatenations of active_max results from each chunk.
+        """
+
+        newarr = da.reduction(
+            self,
+            partition_max,
+            max_agg,
+            combine=max_agg,
+            axis=axis,
+            dtype=self.dtype,
+        )
+
+        return newarr
+
+    def active_min(self, axis=None, skipna=None):
+        """
+        Perform ``dask delayed`` active min for each ``dask block`` which corresponds to a single ``chunk``.
+        Combines the results of the dask delayed ``active_min`` operations on each block into a single dask Array,
+        which is then mapped to a new DaskActiveArray object.
+
+        :param axis: (int) The index of the axis on which to perform the active min.
+
+        :param skipna: (bool) Skip NaN values when calculating the min.
+
+        :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using
+            the concatenations of active_min results from each chunk.
+        """
+
+        newarr = da.reduction(
+            self,
+            partition_min,
+            min_agg,
+            combine=min_agg,
+            axis=axis,
+            dtype=self.dtype,
+        )
+
+        return newarr
+
+    def active_sum(self, axis=None, skipna=None):
+        """
+        Perform ``dask delayed`` active sum for each ``dask block`` which corresponds to a single ``chunk``.
+        Combines the results of the dask delayed ``active_sum`` operations on each block into a single dask Array,
+        which is then mapped to a new DaskActiveArray object.
+
+        :param axis: (int) The index of the axis on which to perform the active sum.
+
+        :param skipna: (bool) Skip NaN values when calculating the sum.
+
+        :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using
+            the concatenations of active_sum results from each chunk.
+        """
+
+        newarr = da.reduction(
+            self,
+            partition_sum,
+            sum_agg,
+            combine=sum_agg,
             axis=axis,
-            dtype=self.dtype
+            dtype=self.dtype,
         )
 
-        return DaskActiveArray(newarr.dask, newarr.name, newarr.chunks, meta=newarr)
+        return newarr

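The new active_max, active_min and active_sum methods reuse the da.reduction pattern already used by active_mean: a per-partition function (partition_*), a combine step, and a final aggregation (*_agg). A self-contained sketch of that pattern on a plain numpy-backed dask array, with no Active storage involved (the function names below are illustrative, not the module's own):

import dask.array as da

def chunk_sum(block, axis=None, keepdims=False, **kwargs):
    # Partial reduction applied to each chunk (the role played by partition_sum).
    return block.sum(axis=axis, keepdims=keepdims)

def agg_sum(parts, axis=None, keepdims=False, **kwargs):
    # Final aggregation over the combined per-chunk results (the role of sum_agg).
    return parts.sum(axis=axis, keepdims=keepdims)

x = da.ones((8, 8), chunks=(4, 4))
col_sums = da.reduction(x, chunk_sum, agg_sum, axis=0, dtype=x.dtype)
print(col_sums.compute())  # eight values of 8.0, one per column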