Skip to content

Commit

Permalink
Merge pull request #23 from rea1991/refactoring/step1
Browse files Browse the repository at this point in the history
Refactor `stripepy.step_1()` to improve code clarity and simplify unit testing.
  • Loading branch information
robomics authored Nov 22, 2024
2 parents 2f68e25 + 14b671f commit 689cbba
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 30 deletions.
166 changes: 141 additions & 25 deletions src/stripepy/stripepy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import pathlib
import time
from typing import Dict, List, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.sparse as ss
import seaborn as sns
Expand All @@ -11,39 +14,138 @@
from .utils import TDA, finders, regressions, stripe


def log_transform(I: ss.csr_matrix) -> ss.csr_matrix:
def _log_transform(I: ss.csr_matrix) -> ss.csr_matrix:
"""
Apply a log-transform to a sparse matrix ignoring (i.e. dropping) NaNs.
Parameters
----------
I : ss.csr_matrix
the sparse matrix to be transformed
Returns
-------
ss.csr_matrix
the log-transformed sparse matrix
"""

I.data[np.isnan(I.data)] = 0
I.eliminate_zeros()
Iproc = I.log1p()
return Iproc


def step_1(I, genomic_belt, resolution, RoI=None, output_folder=None):

print("1.1) Log-transformation...")
Iproc = log_transform(I)

print("1.2) Focusing on a neighborhood of the main diagonal...")
matrix_belt = int(genomic_belt / resolution)

LT_Iproc = ss.tril(Iproc, k=0, format="csr") - ss.tril(Iproc, k=-matrix_belt, format="csr")
UT_Iproc = ss.triu(Iproc, k=0, format="csr") - ss.triu(Iproc, k=matrix_belt, format="csr")

# Scaling
print("1.3) Projection onto [0, 1]...")
scaling_factor_Iproc = Iproc.max()
Iproc /= scaling_factor_Iproc
LT_Iproc /= scaling_factor_Iproc
UT_Iproc /= scaling_factor_Iproc

def _band_extraction(I: ss.csr_matrix, resolution: int, genomic_belt: int) -> Tuple[ss.csr_matrix, ss.csr_matrix]:
"""
Given a symmetric sparse matrix in CSR format, do the following:
* Split the input matrix into a upper/lower-triangular matrices
* Zero (i.e. drop) all values that lie outside the first genomic_belt // resolution diagonals
Parameters
----------
I : ss.csr_matrix
the sparse matrix to be processed
resolution : int
the genomic resolution of the sparse matrix I
genomic_belt: int
the width of the genomic belt to be extracted
Returns
-------
Tuple[ss.csr_matrix, ss.csr_matrix]
2 elements tuple with the lower-triangular and upper-triangular matrix after band extraction
"""

assert resolution > 0
assert genomic_belt > 0

matrix_belt = genomic_belt // resolution
LT_I = ss.tril(I, k=0, format="csr") - ss.tril(I, k=-matrix_belt, format="csr")
UT_I = ss.triu(I, k=0, format="csr") - ss.triu(I, k=matrix_belt, format="csr")
return LT_I, UT_I


def _scale_Iproc(
I: ss.csr_matrix, LT_I: ss.csr_matrix, UT_I: ss.csr_matrix
) -> Tuple[ss.csr_matrix, ss.csr_matrix, ss.csr_matrix]:
"""
Rescale matrices LT_I and UT_I based on the maximum value found in matrix I
Parameters
----------
I : ss.csr_matrix
the sparse matrix used to compute the scaling factor
LT_I : ss.csr_matrix
the lower-triangular sparse matrix to be rescaled
UT_I : ss.csr_matrix
the upper-triangular sparse matrix to be rescaled
Returns
-------
Tuple[ss.csr_matrix, ss.csr_matrix]
the rescaled lower and upper-triangular matrices
"""

scaling_factor_Iproc = I.max()
return tuple(J / scaling_factor_Iproc for J in [I, LT_I, UT_I]) # noqa


def _extract_RoIs(I: ss.csr_matrix, RoI: Dict[str, List[int]]) -> npt.NDArray:
"""
Extract a region of interest (ROI) from the sparse matrix I
Parameters
----------
I: ss.csr_matrix
the sparse matrix to be processed
RoI: Dict[str, List[int]]
dictionary with the region of interest in matrix ('matrix') and genomic ('genomic') coordinates
Returns
-------
npt.NDArray
dense matrix with the interactions for the regions of interest
"""

rows = cols = slice(RoI["matrix"][0], RoI["matrix"][1])
I_RoI = I[rows, cols].toarray()
return I_RoI


