From d620c406925498935a595559db05edea753586a5 Mon Sep 17 00:00:00 2001 From: Joshua Gould Date: Tue, 17 Sep 2024 14:13:12 -0400 Subject: [PATCH] anndata --- cirrocumulus/abstract_backed_dataset.py | 6 +- cirrocumulus/sparse_dataset.py | 373 ------------------------ tests/test_sparse.py | 19 +- 3 files changed, 12 insertions(+), 386 deletions(-) delete mode 100644 cirrocumulus/sparse_dataset.py diff --git a/cirrocumulus/abstract_backed_dataset.py b/cirrocumulus/abstract_backed_dataset.py index 25baadb8..a997bff7 100644 --- a/cirrocumulus/abstract_backed_dataset.py +++ b/cirrocumulus/abstract_backed_dataset.py @@ -3,10 +3,10 @@ import pandas as pd import scipy.sparse from anndata import AnnData +from anndata._core.sparse_dataset import sparse_dataset from cirrocumulus.abstract_dataset import AbstractDataset from cirrocumulus.anndata_util import ADATA_LAYERS_UNS_KEY, ADATA_MODULE_UNS_KEY -from cirrocumulus.sparse_dataset import SparseDataset # string_dtype = h5py.check_string_dtype(dataset.dtype) @@ -73,8 +73,8 @@ def get_X(self, var_ids, keys, node): get_item = var_ids.get_indexer_for(keys) if self.is_group(node): - sparse_dataset = SparseDataset(node) # sparse - X = sparse_dataset[:, get_item] + ds = sparse_dataset(node) # sparse + X = ds[:, get_item] else: # dense X = self.slice_dense_array(node, get_item) var = pd.DataFrame(index=keys) diff --git a/cirrocumulus/sparse_dataset.py b/cirrocumulus/sparse_dataset.py deleted file mode 100644 index 53802e84..00000000 --- a/cirrocumulus/sparse_dataset.py +++ /dev/null @@ -1,373 +0,0 @@ -"""This module implements on disk sparse datasets. - -This code is based on and uses the conventions of h5sparse_ by `Appier Inc.`_. -See the copyright and license note in this directory source code. - -.. _h5sparse: https://github.com/appier/h5sparse -.. _Appier Inc.: https://www.appier.com/ -""" - -import collections.abc as cabc -from itertools import accumulate, chain -from typing import Iterable, NamedTuple, Sequence, Tuple, Type, Union - -import numpy as np -import scipy.sparse as ss -from anndata._core.index import _subset -from scipy.sparse import _sparsetools - - -try: - # Not really important, just for IDEs to be more helpful - from scipy.sparse.compressed import _cs_matrix -except ImportError: - _cs_matrix = ss.spmatrix - -Index1D = Union[slice, int, str, np.int64, np.ndarray] -Index = Union[Index1D, Tuple[Index1D, Index1D], ss.spmatrix] - - -def unpack_index(index: Index) -> tuple[Index1D, Index1D]: - if not isinstance(index, tuple): - return index, slice(None) - elif len(index) == 2: - return index - elif len(index) == 1: - return index[0], slice(None) - else: - raise IndexError("invalid number of indices") - - -class BackedFormat(NamedTuple): - format_str: str - backed_type: Type["BackedSparseMatrix"] - memory_type: Type[ss.spmatrix] - - -class BackedSparseMatrix(_cs_matrix): - """Mixin class for backed sparse matrices. - - Largely needed for the case `backed_sparse_csr(...)[:]`, since that calls copy on `.data`, - `.indices`, and `.indptr`. - """ - - def copy(self) -> ss.spmatrix: - if ss.issparse(self.data): - return super().copy() - else: - return SparseDataset(self.group).to_memory() - - def _set_many(self, i: Iterable[int], j: Iterable[int], x): - """Sets value at each (i, j) to x. - - Here (i,j) index major and minor respectively, and must not contain duplicate entries. 
- """ - # Scipy 1.3+ compat - n_samples = 1 if np.isscalar(x) else len(x) - offsets = self._offsets(i, j, n_samples) - - if -1 not in offsets: - # make a list for interaction with h5py or zarr - offsets = list(offsets) - # only affects existing non-zero cells - self.data[offsets] = x - return - - else: - raise ValueError("You cannot change the sparsity structure of a SparseDataset.") - # replace where possible - # mask = offsets > -1 - # # offsets[mask] - # bool_data_mask = np.zeros(len(self.data), dtype=bool) - # bool_data_mask[offsets[mask]] = True - # self.data[bool_data_mask] = x[mask] - # # self.data[offsets[mask]] = x[mask] - # # only insertions remain - # mask = ~mask - # i = i[mask] - # i[i < 0] += M - # j = j[mask] - # j[j < 0] += N - # self._insert_many(i, j, x[mask]) - - def _zero_many(self, i: Sequence[int], j: Sequence[int]): - """Sets value at each (i, j) to zero, preserving sparsity structure. - - Here (i,j) index major and minor respectively. - """ - offsets = self._offsets(i, j, len(i)) - - # only assign zeros to the existing sparsity structure - self.data[list(offsets[offsets > -1])] = 0 - - def _offsets(self, i: Iterable[int], j: Iterable[int], n_samples: int) -> np.ndarray: - i, j, M, N = self._prepare_indices(i, j) - offsets = np.empty(n_samples, dtype=self.indices.dtype) - ret = _sparsetools.csr_sample_offsets( - M, N, self.indptr, self.indices, n_samples, i, j, offsets - ) - if ret == 1: - # rinse and repeat - self.sum_duplicates() - _sparsetools.csr_sample_offsets( - M, N, self.indptr, self.indices, n_samples, i, j, offsets - ) - return offsets - - -class backed_csr_matrix(BackedSparseMatrix, ss.csr_matrix): - def _get_intXslice(self, row: int, col: slice) -> ss.csr_matrix: - return ss.csr_matrix(get_compressed_vector(self, row), shape=(1, self.shape[1]))[:, col] - - def _get_sliceXslice(self, row: slice, col: slice) -> ss.csr_matrix: - out_shape = ( - slice_len(row, self.shape[0]), - slice_len(col, self.shape[1]), - ) - if out_shape[0] == 1: - return self._get_intXslice(slice_as_int(row, self.shape[0]), col) - elif out_shape[1] == self.shape[1] and out_shape[0] < self.shape[0]: - return self._get_arrayXslice(np.arange(*row.indices(self.shape[0])), col) - return super()._get_sliceXslice(row, col) - - def _get_arrayXslice(self, row: Sequence[int], col: slice) -> ss.csr_matrix: - idxs = np.asarray(row) - if idxs.dtype == bool: - idxs = np.where(idxs) - return ss.csr_matrix(get_compressed_vectors(self, idxs), shape=(len(idxs), self.shape[1]))[ - :, col - ] - - -class backed_csc_matrix(BackedSparseMatrix, ss.csc_matrix): - def _get_sliceXint(self, row: slice, col: int) -> ss.csc_matrix: - return ss.csc_matrix(get_compressed_vector(self, col), shape=(self.shape[0], 1))[row, :] - - def _get_sliceXslice(self, row: slice, col: slice) -> ss.csc_matrix: - out_shape = ( - slice_len(row, self.shape[0]), - slice_len(col, self.shape[1]), - ) - if out_shape[1] == 1: - return self._get_sliceXint(row, slice_as_int(col, self.shape[1])) - elif out_shape[0] == self.shape[0] and out_shape[1] < self.shape[1]: - if col.step is not None: - return self._get_sliceXarray(row, np.arange(*col.indices(self.shape[1]))) - start = col.start - stop = col.stop - if stop is not None and stop > 0: - stop += 1 - if start is not None and start < 0: - start -= 1 - indptr_slice = slice(start, stop) - indptr = self.group["indptr"][indptr_slice] - data = self.group["data"][indptr[0] : indptr[-1]] - indices = self.group["indices"][indptr[0] : indptr[-1]] - indptr -= indptr[0] - shape = (self.shape[0], 
indptr.size - 1) - return ss.csc_matrix((data, indices, indptr), shape=shape) # much faster - # return self._get_sliceXarray(row, np.arange(*col.indices(self.shape[1]))) - return super()._get_sliceXslice(row, col) - - def _get_sliceXarray(self, row: slice, col: Sequence[int]) -> ss.csc_matrix: - idxs = np.asarray(col) - if idxs.dtype == bool: - idxs = np.where(idxs) - return ss.csc_matrix(get_compressed_vectors(self, idxs), shape=(self.shape[0], len(idxs)))[ - row, : - ] - - -FORMATS = [ - BackedFormat("csr", backed_csr_matrix, ss.csr_matrix), - BackedFormat("csc", backed_csc_matrix, ss.csc_matrix), -] - - -def slice_len(s: slice, dim: int) -> int: - """Returns length of `a[s]` where `len(a) == dim`.""" - return len(range(*s.indices(dim))) - - -def slice_as_int(s: slice, dim: int) -> int: - """Converts slices of length 1 to the integer index they’ll access.""" - out = list(range(*s.indices(dim))) - assert len(out) == 1 - return out[0] - - -def get_compressed_vectors( - x: BackedSparseMatrix, row_idxs: Iterable[int] -) -> Tuple[Sequence, Sequence, Sequence]: - slices = [slice(*(x.indptr[i : i + 2])) for i in row_idxs] - data = np.concatenate([x.data[s] for s in slices]) - indices = np.concatenate([x.indices[s] for s in slices]) - indptr = list(accumulate(chain((0,), (s.stop - s.start for s in slices)))) - return data, indices, indptr - - -def get_compressed_vector(x: BackedSparseMatrix, idx: int) -> Tuple[Sequence, Sequence, Sequence]: - s = slice(*(x.indptr[idx : idx + 2])) - data = x.data[s] - indices = x.indices[s] - indptr = [0, len(data)] - return data, indices, indptr - - -def get_format_str(data: ss.spmatrix) -> str: - for fmt, _, memory_class in FORMATS: - if isinstance(data, memory_class): - return fmt - raise ValueError(f"Data type {type(data)} is not supported.") - - -def get_memory_class(format_str: str) -> Type[ss.spmatrix]: - for fmt, _, memory_class in FORMATS: - if format_str == fmt: - return memory_class - raise ValueError(f"Format string {format_str} is not supported.") - - -def get_backed_class(format_str: str) -> Type[BackedSparseMatrix]: - for fmt, backed_class, _ in FORMATS: - if format_str == fmt: - return backed_class - raise ValueError(f"Format string {format_str} is not supported.") - - -class SparseDataset: - """Analogous to :class:`h5py.Dataset or zarr.Group`, but for sparse matrices.""" - - def __init__(self, group): - self.group = group - self.indptr = None - - @property - def dtype(self) -> np.dtype: - return self.group["data"].dtype - - @property - def format_str(self) -> str: - if "h5sparse_format" in self.group.attrs: - return self.group.attrs["h5sparse_format"] - else: - # Should this be an extra field? 
-        return self.group.attrs["encoding-type"].replace("_matrix", "")
-
-    @property
-    def name(self) -> str:
-        return self.group.name
-
-    @property
-    def shape(self) -> Tuple[int, int]:
-        shape = self.group.attrs.get("h5sparse_shape")
-        return tuple(self.group.attrs["shape"] if shape is None else shape)
-
-    @property
-    def value(self) -> ss.spmatrix:
-        return self.to_memory()
-
-    def __repr__(self) -> str:
-        return (
-            f"<HDF5 sparse dataset: format {self.format_str!r}, "
-            f"shape {self.shape}, "
-            f"type {self.dtype!r}>"
-        )
-
-    def __getitem__(self, index: Union[Index, Tuple[()]]) -> Union[float, ss.spmatrix]:
-        row, col = self._normalize_index(index)
-        mtx = self.to_backed()
-        return mtx[row, col]
-
-    def __setitem__(self, index: Union[Index, Tuple[()]], value):
-        row, col = self._normalize_index(index)
-        mock_matrix = self.to_backed()
-        mock_matrix[row, col] = value
-
-    def _normalize_index(self, index: Union[Index, Tuple[()]]) -> Tuple[np.ndarray, np.ndarray]:
-        if isinstance(index, tuple) and len(index) == 0:
-            index = slice(None)
-        row, col = unpack_index(index)
-        if all(isinstance(x, cabc.Iterable) for x in (row, col)):
-            row, col = np.ix_(row, col)
-        return row, col
-
-    def append(self, sparse_matrix: ss.spmatrix):
-        # Prep variables
-        shape = self.shape
-        if isinstance(sparse_matrix, SparseDataset):
-            sparse_matrix = sparse_matrix.to_backed()
-
-        # Check input
-        if not ss.isspmatrix(sparse_matrix):
-            raise NotImplementedError(
-                "Currently, only sparse matrices of equivalent format can be "
-                "appended to a SparseDataset."
-            )
-        if self.format_str not in {"csr", "csc"}:
-            raise NotImplementedError(
-                f"The append method for format {self.format_str} " f"is not implemented."
-            )
-        if self.format_str != get_format_str(sparse_matrix):
-            raise ValueError(
-                f"Matrices must have same format. Currently are "
-                f"{self.format_str!r} and {get_format_str(sparse_matrix)!r}"
-            )
-
-        # shape
-        if self.format_str == "csr":
-            assert (
-                shape[1] == sparse_matrix.shape[1]
-            ), "CSR matrices must have same size of dimension 1 to be appended."
-            new_shape = (shape[0] + sparse_matrix.shape[0], shape[1])
-        elif self.format_str == "csc":
-            assert (
-                shape[0] == sparse_matrix.shape[0]
-            ), "CSC matrices must have same size of dimension 0 to be appended."
- new_shape = (shape[0], shape[1] + sparse_matrix.shape[1]) - else: - assert False, "We forgot to update this branching to a new format" - if "h5sparse_shape" in self.group.attrs: - del self.group.attrs["h5sparse_shape"] - self.group.attrs["shape"] = new_shape - - # data - data = self.group["data"] - orig_data_size = data.shape[0] - data.resize((orig_data_size + sparse_matrix.data.shape[0],)) - data[orig_data_size:] = sparse_matrix.data - - # indptr - indptr = self.group["indptr"] - orig_data_size = indptr.shape[0] - append_offset = indptr[-1] - indptr.resize((orig_data_size + sparse_matrix.indptr.shape[0] - 1,)) - indptr[orig_data_size:] = sparse_matrix.indptr[1:].astype(np.int64) + append_offset - - # indices - indices = self.group["indices"] - orig_data_size = indices.shape[0] - indices.resize((orig_data_size + sparse_matrix.indices.shape[0],)) - indices[orig_data_size:] = sparse_matrix.indices - - def to_backed(self) -> BackedSparseMatrix: - format_class = get_backed_class(self.format_str) - mtx = format_class(self.shape, dtype=self.dtype) - mtx.group = self.group - mtx.data = self.group["data"] - mtx.indices = self.group["indices"] - mtx.indptr = self.group["indptr"] - return mtx - - def to_memory(self) -> ss.spmatrix: - format_class = get_memory_class(self.format_str) - mtx = format_class(self.shape, dtype=self.dtype) - mtx.data = self.group["data"][...] - mtx.indices = self.group["indices"][...] - mtx.indptr = self.group["indptr"][...] - return mtx - - -@_subset.register(SparseDataset) -def subset_sparsedataset(d, subset_idx): - return d[subset_idx] diff --git a/tests/test_sparse.py b/tests/test_sparse.py index 899893e7..ac3288a3 100644 --- a/tests/test_sparse.py +++ b/tests/test_sparse.py @@ -3,6 +3,7 @@ import numpy as np import pytest import anndata as ad +from anndata._core.sparse_dataset import sparse_dataset from anndata.tests.helpers import ( array_bool_subset, array_int_subset, @@ -12,8 +13,6 @@ ) from scipy import sparse -from cirrocumulus.sparse_dataset import SparseDataset - subset_func2 = subset_func @@ -49,8 +48,8 @@ def ondisk_equivalent_adata_zarr(tmp_path): csr_mem.write_zarr(csr_path) csc_mem.write_zarr(csc_path) - csr_disk = SparseDataset(zarr.open(csr_path / "X")) - csc_disk = SparseDataset(zarr.open(csc_path / "X")) + csr_disk = sparse_dataset(zarr.open(csr_path / "X")) + csc_disk = sparse_dataset(zarr.open(csc_path / "X")) return csr_mem, csr_disk, csc_disk @@ -122,7 +121,7 @@ def test_dataset_append_memory(tmp_path, sparse_format, append_method): with h5py.File(h5_path, "a") as f: ad._io.specs.write_elem(f, "mtx", a) - diskmtx = SparseDataset(f["mtx"]) + diskmtx = sparse_dataset(f["mtx"]) diskmtx.append(b) fromdisk = diskmtx.to_memory() @@ -147,8 +146,8 @@ def test_dataset_append_disk(tmp_path, sparse_format, append_method): with h5py.File(h5_path, "a") as f: ad._io.specs.write_elem(f, "a", a) ad._io.specs.write_elem(f, "b", b) - a_disk = SparseDataset(f["a"]) - b_disk = SparseDataset(f["b"]) + a_disk = sparse_dataset(f["a"]) + b_disk = sparse_dataset(f["b"]) a_disk.append(b_disk) fromdisk = a_disk.to_memory() @@ -173,8 +172,8 @@ def test_wrong_shape(tmp_path, sparse_format, a_shape, b_shape): with h5py.File(h5_path, "a") as f: ad._io.specs.write_elem(f, "a", a_mem) ad._io.specs.write_elem(f, "b", b_mem) - a_disk = SparseDataset(f["a"]) - b_disk = SparseDataset(f["b"]) + a_disk = sparse_dataset(f["a"]) + b_disk = sparse_dataset(f["b"]) with pytest.raises(AssertionError): a_disk.append(b_disk) @@ -186,7 +185,7 @@ def test_wrong_formats(tmp_path): with 
h5py.File(h5_path, "a") as f: ad._io.specs.write_elem(f, "base", base) - disk_mtx = SparseDataset(f["base"]) + disk_mtx = sparse_dataset(f["base"]) pre_checks = disk_mtx.to_memory() with pytest.raises(ValueError):
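--
Usage note: anndata (0.10+) ships the on-disk sparse wrapper this patch deletes, so
sparse_dataset() takes an h5py or zarr group and returns a lazily sliceable CSR/CSC
dataset. A minimal sketch of the replacement usage, mirroring the tests above (the
file and group names here are illustrative, not from the patch):

    # Sketch only: assumes anndata >= 0.10, h5py, and scipy are installed.
    import anndata as ad
    import h5py
    from scipy import sparse
    from anndata._core.sparse_dataset import sparse_dataset

    mtx = sparse.random(50, 20, format="csr", density=0.1)
    with h5py.File("example.h5", "a") as f:      # hypothetical file name
        ad._io.specs.write_elem(f, "mtx", mtx)   # same writer the tests use
        disk_mtx = sparse_dataset(f["mtx"])      # was: SparseDataset(f["mtx"])
        cols = disk_mtx[:, [0, 2, 4]]            # lazy column slice, as in get_X
        full = disk_mtx.to_memory()              # materialize as scipy.sparse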