-
Notifications
You must be signed in to change notification settings - Fork 8
/
test_dask.py
144 lines (114 loc) · 4.37 KB
/
test_dask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# pip install "dask[array]"
# pip install graphviz
# This just tests some dask operations to see if they behave as expected - it doesn't exercise any of the code in this repo.
import dask.array as da
import numpy as np
import tempfile
import unittest
def data_file(path):
    """Prefix *path* with the ``data/`` directory.

    Args:
        path: Value interpolated after ``data/`` using ``%s`` formatting.

    Returns:
        str: The string ``"data/"`` followed by the formatted *path*.
    """
    template = "data/%s"
    return template % path
def tmp_dir():
    """Return a unique filesystem path ending in ``.zarr``.

    A temporary directory is created only to obtain a unique name and is
    removed again before this function returns, so the returned path does
    not exist on disk.

    Returns:
        str: Path of the (already removed) temporary directory.
    """
    # Leaving the ``with`` block removes the directory deterministically,
    # rather than depending on the discarded TemporaryDirectory object being
    # finalized (which also emits a ResourceWarning).
    with tempfile.TemporaryDirectory(suffix=".zarr") as path:
        return path
# Path of the sample CSV under data/, built with data_file().
input_file = data_file("adata.csv")
class TestDask(unittest.TestCase):
    """Check that dask arrays agree with NumPy for a set of array operations.

    Each test applies the same operation to a NumPy array and to a dask array
    built from a copy of it, then compares the computed dask result with the
    NumPy result.
    """

    def setUp(self):
        """Create the 3x5 NumPy fixture and a dask copy with (2, 5) chunks."""
        self.arr = np.array(
            [
                [0.0, 1.0, 0.0, 3.0, 0.0],
                [2.0, 0.0, 3.0, 4.0, 5.0],
                [4.0, 0.0, 0.0, 6.0, 7.0],
            ]
        )
        # A copy is wrapped so the dask array does not share the NumPy buffer
        # that test_inplace modifies.
        self.arr_d = da.from_array(self.arr.copy(), chunks=(2, 5))

    def test_scalar_arithmetic(self):
        """Chained scalar +, *, -, / give the same values as NumPy."""
        Xd = (((self.arr_d + 1) * 2) - 4) / 1.1
        X = (((self.arr + 1) * 2) - 4) / 1.1
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_broadcast(self):
        """Adding a length-5 vector broadcasts across rows as in NumPy."""
        a = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
        Xd = self.arr_d + a
        X = self.arr + a
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_eq(self):
        """Element-wise == yields the same dtype and values as NumPy."""
        Xd = self.arr_d == 0.0
        X = self.arr == 0.0
        self.assertEqual(Xd.dtype, X.dtype)
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_ne(self):
        """Element-wise != yields the same values as NumPy."""
        Xd = self.arr_d != 0.0
        X = self.arr != 0.0
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_invert(self):
        """Applying ~ to a boolean mask matches NumPy."""
        Xd = ~(self.arr_d == 0.0)
        X = ~(self.arr == 0.0)
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_inplace(self):
        """Augmented assignment (+=) gives the same values as NumPy."""
        self.arr_d += 1
        self.arr += 1
        self.assertTrue(np.array_equal(self.arr_d.compute(), self.arr))

    def test_boolean_index(self):
        """Filtering row sums with a boolean mask matches NumPy."""
        Xd = np.sum(self.arr_d, axis=1)  # sum rows
        Xd = Xd[Xd > 5]
        X = np.sum(self.arr, axis=1)  # sum rows
        X = X[X > 5]
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_subset_cols(self):
        """Selecting columns with a boolean mask keeps shape and values."""
        subset = np.array([True, False, True, False, True])
        Xd = self.arr_d[:, subset]
        X = self.arr[:, subset]
        self.assertEqual(Xd.shape, X.shape)
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_subset_rows(self):
        """Selecting rows with a boolean mask keeps shape and values."""
        subset = np.array([True, False, True])
        Xd = self.arr_d[subset, :]
        X = self.arr[subset, :]
        self.assertEqual(Xd.shape, X.shape)
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_log1p(self):
        """np.log1p applied to the dask array matches NumPy."""
        log1p_dask = np.log1p(self.arr_d).compute()
        log1p_numpy = np.log1p(self.arr)
        self.assertTrue(np.array_equal(log1p_dask, log1p_numpy))

    def test_sum_cols(self):
        """np.sum over axis 0 matches NumPy."""
        Xd = np.sum(self.arr_d, axis=0)
        X = np.sum(self.arr, axis=0)
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_sum_rows(self):
        """np.sum over axis 1 matches NumPy."""
        Xd = np.sum(self.arr_d, axis=1)
        X = np.sum(self.arr, axis=1)
        self.assertTrue(np.array_equal(Xd.compute(), X))

    def test_mean(self):
        """The mean over axis 0 matches NumPy."""

        def mean(X):
            return X.mean(axis=0)

        mean_dask = mean(self.arr_d).compute()
        mean_numpy = mean(self.arr)
        self.assertTrue(np.array_equal(mean_dask, mean_numpy))

    def test_var(self):
        """Variance computed as E[X^2] - E[X]^2 over axis 0 matches NumPy."""

        def var(X):
            mean = X.mean(axis=0)
            mean_sq = np.multiply(X, X).mean(axis=0)
            return mean_sq - mean ** 2

        var_dask = var(self.arr_d).compute()
        var_numpy = var(self.arr)
        self.assertTrue(np.array_equal(var_dask, var_numpy))

    def test_scale(self):
        """Centering and dividing by the variance matches NumPy.

        The helper divides by the variance (with an n / (n - 1) correction),
        exactly as the original helper did.
        """

        def _get_mean_var(X):
            mean = X.mean(axis=0)
            mean_sq = np.multiply(X, X).mean(axis=0)
            var = (mean_sq - mean ** 2) * (X.shape[0] / (X.shape[0] - 1))
            return mean, var

        def scale(X):
            mean, var = _get_mean_var(X)
            return (X - mean) / var

        scaled_d = scale(self.arr_d)
        scaled = scale(self.arr)

        # Uncomment to produce a task graph
        # scale(self.arr_d).visualize(filename='task_graph.svg')

        # Compare the scaled outputs themselves. A tolerance is used because
        # this is a multi-step floating-point pipeline and bit-for-bit
        # equality with NumPy is not the property being checked.
        self.assertEqual(scaled_d.shape, scaled.shape)
        self.assertTrue(np.allclose(scaled_d.compute(), scaled))

        # scale() returns new arrays, so the inputs must still agree.
        self.assertTrue(np.array_equal(self.arr_d.compute(), self.arr))

    def test_rechunk(self):
        """Boolean row selection leaves uneven chunks; rechunk evens them."""
        arr = np.array([[0.0], [1.0], [2.0], [3.0], [4.0], [5.0], [6.0]])
        arr_d = da.from_array(arr.copy(), chunks=(3, 1))
        subset = np.array([True, True, False, True, True, True, True])
        Xd = arr_d[subset, :]
        # Dropping one row from the first chunk shrinks it from 3 rows to 2.
        self.assertEqual(Xd.chunks, ((2, 3, 1), (1,)))
        Xd = Xd.rechunk((3, 1))
        self.assertEqual(Xd.chunks, ((3, 3), (1,)))