def _plot_RoIs(
I: ss.csr_matrix, Iproc: ss.csr_matrix, RoI: Union[npt.NDArray, None], output_folder: Union[pathlib.Path, None]
) -> Union[npt.NDArray, None]:
"""
Helper function to plot a region of interest.
This function does nothing when RoI is None.
If output_folder is None (but RoI is not), then this function simply extracts the interactions
that would've been used for plotting, but generates no plots.
Parameters
----------
I: ss.csr_matrix
the unprocessed input sparse matrix
Iproc: ss.csr_matrix
the processed input sparse matrix
RoI: Union[npt.NDArray, None]
the region of interest to be plotted in matrix ('matrix') and genomic ('genomic') coordinates
output_folder: pathlib.Path
folder where to save the plots
Returns
-------
Union[npt.NDArray, None]
the dense matrix used for plotting or None when RoI is None
"""

# TODO rea1991 Once there is better test coverage, rewrite this as suggested in in #16
if RoI is not None:
print("1.4) Extracting a Region of Interest (RoI) for plot purposes...")
rows = cols = slice(RoI["matrix"][0], RoI["matrix"][1])
I_RoI = I[rows, cols].toarray()
Iproc_RoI = Iproc[rows, cols].toarray()
I_RoI = _extract_RoIs(I, RoI)
Iproc_RoI = _extract_RoIs(Iproc, RoI)

if output_folder is not None:

# Plots:
IO.HiC(
I_RoI,
Expand All @@ -62,8 +164,22 @@ def step_1(I, genomic_belt, resolution, RoI=None, output_folder=None):
compactify=False,
)
else:
I_RoI = None
Iproc_RoI = None
Iproc_RoI = None # TODO handle this case in _extract_RoIs
return Iproc_RoI


def step_1(I, genomic_belt, resolution, RoI=None, output_folder=None):

print("1.1) Log-transformation...")
Iproc = _log_transform(I)

print("1.2) Focusing on a neighborhood of the main diagonal...")
LT_Iproc, UT_Iproc = _band_extraction(Iproc, resolution, genomic_belt)

print("1.3) Projection onto [0, 1]...")
Iproc, LT_Iproc, UT_Iproc = _scale_Iproc(Iproc, LT_Iproc, UT_Iproc)

Iproc_RoI = _plot_RoIs(I, Iproc, RoI, output_folder)

return LT_Iproc, UT_Iproc, Iproc_RoI

Expand Down
13 changes: 12 additions & 1 deletion src/stripepy/utils/stripe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
class Stripe:

def __init__(self, seed=None, left_bw=None, right_bw=None, upp_bw=None, low_bw=None, top_pers=None, where=None):

# TODO: robomics
# -) make left_bw, right_bw, upp_bw, and low_bw positional and mandatory
# -) where should be optional keyword: if not specified, it can be determined by the four mandatory values
# -) top_pers is in [0, +inf]
# -) where is "lower_triangular", "upper_triangular", or None (if not set)

self.seed = seed
self.persistence = top_pers
self.L_bound = left_bw
Expand All @@ -18,6 +25,9 @@ def __init__(self, seed=None, left_bw=None, right_bw=None, upp_bw=None, low_bw=N

def compute_biodescriptors(self, I):

# TODO: robomics
# Check coordinates

if self.where == "lower_triangular":
convex_comb = int(round(0.99 * self.U_bound + 0.01 * self.D_bound))
rows = slice(convex_comb, self.D_bound)
Expand Down Expand Up @@ -101,10 +111,11 @@ def compute_biodescriptors(self, I):
* 100
)
if self.outer_descriptors["mean"] != 0
else -1.0
else -1.0 # TODO rea1991: check if nan is more suitable
)

else:
# TODO rea1991 Check if still a problem, otherwise remove!
self.inner_descriptors["five-number"] = [-1, -1, -1, -1, -1]
self.inner_descriptors["mean"] = -1
self.inner_descriptors["std"] = -1
Expand Down
8 changes: 4 additions & 4 deletions test/test_stripepy.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import numpy as np
import scipy.sparse as ss

from stripepy.stripepy import log_transform
from stripepy.stripepy import _log_transform


class TestLogTransform:

def test_empty(self):
I = ss.csr_matrix((0, 0), dtype=float)
Iproc = log_transform(I)
Iproc = _log_transform(I)

assert Iproc.shape == (0, 0)
assert np.issubdtype(I.dtype, np.floating)

def test_all_finite(self):
I = ss.rand(100, 100, density=0.5, format="csr")
Iproc = log_transform(I)
Iproc = _log_transform(I)

assert I.size == Iproc.size
assert I.shape == Iproc.shape
Expand All @@ -28,7 +28,7 @@ def test_with_nans(self):
num_nan_values = (I.data >= mean_I).sum()
I.data[I.data >= mean_I] = np.nan

Iproc = log_transform(I)
Iproc = _log_transform(I)

assert np.isfinite(Iproc.data).all()
assert Iproc.size == size_I - num_nan_values

0 comments on commit 689cbba

Please sign in to comment.