From 29aaf6cf561e027587b3b2eb2bb152e0134db8b0 Mon Sep 17 00:00:00 2001
From: Jeffrey Newman
Date: Thu, 2 May 2024 11:56:14 -0500
Subject: [PATCH] Sharrow updates (#52)

* add expr to error message
* extra logging
* fix deprecation
* ruffen
* dask_scheduler
* faster sharrow for missing categoricals
---
 sharrow/aster.py                  | 24 +++++++++++++++++-
 sharrow/shared_memory.py          | 30 +++++++++++++++++++---
 sharrow/sparse.py                 |  2 +-
 sharrow/tests/test_categorical.py | 42 +++++++++++++++++++++++++++++++
 4 files changed, 93 insertions(+), 5 deletions(-)

diff --git a/sharrow/aster.py b/sharrow/aster.py
index 0e93e84..8762b27 100755
--- a/sharrow/aster.py
+++ b/sharrow/aster.py
@@ -408,7 +408,9 @@ def _replacement(
             if self.get_default or (
                 topname == pref_topname and not self.swallow_errors
             ):
-                raise KeyError(f"{topname}..{attr}")
+                raise KeyError(
+                    f"{topname}..{attr}\nexpression={self.original_expr}"
+                )
             # we originally raised a KeyError here regardless, but what if
             # we just give back the original node, and see if other spaces,
             # possibly fallback spaces, might work? If nothing works then
@@ -1010,6 +1012,16 @@ def visit_Compare(self, node):
                     f"\ncategories: {left_dictionary}",
                     stacklevel=2,
                 )
+                # at this point, the right value is not in the left's categories, so
+                # it is guaranteed to be not equal to any of the categories.
+                if isinstance(node.ops[0], ast.Eq):
+                    result = ast.Constant(False)
+                elif isinstance(node.ops[0], ast.NotEq):
+                    result = ast.Constant(True)
+                else:
+                    raise ValueError(
+                        f"unexpected operator {node.ops[0]}"
+                    ) from None
             if right_decoded is not None:
                 result = ast.Compare(
                     left=left.slice,
@@ -1043,6 +1055,16 @@ def visit_Compare(self, node):
                     f"\ncategories: {right_dictionary}",
                     stacklevel=2,
                 )
+                # at this point, the left value is not in the right's categories, so
+                # it is guaranteed to be not equal to any of the categories.
+                if isinstance(node.ops[0], ast.Eq):
+                    result = ast.Constant(False)
+                elif isinstance(node.ops[0], ast.NotEq):
+                    result = ast.Constant(True)
+                else:
+                    raise ValueError(
+                        f"unexpected operator {node.ops[0]}"
+                    ) from None
             if left_decoded is not None:
                 result = ast.Compare(
                     left=ast_Constant(left_decoded),
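The two `visit_Compare` hunks above fold a comparison against a value missing from a categorical's dictionary into a compile-time constant: `==` can never be true and `!=` can never be false, so no runtime comparison needs to be emitted at all. A minimal sketch of the semantics being encoded, using plain pandas (the example data here is hypothetical, not from the sharrow test suite):

```python
import pandas as pd

# A categorical with a fixed dictionary; "BAD" is not among the categories.
tour_mode = pd.Series(
    pd.Categorical(["Car", "Bus", "Walk"], categories=["Car", "Bus", "Walk"])
)

# Comparisons against an out-of-dictionary value are foregone conclusions,
# which is exactly what the AST rewrite replaces with ast.Constant nodes:
print((tour_mode == "BAD").tolist())  # [False, False, False] -> ast.Constant(False)
print((tour_mode != "BAD").tolist())  # [True, True, True]    -> ast.Constant(True)
```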
diff --git a/sharrow/shared_memory.py b/sharrow/shared_memory.py
index c8825b1..0098c06 100644
--- a/sharrow/shared_memory.py
+++ b/sharrow/shared_memory.py
@@ -3,6 +3,7 @@
 import logging
 import os
 import pickle
+import time

 import dask
 import dask.array as da
@@ -247,7 +248,9 @@ def release_shared_memory(self):
     def delete_shared_memory_files(key):
         delete_shared_memory_files(key)

-    def to_shared_memory(self, key=None, mode="r+", _dupe=True):
+    def to_shared_memory(
+        self, key=None, mode="r+", _dupe=True, dask_scheduler="threads"
+    ):
         """
         Load this Dataset into shared memory.

@@ -262,9 +265,13 @@ def to_shared_memory(self, key=None, mode="r+", _dupe=True):
             An identifying key for this shared memory.  Use the same key
             in `from_shared_memory` to recreate this Dataset elsewhere.
         mode : {‘r+’, ‘r’, ‘w+’, ‘c’}, optional
-            This methid returns a copy of the Dataset in shared memory.
+            This method returns a copy of the Dataset in shared memory.
             If memmapped, that copy can be opened in various modes. See
             numpy.memmap() for details.
+        dask_scheduler : str, default 'threads'
+            The scheduler to use when loading dask arrays into shared memory.
+            Typically "threads" for multi-threaded reads or "synchronous"
+            for single-threaded reads. See dask.compute() for details.

         Returns
         -------
@@ -287,6 +294,7 @@ def to_shared_memory(self, key=None, mode="r+", _dupe=True):
         def emit(k, a, is_coord):
             nonlocal names, wrappers, sizes, position
             if sparse is not None and isinstance(a.data, sparse.GCXS):
+                logger.info(f"preparing sparse array {a.name}")
                 wrappers.append(
                     {
                         "sparse": True,
@@ -308,6 +316,7 @@ def emit(k, a, is_coord):
                 )
                 a_nbytes = a.data.nbytes
             else:
+                logger.info(f"preparing dense array {a.name}")
                 wrappers.append(
                     {
                         "dims": a.dims,
@@ -335,12 +344,15 @@ def emit(k, a, is_coord):
             emit(k, a, False)

         mem = create_shared_memory_array(key, size=position)
+
+        logger.info("declaring shared memory buffer")
         if key.startswith("memmap:"):
             buffer = memoryview(mem)
         else:
             buffer = mem.buf

         tasks = []
+        task_names = []
         for w in wrappers:
             _is_sparse = w.get("sparse", False)
             _size = w["nbytes"]
@@ -348,6 +360,7 @@ def emit(k, a, is_coord):
             _pos = w["position"]
             a = self._obj[_name]
             if _is_sparse:
+                logger.info(f"running load task: {_name} ({si_units(_size)})")
                 ad = a.data
                 _size_d = w["data.nbytes"]
                 _size_i = w["indices.nbytes"]
@@ -373,19 +386,30 @@ def emit(k, a, is_coord):
                 mem_arr_i[:] = ad.indices[:]
                 mem_arr_p[:] = ad.indptr[:]
             else:
+                logger.info(f"preparing load task: {_name} ({si_units(_size)})")
                 mem_arr = np.ndarray(
                     shape=a.shape, dtype=a.dtype, buffer=buffer[_pos : _pos + _size]
                 )
                 if isinstance(a, xr.DataArray) and isinstance(a.data, da.Array):
                     tasks.append(da.store(a.data, mem_arr, lock=False, compute=False))
+                    task_names.append(_name)
                 else:
                     mem_arr[:] = a[:]

         if tasks:
-            dask.compute(tasks, scheduler="threads")
+            t = time.time()
+            logger.info(f"running {len(tasks)} dask data load tasks")
+            if dask_scheduler == "synchronous":
+                for task, task_name in zip(tasks, task_names):
+                    logger.info(f"running load task: {task_name}")
+                    dask.compute(task, scheduler=dask_scheduler)
+            else:
+                dask.compute(tasks, scheduler=dask_scheduler)
+            logger.info(f"completed dask data load in {time.time()-t:.3f} seconds")

         if key.startswith("memmap:"):
             mem.flush()
+        logger.info("storing metadata in shared memory")
         create_shared_list(
             [pickle.dumps(self._obj.attrs)] + [pickle.dumps(i) for i in wrappers], key
         )
diff --git a/sharrow/sparse.py b/sharrow/sparse.py
index 732735c..db3d71a 100644
--- a/sharrow/sparse.py
+++ b/sharrow/sparse.py
@@ -163,7 +163,7 @@ def apply_mapper(x):
         raise ImportError("sparse is not installed")

     sparse_data = sparse.GCXS(
-        sparse.COO((i_, j_), data, shape=shape), compressed_axes=(0,)
+        sparse.COO(np.stack((i_, j_)), data, shape=shape), compressed_axes=(0,)
     )
     self._obj[f"_s_{name}"] = xr.DataArray(
         sparse_data,
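The one-line sparse.py change (the "fix deprecation" commit bullet) passes the row and column index vectors to `sparse.COO` as a single stacked `(ndim, nnz)` coordinate array rather than a tuple of arrays. A minimal sketch of the stacked-coordinates pattern; the index and data values here are made up for illustration:

```python
import numpy as np
import sparse

# Row and column indices of three nonzero entries in a 4x4 matrix.
i_ = np.array([0, 1, 3])
j_ = np.array([2, 0, 1])
data = np.array([1.0, 2.0, 3.0])

# np.stack((i_, j_)) yields a (2, nnz) coordinate array, the form COO expects.
coo = sparse.COO(np.stack((i_, j_)), data, shape=(4, 4))
gcxs = sparse.GCXS(coo, compressed_axes=(0,))  # row-compressed, as in sharrow
```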
diff --git a/sharrow/tests/test_categorical.py b/sharrow/tests/test_categorical.py
index c5ab0b1..cd2a4e0 100644
--- a/sharrow/tests/test_categorical.py
+++ b/sharrow/tests/test_categorical.py
@@ -177,6 +177,48 @@ def test_missing_categorical():
     a = a.isel(expressions=0)
     assert all(a == np.asarray([1, 0, 1, 1, 1, 1]))

+    expr = "df.TourMode2 != 'BAD'"
+    with pytest.warns(UserWarning):
+        f8 = tree.setup_flow({expr: expr}, with_root_node_name="df")
+    a = f8.load_dataarray(dtype=np.int8)
+    a = a.isel(expressions=0)
+    assert all(a == np.asarray([1, 1, 1, 1, 1, 1]))
+
+    expr = "'BAD' != df.TourMode2"
+    with pytest.warns(UserWarning):
+        f9 = tree.setup_flow({expr: expr}, with_root_node_name="df")
+    a = f9.load_dataarray(dtype=np.int8)
+    a = a.isel(expressions=0)
+    assert all(a == np.asarray([1, 1, 1, 1, 1, 1]))
+
+    expr = "(df.TourMode2 == 'BAD') * 2"
+    with pytest.warns(UserWarning):
+        fA = tree.setup_flow({expr: expr}, with_root_node_name="df")
+    a = fA.load_dataarray(dtype=np.int8)
+    a = a.isel(expressions=0)
+    assert all(a == np.asarray([0, 0, 0, 0, 0, 0]))
+
+    expr = "(df.TourMode2 == 'BAD') * 2.2"
+    with pytest.warns(UserWarning):
+        fB = tree.setup_flow({expr: expr}, with_root_node_name="df")
+    a = fB.load_dataarray(dtype=np.int8)
+    a = a.isel(expressions=0)
+    assert all(a == np.asarray([0, 0, 0, 0, 0, 0]))
+
+    expr = "np.exp(df.TourMode2 == 'BAD') * 2.2"
+    with pytest.warns(UserWarning):
+        fC = tree.setup_flow({expr: expr}, with_root_node_name="df")
+    a = fC.load_dataarray(dtype=np.float32)
+    a = a.isel(expressions=0)
+    assert all(a == np.asarray([2.2, 2.2, 2.2, 2.2, 2.2, 2.2], dtype=np.float32))
+
+    expr = "(df.TourMode2 != 'BAD') * 2"
+    with pytest.warns(UserWarning):
+        fD = tree.setup_flow({expr: expr}, with_root_node_name="df")
+    a = fD.load_dataarray(dtype=np.int8)
+    a = a.isel(expressions=0)
+    assert all(a == np.asarray([2, 2, 2, 2, 2, 2]))


 def test_categorical_indexing(tours_dataset: xr.Dataset, skims_dataset: xr.Dataset):
     tree = sharrow.DataTree(tours=tours_dataset)
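Taken together with the shared_memory.py changes, callers can now opt into single-threaded loading, where each dask task is logged and computed one at a time. A hypothetical usage sketch, assuming the `shm` Dataset accessor implied by the `self._obj` pattern in shared_memory.py (the dataset construction and key names here are illustrative only):

```python
import logging

import numpy as np
import xarray as xr

import sharrow  # noqa: F401 -- registers the accessor on xarray Datasets

logging.basicConfig(level=logging.INFO)  # surface the new logger.info messages

# Illustrative dataset; dask-backed (chunked) variables benefit the most.
ds = xr.Dataset({"speed": ("row", np.arange(10.0))}).chunk({"row": 5})

# Default behavior is unchanged: a multi-threaded dask load.
shared = ds.shm.to_shared_memory("example-key")

# The synchronous scheduler logs and times each load task individually,
# which is easier to profile and debug.
shared2 = ds.shm.to_shared_memory("example-key-2", dask_scheduler="synchronous")
```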