Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring/step1 #23

Merged
merged 9 commits into from
Nov 22, 2024
166 changes: 141 additions & 25 deletions src/stripepy/stripepy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import pathlib
import time
from typing import Dict, List, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.sparse as ss
import seaborn as sns
Expand All @@ -11,39 +14,138 @@
from .utils import TDA, finders, regressions, stripe


def log_transform(I: ss.csr_matrix) -> ss.csr_matrix:
def _log_transform(I: ss.csr_matrix) -> ss.csr_matrix:
"""
Apply a log-transform to a sparse matrix ignoring (i.e. dropping) NaNs.

Parameters
----------
I : ss.csr_matrix
the sparse matrix to be transformed

Returns
-------
ss.csr_matrix
the log-transformed sparse matrix
"""

I.data[np.isnan(I.data)] = 0
I.eliminate_zeros()
Iproc = I.log1p()
return Iproc


def step_1(I, genomic_belt, resolution, RoI=None, output_folder=None):

print("1.1) Log-transformation...")
Iproc = log_transform(I)

print("1.2) Focusing on a neighborhood of the main diagonal...")
matrix_belt = int(genomic_belt / resolution)

LT_Iproc = ss.tril(Iproc, k=0, format="csr") - ss.tril(Iproc, k=-matrix_belt, format="csr")
UT_Iproc = ss.triu(Iproc, k=0, format="csr") - ss.triu(Iproc, k=matrix_belt, format="csr")

# Scaling
print("1.3) Projection onto [0, 1]...")
scaling_factor_Iproc = Iproc.max()
Iproc /= scaling_factor_Iproc
LT_Iproc /= scaling_factor_Iproc
UT_Iproc /= scaling_factor_Iproc

def _band_extraction(I: ss.csr_matrix, resolution: int, genomic_belt: int) -> Tuple[ss.csr_matrix, ss.csr_matrix]:
"""
Given a symmetric sparse matrix in CSR format, do the following:

* Split the input matrix into a upper/lower-triangular matrices
* Zero (i.e. drop) all values that lie outside the first genomic_belt // resolution diagonals

Parameters
----------
I : ss.csr_matrix
the sparse matrix to be processed
resolution : int
the genomic resolution of the sparse matrix I
genomic_belt: int
the width of the genomic belt to be extracted

Returns
-------
Tuple[ss.csr_matrix, ss.csr_matrix]
2 elements tuple with the lower-triangular and upper-triangular matrix after band extraction
"""

assert resolution > 0
assert genomic_belt > 0

matrix_belt = genomic_belt // resolution
LT_I = ss.tril(I, k=0, format="csr") - ss.tril(I, k=-matrix_belt, format="csr")
UT_I = ss.triu(I, k=0, format="csr") - ss.triu(I, k=matrix_belt, format="csr")
return LT_I, UT_I


def _scale_Iproc(
I: ss.csr_matrix, LT_I: ss.csr_matrix, UT_I: ss.csr_matrix
) -> Tuple[ss.csr_matrix, ss.csr_matrix, ss.csr_matrix]:
"""
Rescale matrices LT_I and UT_I based on the maximum value found in matrix I

Parameters
----------
I : ss.csr_matrix
the sparse matrix used to compute the scaling factor
LT_I : ss.csr_matrix
the lower-triangular sparse matrix to be rescaled
UT_I : ss.csr_matrix
the upper-triangular sparse matrix to be rescaled

Returns
-------
Tuple[ss.csr_matrix, ss.csr_matrix]
the rescaled lower and upper-triangular matrices
"""

scaling_factor_Iproc = I.max()
return tuple(J / scaling_factor_Iproc for J in [I, LT_I, UT_I]) # noqa


def _extract_RoIs(I: ss.csr_matrix, RoI: Dict[str, List[int]]) -> npt.NDArray:
"""
Extract a region of interest (ROI) from the sparse matrix I

Parameters
----------
I: ss.csr_matrix
the sparse matrix to be processed
RoI: Dict[str, List[int]]
dictionary with the region of interest in matrix ('matrix') and genomic ('genomic') coordinates

Returns
-------
npt.NDArray
dense matrix with the interactions for the regions of interest
"""

rows = cols = slice(RoI["matrix"][0], RoI["matrix"][1])
I_RoI = I[rows, cols].toarray()
return I_RoI


