
Commit 006e9cf

Commit of dask/xarray issue to new branch

1 parent 20c9ed2 · commit 006e9cf

6 files changed: +403 -113 lines

XarrayActive/active_chunk.py  (+63 -66 lines)

@@ -32,8 +32,6 @@ def _set_active_options(self, chunks={}, chunk_limits=True):
         self._active_chunks = chunks
         self._chunk_limits = chunk_limits
 
-
-
 # Holds all Active routines.
 class ActiveChunk:
 
@@ -43,23 +41,26 @@ def _post_process_data(self, data):
         # Perform any post-processing steps on the data here
         return data
 
-    def _standard_mean(self, axes=None, skipna=None, **kwargs):
+    def _standard_sum(self, axes=None, skipna=None, **kwargs):
         """
         Standard Mean routine matches the normal routine for dask, required at this
         stage if Active mean not available.
         """
-        size = 1
-        for i in axes:
-            size *= self.shape[i]
 
         arr = np.array(self)
         if skipna:
-            total = np.nanmean(arr, axis=axes, **kwargs) *size
+            total = np.nansum(arr, axis=axes, **kwargs)
         else:
-            total = np.mean(arr, axis=axes, **kwargs) *size
-        return {'n': self._numel(axes=axes), 'total': total}
+            total = np.sum(arr, axis=axes, **kwargs)
+        return total
+
+    def _standard_max(self, axes=None, skipna=None, **kwargs):
+        return np.max(self, axis=axes)
 
-    def _numel(self, axes=None):
+    def _standard_min(self, axes=None, skipna=None, **kwargs):
+        return np.min(self, axis=axes)
+
+    def _numel(self, method, axes=None):
         if not axes:
             return self.size
 
@@ -72,7 +73,7 @@ def _numel(self, axes=None):
 
         return np.full(newshape, size)
 
-    def active_mean(self, axis=None, skipna=None, **kwargs):
+    def active_method(self, method, axis=None, skipna=None, **kwargs):
         """
         Use PyActiveStorage package functionality to perform mean of this Fragment.
 
@@ -83,47 +84,66 @@ def active_mean(self, axis=None, skipna=None, **kwargs):
         :returns: A ``duck array`` (numpy-like) with the reduced array or scalar value,
                   as specified by the axes parameter.
         """
+
+        standard_methods = {
+            'mean': self._standard_sum,
+            'sum' : self._standard_sum,
+            'max' : self._standard_max,
+            'min' : self._standard_min
+        }
+        ret = None
+        n = self._numel(method, axes=axis)
+
         try:
             from activestorage.active import Active
         except ImportError:
             # Unable to import Active package. Default to using normal mean.
             print("ActiveWarning: Unable to import active module - defaulting to standard method.")
-            return self._standard_mean(axes=axis, skipna=skipna, **kwargs)
-
-        active = Active(self.filename, self.address)
-        active.method = "mean"
-        extent = tuple(self.get_extent())
-        data = active[extent]
-
-        if axis == None:
-            axis = tuple([i for i in range(self.ndim)])
-
-        n = self._numel(axes=axis)
-
-        if len(axis) == self.ndim:
-
-            t = self._post_process_data(data) * n
+            ret = {
+                'n': n,
+                'total': standard_methods[method](axes=axis, skipna=skipna, **kwargs)
+            }
 
-            r = {
+        if not ret:
+
+            active = Active(self.filename, self.address)
+            active.method = method
+            extent = tuple(self.get_extent())
+
+            if axis == None:
+                axis = tuple([i for i in range(self.ndim)])
+
+            n = self._numel(method, axes=axis)
+
+            if len(axis) == self.ndim:
+                data = active[extent]
+                t = self._post_process_data(data) * n
+
+                ret = {
+                    'n': n,
+                    'total': t
+                }
+
+        if not ret:
+            # Experimental Recursive requesting to get each 1D column along the axes being requested.
+            range_recursives = []
+            for dim in range(self.ndim):
+                if dim not in axis:
+                    range_recursives.append(range(extent[dim].start, extent[dim].stop))
+                else:
+                    range_recursives.append(extent[dim])
+            results = np.array(self._get_elements(active, range_recursives, hyperslab=[]))
+
+            t = self._post_process_data(results) * n
+            ret = {
                 'n': n,
                 'total': t
             }
-            return r
-
-        # Experimental Recursive requesting to get each 1D column along the axes being requested.
-        range_recursives = []
-        for dim in range(self.ndim):
-            if dim not in axis:
-                range_recursives.append(range(extent[dim].start, extent[dim].stop))
-            else:
-                range_recursives.append(extent[dim])
-        results = np.array(self._get_elements(active, range_recursives, hyperslab=[]))
-
-        t = self._post_process_data(results) * n
-        return {
-            'n': n,
-            'total': t
-        }
+
+        if method == 'mean':
+            return ret
+        else:
+            return ret['total']/ret['n']
 
     def _get_elements(self, active, recursives, hyperslab=[]):
         dimarray = []
@@ -144,26 +164,3 @@ def _get_elements(self, active, recursives, hyperslab=[]):
             dimarray.append(self._get_elements(active, recursives[1:], hyperslab=newslab))
 
         return dimarray
-
-def _determine_chunk_space(chunks, shape, dims, chunk_limits=True):
-
-    if not chunks:
-        return None
-
-    chunk_space = [1 for i in range(len(shape))]
-
-    max_chunks = np.prod(shape)
-    if chunk_limits:
-        max_chunks = int(max_chunks/ 2e6)
-
-    for x, d in enumerate(dims):
-        if d not in chunks:
-            continue
-
-        chunks_in_dim = chunks[d]
-        if chunks_in_dim > max_chunks:
-            chunks_in_dim = max_chunks
-
-        chunk_space[x] = int(shape[x]/chunks[d])
-
-    return tuple(chunk_space)
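
Note on the `{'n', 'total'}` packaging above: dask's `mean_combine`/`mean_agg` (imported in `active_dask.py` below) rebuild a global mean from per-chunk counts and totals rather than averaging per-chunk means. A minimal sketch of that recombination, using NumPy only and made-up chunk values (`chunk_a`, `chunk_b` and the other names are illustrative, not part of the commit):

```python
import numpy as np

# Two hypothetical chunks of the same variable.
chunk_a = np.array([1.0, 2.0, 3.0])
chunk_b = np.array([4.0, 5.0])

# Per-chunk partial results in the {'n', 'total'} form returned by active_method.
partial_a = {'n': chunk_a.size, 'total': chunk_a.sum()}
partial_b = {'n': chunk_b.size, 'total': chunk_b.sum()}

# Aggregation step: the global mean is the sum of totals over the sum of counts,
# which matches the mean of the concatenated data exactly.
global_mean = (partial_a['total'] + partial_b['total']) / (partial_a['n'] + partial_b['n'])
print(global_mean)                                # 3.0
print(np.concatenate([chunk_a, chunk_b]).mean())  # 3.0
```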

XarrayActive/active_dask.py  (+125 -16 lines)

@@ -1,14 +1,43 @@
 import dask.array as da
 from dask.array.reductions import mean_agg, mean_combine
+from dask.utils import deepmap
+from dask.array.core import _concatenate2
+import numpy as np
 
-
-def block_active_mean(arr, *args, **kwargs):
-    if hasattr(arr,'active_mean'):
-        return arr.active_mean(*args, **kwargs)
+def partition_mean(arr, *args, **kwargs):
+    return partition_method(arr, 'mean', *args, **kwargs)
+
+def partition_max(arr, *args, **kwargs):
+    return partition_method(arr, 'max', *args, **kwargs)
+
+def partition_min(arr, *args, **kwargs):
+    return partition_method(arr, 'min', *args, **kwargs)
+
+def partition_sum(arr, *args, **kwargs):
+    return partition_method(arr, 'sum', *args, **kwargs)
+
+def partition_method(arr, method, *args, **kwargs):
+    if hasattr(arr,'active_method'):
+        return arr.active_method(method,*args, **kwargs)
     else:
         # Here's where barebones Xarray might fall over - may need a non-CFA custom class.
+        x=input('stopped')
         raise NotImplementedError
 
+def general_combine(pairs, axis=None):
+    if not isinstance(pairs, list):
+        pairs = [pairs]
+    return _concatenate2(pairs, axes=axis)
+
+def max_agg(pairs, axis=None, **kwargs):
+    return general_combine(pairs, axis=axis).max(axis=axis, **kwargs)
+
+def min_agg(pairs, axis=None, **kwargs):
+    return general_combine(pairs, axis=axis).min(axis=axis, **kwargs)
+
+def sum_agg(pairs, axis=None, **kwargs):
+    return general_combine(pairs, axis=axis).sum(axis=axis, **kwargs)
+
 class DaskActiveArray(da.Array):
 
     description = "Dask Array Wrapper enabling the use of Active Storage."
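
Note on `general_combine` above: it wraps dask's internal `_concatenate2` helper, which recursively stitches a nested list of blocks back into a single array before `max_agg`/`min_agg`/`sum_agg` apply the final reduction. A small sketch of that behaviour with made-up blocks; since `_concatenate2` is a private dask function, this illustrates observed behaviour rather than a stable API:

```python
import numpy as np
from dask.array.core import _concatenate2

# Four hypothetical 2x2 blocks arranged as a nested list:
# outer list = block rows (axis 0), inner lists = block columns (axis 1).
blocks = [[np.ones((2, 2)), np.zeros((2, 2))],
          [np.zeros((2, 2)), np.ones((2, 2))]]

# _concatenate2 concatenates the nesting along the given axes, producing the
# single 4x4 array that the *_agg functions then reduce.
combined = _concatenate2(blocks, axes=[0, 1])
print(combined.shape)  # (4, 4)
print(combined.sum())  # 8.0, i.e. a sum_agg-style reduction after combining
```
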
@@ -21,17 +50,22 @@ def copy(self):
         """
         Create a new DaskActiveArray instance with all the same parameters as the current instance.
         """
-        return DaskActiveArray(self.dask, self.name, self.chunks, meta=self)
+        copy_arr = DaskActiveArray(self.dask, self.name, self.chunks, meta=self)
+        return copy_arr
 
-    def __getitem__(self, index):
-        """
-        Perform indexing for this ActiveArray. May need to overwrite further if it turns out
-        the indexing is performed **after** the dask `getter` method (i.e if retrieval and indexing
-        are separate items on the dask graph). If this is the case, will need another `from_delayed`
-        and `concatenation` method as used in ``active_mean``.
-        """
-        arr = super().__getitem__(index)
-        return DaskActiveArray(arr.dask, arr.name, arr.chunks, meta=arr)
+    #def __getitem__(self, index):
+    #    """
+    #    Perform indexing for this ActiveArray. May need to overwrite further if it turns out
+    #    the indexing is performed **after** the dask `getter` method (i.e if retrieval and indexing
+    #    are separate items on the dask graph). If this is the case, will need another `from_delayed`
+    #    and `concatenation` method as used in ``active_mean``.
+    #    """
+    #    copy = self.copy()
+
+    #    return copy._getitem(index)
+
+    #def _getitem(self, index):
+    #    return super().__getitem__(index)
 
     def active_mean(self, axis=None, skipna=None):
         """
@@ -48,12 +82,87 @@ def active_mean(self, axis=None, skipna=None):
         """
 
         newarr = da.reduction(
-            self,
-            block_active_mean,
+            self.copy(),
+            partition_mean,
             mean_agg,
             combine=mean_combine,
             axis=axis,
             dtype=self.dtype,
         )
 
         return newarr
+
+    def active_max(self, axis=None, skipna=None):
+        """
+        Perform ``dask delayed`` active mean for each ``dask block`` which corresponds to a single ``chunk``.
+        Combines the results of the dask delayed ``active_max`` operations on each block into a single dask Array,
+        which is then mapped to a new DaskActiveArray object.
+
+        :param axis: (int) The index of the axis on which to perform the active max.
+
+        :param skipna: (bool) Skip NaN values when calculating the max.
+
+        :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using
+                  the concatenations of active_means from each chunk.
+        """
+
+        newarr = da.reduction(
+            self.copy(),
+            partition_max,
+            max_agg,
+            combine=max_agg,
+            axis=axis,
+            dtype=self.dtype,
+        )
+
+        return newarr
+
+    def active_min(self, axis=None, skipna=None):
+        """
+        Perform ``dask delayed`` active mean for each ``dask block`` which corresponds to a single ``chunk``.
+        Combines the results of the dask delayed ``active_min`` operations on each block into a single dask Array,
+        which is then mapped to a new DaskActiveArray object.
+
+        :param axis: (int) The index of the axis on which to perform the active min.
+
+        :param skipna: (bool) Skip NaN values when calculating the min.
+
+        :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using
+                  the concatenations of active_means from each chunk.
+        """
+
+        newarr = da.reduction(
+            self.copy(),
+            partition_min,
+            min_agg,
+            combine=min_agg,
+            axis=axis,
+            dtype=self.dtype,
+        )
+
+        return newarr
+
+    def active_sum(self, axis=None, skipna=None):
+        """
+        Perform ``dask delayed`` active mean for each ``dask block`` which corresponds to a single ``chunk``.
+        Combines the results of the dask delayed ``active_sum`` operations on each block into a single dask Array,
+        which is then mapped to a new DaskActiveArray object.
+
+        :param axis: (int) The index of the axis on which to perform the active sum.
+
+        :param skipna: (bool) Skip NaN values when calculating the sum.
+
+        :returns: A new ``DaskActiveArray`` object which has been reduced along the specified axes using
+                  the concatenations of active_means from each chunk.
+        """
+
+        newarr = da.reduction(
+            self.copy(),
+            partition_sum,
+            sum_agg,
+            combine=sum_agg,
+            axis=axis,
+            dtype=self.dtype,
+        )
+
+        return newarr
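
Note on the reduction pattern used by `active_mean`/`active_max`/`active_min`/`active_sum` above: `da.reduction` takes a per-chunk function (here the `partition_*` helpers), an aggregate, and an optional combine (here the `*_agg` helpers) and builds the reduced graph. A minimal, self-contained sketch of that pattern with plain NumPy reductions standing in for the Active partition functions (`chunk_sum`/`agg_sum` are illustrative names, not part of the commit):

```python
import dask.array as da
import numpy as np

def chunk_sum(block, axis=None, keepdims=None, **kwargs):
    # Per-chunk step: reduce each dask block independently
    # (the role played by partition_sum / ActiveChunk.active_method in the commit).
    return np.sum(block, axis=axis, keepdims=keepdims)

def agg_sum(partials, axis=None, keepdims=None, **kwargs):
    # Combine/aggregate step: fold the per-block partial results into the final
    # values (the role played by sum_agg).
    return np.sum(partials, axis=axis, keepdims=keepdims)

x = da.ones((100, 100), chunks=(25, 25))
total = da.reduction(x, chunk_sum, agg_sum, combine=agg_sum, axis=0, dtype=x.dtype)
print(total.compute()[:3])  # [100. 100. 100.] -- column sums of an all-ones array
```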

0 commit comments
