
Commit

Finish tests for AAclust, add coverage and correlation
breimanntools committed Oct 2, 2023
1 parent e7137e5 commit a5bb440
Showing 284 changed files with 593 additions and 113 deletions.
Binary file modified aaanalysis/__pycache__/utils.cpython-39.pyc
62 changes: 62 additions & 0 deletions aaanalysis/_archive/_archive_aaclust.py
@@ -0,0 +1,62 @@
"""
Archived script for the original AAclust().compute_correlation() implementation.
"""

import numpy as np
from aaanalysis.aaclust._aaclust import _cluster_center, min_cor_all


def compute_correlation(X, X_ref, labels=None, labels_ref=None, n=3, positive=True, on_center=False):
    """
    Compute the Pearson correlation of given data with reference data.

    Parameters
    ----------
    X : `array-like, shape (n_samples, n_features)`
        Feature matrix. Rows correspond to scales and columns to amino acids.
    X_ref : `array-like, shape (n_samples_ref, n_features)`
        Feature matrix of the reference data.
    labels : `array-like, shape (n_samples, )`
        Cluster labels for each sample in ``X``.
    labels_ref : `array-like, shape (n_samples_ref, )`
        Cluster labels for the reference data.
    n : int, default=3
        Number of top centers to consider based on correlation strength.
    positive : bool, default=True
        If True, consider positive correlations; otherwise, negative correlations.
    on_center : bool, default=False
        If True, compute the correlation with cluster centers; otherwise, with all cluster members.

    Returns
    -------
    list_top_center_name_corr : list
        Names and correlations of the centers with the strongest (positive/negative) correlation
        to the test data samples.
    """
    names_ref = list(dict.fromkeys(labels_ref))
    masks_ref = [[i == label for i in labels_ref] for label in names_ref]
    if on_center:
        # Get centers for all clusters in reference data
        centers = np.concatenate([_cluster_center(X_ref[mask]) for mask in masks_ref], axis=0)
        # Compute correlation of test data with centers
        Xtest_centers = np.concatenate([X, centers], axis=0)
        n_test = X.shape[0]
        X_corr = np.corrcoef(Xtest_centers)[:n_test, n_test:]
    else:
        masks_test = [[i == j for j in range(len(labels))] for i in range(len(labels))]
        # Compute minimum correlation of test data with each group of reference data
        X_corr = np.array([[min_cor_all(np.concatenate([X[mask_test], X_ref[mask_ref]], axis=0))
                            for mask_ref in masks_ref] for mask_test in masks_test])
    # Get indices of the n centers with the highest/lowest correlation for each scale
    if positive:
        list_top_center_ind = X_corr.argsort()[:, -n:][:, ::-1]
    else:
        list_top_center_ind = X_corr.argsort()[:, :n]
    # Get name and correlation of the centers correlating strongest (positive/negative) with the test data samples
    list_top_center_name_corr = []
    for i, ind in enumerate(list_top_center_ind):
        top_corr = X_corr[i, :][ind]
        top_names = [names_ref[x] for x in ind]
        str_corr = ";".join([f"{name} ({round(corr, 3)})" for name, corr in zip(top_names, top_corr)])
        list_top_center_name_corr.append(str_corr)
    return list_top_center_name_corr
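
For illustration, a minimal usage sketch of this archived function (toy arrays; labels and correlation values are hypothetical, not from the test suite):

import numpy as np

# Toy data: 2 test scales and 4 reference scales over 10 amino acid features
X = np.random.rand(2, 10)
X_ref = np.random.rand(4, 10)
labels = [0, 1]
labels_ref = [0, 0, 1, 1]

# Top-2 reference clusters with the strongest positive correlation per test scale
result = compute_correlation(X, X_ref, labels=labels, labels_ref=labels_ref,
                             n=2, positive=True, on_center=True)
# e.g., ['0 (0.84);1 (0.12)', '1 (0.67);0 (-0.05)']  (values illustrative)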
Binary file modified aaanalysis/_utils/__pycache__/_check_data.cpython-39.pyc
Binary file modified aaanalysis/_utils/__pycache__/_check_type.cpython-39.pyc
12 changes: 9 additions & 3 deletions aaanalysis/_utils/_check_data.py
@@ -59,14 +59,13 @@ def check_X_unique_samples(X, min_n_unique_samples=3):
def check_labels(labels=None):
    """"""
    if labels is None:
        raise ValueError("'labels' should not be None")

        raise ValueError("'labels' should not be None.")
    # Convert labels to a numpy array if it's not already
    labels = np.asarray(labels)

    unique_labels = set(labels)
    if len(unique_labels) == 1:
        raise ValueError(f"'labels' should contain more than one different value ({unique_labels})")
        raise ValueError(f"'labels' should contain more than one different value ({unique_labels}).")
    wrong_types = [l for l in unique_labels if not np.issubdtype(type(l), np.integer)]
    if wrong_types:
        raise ValueError(f"Labels in 'labels' should be type int, but contain: {set(map(type, wrong_types))}")
@@ -79,6 +78,13 @@ def check_match_X_labels(X=None, labels=None):
    if n_samples != len(labels):
        raise ValueError(f"n_samples does not match for 'X' ({len(X)}) and 'labels' ({len(labels)}).")

# Check sets
def check_superset_subset(subset=None, superset=None, name_subset=None, name_superset=None):
    """"""
    wrong_elements = [x for x in subset if x not in superset]
    if len(wrong_elements) != 0:
        raise ValueError(f"'{name_superset}' does not contain the following elements of '{name_subset}': {wrong_elements}")


# df checking functions
def check_col_in_df(df=None, name_df=None, col=None, col_type=None, accept_nan=False, error_if_exists=False):
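For illustration, a hedged sketch of how these updated checks behave (toy inputs, not taken from the test suite; each failing call raises independently):

check_labels(labels=[1, 1, 2])     # passes; labels are converted to a numpy array internally
check_labels(labels=[1, 1, 1])     # ValueError: only one distinct value in 'labels'
check_labels(labels=[1.0, 2.0])    # ValueError: labels must be integers

check_superset_subset(subset=[1, 2], superset=[1, 2, 3],
                      name_subset="labels", name_superset="all_labels")   # passes silently
check_superset_subset(subset=[1, 4], superset=[1, 2, 3],
                      name_subset="labels", name_superset="all_labels")
# ValueError: 'all_labels' does not contain the following elements of 'labels': [4]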
18 changes: 15 additions & 3 deletions aaanalysis/_utils/_check_type.py
@@ -1,6 +1,9 @@
"""
Basic utility check functions for type checking
"""
import pandas as pd
import numpy as np


# Type checking functions
def check_number_val(name=None, val=None, accept_none=False, just_int=False):
@@ -67,12 +70,21 @@ def check_tuple(name=None, val=None, n=None, check_n=True, accept_none=False):
raise ValueError(f"'{name}' ({val}) should be a tuple with {n} elements.")


def check_list(name=None, val=None, accept_none=False):
def check_list(name=None, val=None, accept_none=False, convert=True):
""""""
if accept_none and val is None:
return None
if not isinstance(val, list):
raise ValueError(f"'{name}' (type: {type(val)}) should be a list.")
if not convert:
if not isinstance(val, list):
raise ValueError(f"'{name}' (type: {type(val)}) should be a list.")
else:
allowed_types = (list, tuple, np.ndarray, pd.Series)
if not isinstance(val, allowed_types):
raise ValueError(f"'{name}' (type: {type(val)}) should be one of {allowed_types}.")
if isinstance(val, np.ndarray) and val.ndim != 1:
raise ValueError(f"'{name}' is a multi-dimensional numpy array and cannot be considered as a list.")
val = list(val)
return val

# Check special types
def check_ax(ax=None, accept_none=False):
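For illustration, a minimal sketch of the new convert behavior of check_list (toy values; the name 'scales' is only a placeholder):

import numpy as np

check_list(name="scales", val=["a", "b"])                 # -> ["a", "b"]
check_list(name="scales", val=("a", "b"))                 # -> ["a", "b"] (tuple converted)
check_list(name="scales", val=np.array([1, 2, 3]))        # -> [1, 2, 3] (1d array converted)
check_list(name="scales", val=np.zeros((2, 2)))           # ValueError: multi-dimensional array
check_list(name="scales", val=("a", "b"), convert=False)  # ValueError: should be a list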
Binary file modified aaanalysis/aaclust/__pycache__/_aaclust_statics.cpython-39.pyc
Binary file modified aaanalysis/aaclust/__pycache__/aaclust.cpython-39.pyc
75 changes: 44 additions & 31 deletions aaanalysis/aaclust/_aaclust_statics.py
@@ -8,41 +8,52 @@
from aaanalysis.aaclust._aaclust import _cluster_center, compute_medoids, min_cor_all

# I Helper Functions
def _sort_X_labels_names(X, labels=None, names=None):
    """"""
    sorted_order = np.argsort(labels)
    labels = [labels[i] for i in sorted_order]
    X = X[sorted_order]
    if names:
        names = [names[i] for i in sorted_order]
    return X, labels, names

def _get_df_corr(X=None, X_ref=None):
    """"""
    # Temporary labels to avoid any confusion with potential duplicates
    X_labels = range(len(X))
    X_ref_labels = range(len(X), len(X) + len(X_ref))
    combined = np.vstack((X, X_ref))
    df_corr_full = pd.DataFrame(combined.T).corr()
    # Select only the rows corresponding to X and columns corresponding to X_ref
    df_corr = df_corr_full.loc[X_labels, X_ref_labels]
    return df_corr


# II Main Functions
def compute_correlation(X, X_ref, labels=None, labels_ref=None, n=3, positive=True, on_center=False):
def compute_correlation(X, X_ref=None, labels=None, labels_ref=None, names=None, names_ref=None):
    """Computes Pearson correlation of given data with reference data."""
    names_ref = list(dict.fromkeys(labels_ref))
    masks_ref = [[i == label for i in labels_ref] for label in names_ref]
    if on_center:
        # Get centers for all clusters in reference data
        centers = np.concatenate([_cluster_center(X_ref[mask]) for mask in masks_ref], axis=0)
        # Compute correlation of test data with centers
        Xtest_centers = np.concatenate([X, centers], axis=0)
        n_test = X.shape[0]
        X_corr = np.corrcoef(Xtest_centers)[:n_test, n_test:]
    # Sort based on labels
    X, labels, names = _sort_X_labels_names(X, labels=labels, names=names)
    if X_ref is not None:
        X_ref, labels_ref, names_ref = _sort_X_labels_names(X_ref, labels=labels_ref, names=names_ref)
    # Compute correlations
    if X_ref is None:
        df_corr = pd.DataFrame(X.T).corr()
    else:
        masks_test = [[True if i == j else False for j in range(0, len(labels))] for i, _ in enumerate(labels)]
        # Compute minimum correlation of test data with each group of reference data
        X_corr = np.array(
            [[min_cor_all(np.concatenate([X[mask_test], X_ref[mask_ref]], axis=0)) for mask_ref in masks_ref] for
             mask_test in masks_test])
    # Get index for n centers with highest/lowest correlation for each scale
    if positive:
        list_top_center_ind = X_corr.argsort()[:, -n:][:, ::-1]
        df_corr = _get_df_corr(X=X, X_ref=X_ref)
    # Replace indexes and columns with names or labels
    df_corr.index = names if names else labels
    if X_ref is None:
        df_corr.columns = names if names else labels
    else:
        list_top_center_ind = X_corr.argsort()[:, :n]
    # Get name and correlation for centers correlating strongest (positive/negative) with test data samples
    list_top_center_name_corr = []
    for i, ind in enumerate(list_top_center_ind):
        top_corr = X_corr[i, :][ind]
        top_names = [names_ref[x] for x in ind]
        str_corr = ";".join([f"{name} ({round(corr, 3)})" for name, corr in zip(top_names, top_corr)])
        list_top_center_name_corr.append(str_corr)
    return list_top_center_name_corr
        df_corr.columns = names_ref if names_ref else labels_ref
    return df_corr


# Obtain cluster names
def _get_cluster_names(list_names=None, name_medoid=None, name_unclassified="Unclassified"):
def _get_cluster_names(list_names=None, name_medoid=None,
                       name_unclassified="Unclassified",
                       shorten_names=True):
    """
    Get list of cluster names sorted based on the following criteria (descending order):
        a) Frequency of term (most frequent term is preferred)
@@ -62,7 +73,8 @@ def remove_2nd_info(name_):
    # Create list of shorter names not containing information given in parentheses
    list_short_names = [x.split(" (")[0] for x in list_names if " (" in x]
    if len(list_names) > 1:
        list_names.extend(list_short_names)
        if shorten_names:
            list_names.extend(list_short_names)
    # Obtain information to check criteria for sorting scale names
    df_counts = pd.Series(list_names).value_counts().to_frame().reset_index()  # Compute frequencies of names
    df_counts.columns = ["name", "count"]
@@ -75,7 +87,7 @@ def remove_2nd_info(name_):
        names_cluster = [name_unclassified]
    return names_cluster

def name_clusters(X, labels=None, names=None):
def name_clusters(X, labels=None, names=None, shorten_names=True):
    """"""
    medoids, medoid_labels, medoid_ind = compute_medoids(X, labels=labels)
    dict_medoids = dict(zip(medoid_labels, medoid_ind))
@@ -87,7 +99,8 @@ def remove_2nd_info(name_):
        name_medoid = names[dict_medoids[clust]]
        list_names = [names[i] for i in range(0, len(names)) if labels[i] == clust]
        names_cluster = _get_cluster_names(list_names=list_names, name_medoid=name_medoid,
                                           name_unclassified=ut.STR_UNCLASSIFIED)
                                           name_unclassified=ut.STR_UNCLASSIFIED,
                                           shorten_names=shorten_names)
        assigned = False
        for name in names_cluster:
            if name not in dict_cluster_names.values() or name == ut.STR_UNCLASSIFIED:
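As a usage sketch of the reworked API: compute_correlation now returns a labeled correlation DataFrame instead of formatted name/correlation strings (toy data; shapes and labels are illustrative):

import numpy as np

X = np.random.rand(3, 10)        # 3 scales, 10 features
X_ref = np.random.rand(5, 10)    # 5 reference scales
labels = [0, 1, 1]
labels_ref = [0, 0, 1, 1, 2]

# Pairwise Pearson correlation of X with itself; rows/columns labeled by 'labels'
df_self = compute_correlation(X, labels=labels)                      # shape (3, 3)

# Correlation of X against reference data; columns labeled by 'labels_ref'
df_ref = compute_correlation(X, X_ref=X_ref, labels=labels,
                             labels_ref=labels_ref)                  # shape (3, 5)

The old n/positive/on_center selection logic is preserved verbatim in the archived _archive_aaclust.py shown above.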