def _plot_RoIs(
I: ss.csr_matrix, Iproc: ss.csr_matrix, RoI: Union[npt.NDArray, None], output_folder: Union[pathlib.Path, None]
) -> Union[npt.NDArray, None]:
"""
Helper function to plot a region of interest.
This function does nothing when RoI is None.
If output_folder is None (but RoI is not), then this function simply extracts the interactions
that would've been used for plotting, but generates no plots.

Parameters
----------
I: ss.csr_matrix
the unprocessed input sparse matrix
Iproc: ss.csr_matrix
the processed input sparse matrix
RoI: Union[npt.NDArray, None]
the region of interest to be plotted in matrix ('matrix') and genomic ('genomic') coordinates
output_folder: pathlib.Path
folder where to save the plots

Returns
-------
Union[npt.NDArray, None]
the dense matrix used for plotting or None when RoI is None
"""

# TODO rea1991 Once there is better test coverage, rewrite this as suggested in in #16
if RoI is not None:
print("1.4) Extracting a Region of Interest (RoI) for plot purposes...")
rows = cols = slice(RoI["matrix"][0], RoI["matrix"][1])
I_RoI = I[rows, cols].toarray()
Iproc_RoI = Iproc[rows, cols].toarray()
I_RoI = _extract_RoIs(I, RoI)
Iproc_RoI = _extract_RoIs(Iproc, RoI)

if output_folder is not None:

# Plots:
IO.HiC(
I_RoI,
Expand All @@ -62,8 +164,22 @@ def step_1(I, genomic_belt, resolution, RoI=None, output_folder=None):
compactify=False,
)
else:
I_RoI = None
Iproc_RoI = None
Iproc_RoI = None # TODO handle this case in _extract_RoIs
return Iproc_RoI


def step_1(I, genomic_belt, resolution, RoI=None, output_folder=None):

print("1.1) Log-transformation...")
Iproc = _log_transform(I)

print("1.2) Focusing on a neighborhood of the main diagonal...")
LT_Iproc, UT_Iproc = _band_extraction(Iproc, resolution, genomic_belt)

print("1.3) Projection onto [0, 1]...")
Iproc, LT_Iproc, UT_Iproc = _scale_Iproc(Iproc, LT_Iproc, UT_Iproc)

Iproc_RoI = _plot_RoIs(I, Iproc, RoI, output_folder)

return LT_Iproc, UT_Iproc, Iproc_RoI

Expand Down
13 changes: 12 additions & 1 deletion src/stripepy/utils/stripe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
class Stripe:

def __init__(self, seed=None, left_bw=None, right_bw=None, upp_bw=None, low_bw=None, top_pers=None, where=None):

# TODO: robomics
# -) make left_bw, right_bw, upp_bw, and low_bw positional and mandatory
# -) where should be optional keyword: if not specified, it can be determined by the four mandatory values
# -) top_pers is in [0, +inf]
# -) where is "lower_triangular", "upper_triangular", or None (if not set)

self.seed = seed
self.persistence = top_pers
self.L_bound = left_bw
Expand All @@ -18,6 +25,9 @@ def __init__(self, seed=None, left_bw=None, right_bw=None, upp_bw=None, low_bw=N

def compute_biodescriptors(self, I):

# TODO: robomics
# Check coordinates

if self.where == "lower_triangular":
convex_comb = int(round(0.99 * self.U_bound + 0.01 * self.D_bound))
rows = slice(convex_comb, self.D_bound)
Expand Down Expand Up @@ -101,10 +111,11 @@ def compute_biodescriptors(self, I):
* 100
)
if self.outer_descriptors["mean"] != 0
else -1.0
else -1.0 # TODO rea1991: check if nan is more suitable
)

else:
# TODO rea1991 Check if still a problem, otherwise remove!
self.inner_descriptors["five-number"] = [-1, -1, -1, -1, -1]
self.inner_descriptors["mean"] = -1
self.inner_descriptors["std"] = -1
Expand Down
8 changes: 4 additions & 4 deletions test/test_stripepy.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import numpy as np
import scipy.sparse as ss

from stripepy.stripepy import log_transform
from stripepy.stripepy import _log_transform


class TestLogTransform:

def test_empty(self):
I = ss.csr_matrix((0, 0), dtype=float)
Iproc = log_transform(I)
Iproc = _log_transform(I)

assert Iproc.shape == (0, 0)
assert np.issubdtype(I.dtype, np.floating)

def test_all_finite(self):
I = ss.rand(100, 100, density=0.5, format="csr")
Iproc = log_transform(I)
Iproc = _log_transform(I)

assert I.size == Iproc.size
assert I.shape == Iproc.shape
Expand All @@ -28,7 +28,7 @@ def test_with_nans(self):
num_nan_values = (I.data >= mean_I).sum()
I.data[I.data >= mean_I] = np.nan

Iproc = log_transform(I)
Iproc = _log_transform(I)

assert np.isfinite(Iproc.data).all()
assert Iproc.size == size_I - num_nan_values