Commit 82eb5f8

Update tests for test_aaclust_eval.py

breimanntools committed Oct 2, 2023
1 parent fac37e9
Showing 352 changed files with 263 additions and 236 deletions.
Binary file modified aaanalysis/__pycache__/utils.cpython-39.pyc
Binary file modified aaanalysis/_utils/__pycache__/_check_data.cpython-39.pyc
Binary file modified aaanalysis/_utils/__pycache__/utils_cpp.cpython-39.pyc
92 changes: 45 additions & 47 deletions aaanalysis/_utils/_check_data.py
@@ -6,27 +6,21 @@
 # Write wrapper around scikit checkers
 from sklearn.utils import check_array
 
-# Array checking functions
-def check_array_like(name=None, val=None, dtype=None, accept_none=False,
-                     ensure_2d=False, allow_nan=False):
+# Helper functions
+def _check_array_like(name=None, val=None, dtype=None, ensure_2d=False, allow_nan=False):
     """
     Check if the provided value matches the specified dtype.
     If dtype is None, checks for general array-likeness.
     If dtype is 'int', 'float', or 'any', checks for specific types.
     """
-    if accept_none and val is None:
-        return None
-
     # Convert DataFrame and Series to np.ndarray
     if isinstance(val, (pd.DataFrame, pd.Series)):
         val = val.values
 
     if val is None:
         raise ValueError(f"'{name}' should not be None.")
     # Utilize scikit-learn's check_array for robust checking
     if dtype == 'int':
         expected_dtype = 'int'
     elif dtype == 'float':
         expected_dtype = 'float64'
-    elif dtype == 'any':
+    elif dtype == 'any' or dtype is None:
         expected_dtype = None
     else:
         raise ValueError(f"'dtype' ({dtype}) not recognized.")
@@ -37,51 +31,55 @@ def check_array_like(name=None, val=None, dtype=None, accept_none=False,
                          f"\nscikit message:\n\t{e}")
     return val
 
-# TODO separation of concerns
-def check_feat_matrix(X=None, y=None, y_name="labels", accept_none_y=True,
-                      ensure_2d=True, allow_nan=False, min_n_unique_samples=3, min_n_features=2):
-    """Check that the feature matrix is valid and matches y (if provided)"""
-    # Check if X is None
-    if X is None:
-        raise ValueError("Feature matrix 'X' should not be None.")
-    if not accept_none_y and y is None:
-        raise ValueError(f"'{y_name}' ({y}) should not be None.")
-    # Use check_array from scikit to convert
-    try:
-        X = check_array(X, dtype="float64", ensure_2d=ensure_2d, force_all_finite=not allow_nan)
-    except Exception as e:
-        raise ValueError(f"Feature matrix 'X' should be array-like with float values."
-                         f"\nscikit message:\n\t{e}")
-    # Check X values (not NaN, inf, or None)
-    if not allow_nan and np.any(np.isnan(X)):
-        raise ValueError("Feature matrix 'X' should not contain NaN values.")
-    if np.any(np.isinf(X)):
-        raise ValueError("Feature matrix 'X' should not contain infinite values.")
-    if X.dtype == object:
-        if np.any([elem is None for row in X for elem in row]):
-            raise ValueError("Feature matrix 'X' should not contain None.")
-    n_samples, n_features = X.shape
-    n_unique_samples = len(set(map(tuple, X)))
-    if y is not None and n_samples != len(y):
-        raise ValueError(f"Number of samples does not match for 'X' ({n_samples}) and '{y_name}' ({y}).")
-    if n_samples == 0 or n_features == 0:
-        raise ValueError(f"Shape of 'X' ({n_samples}, {n_features}) indicates empty feature matrix."
-                         f"\nX = {X}")
-    if n_unique_samples < min_n_unique_samples or n_samples < min_n_unique_samples:
-        raise ValueError(f"Number of unique samples ({n_unique_samples}) should be at least {min_n_unique_samples}."
-                         f"\nX = {X}")
-    if n_features < min_n_features:
-        raise ValueError(f"'n_features' ({n_features}) should be at least {min_n_features}."
-                         f"\nX = {X}")
-    return X
+# Check feature matrix and labels
+def check_X(X, min_n_samples=3, min_n_features=2, ensure_2d=True, allow_nan=False):
+    """Check that the feature matrix X is valid."""
+    X = _check_array_like(name="X", val=X, dtype="float", ensure_2d=ensure_2d, allow_nan=allow_nan)
+    n_samples, n_features = X.shape
+    if n_samples < min_n_samples:
+        raise ValueError(f"n_samples={n_samples} (in 'X') should be >= {min_n_samples}."
+                         f"\nX = {X}")
+    if n_features < min_n_features:
+        raise ValueError(f"n_features={n_features} (in 'X') should be >= {min_n_features}."
+                         f"\nX = {X}")
+    return X
+
+
+def check_X_unique_samples(X, min_n_unique_samples=3):
+    """Check that the matrix X has a sufficient number of unique samples."""
+    if len(set(map(tuple, X))) == 1:
+        raise ValueError("Feature matrix 'X' should not have all identical samples.")
+    n_unique_samples = len(set(map(tuple, X)))
+    if n_unique_samples < min_n_unique_samples:
+        raise ValueError(f"n_unique_samples ({n_unique_samples}) should be >= {min_n_unique_samples}."
+                         f"\nX = {X}")
+    return X
+
+
+def check_labels(labels=None):
+    """Check that labels are given, integer-typed, and contain more than one class."""
+    if labels is None:
+        raise ValueError("'labels' should not be None")
+    # Convert labels to a numpy array if it's not already
+    labels = np.asarray(labels)
+    unique_labels = set(labels)
+    if len(unique_labels) == 1:
+        raise ValueError(f"'labels' should contain more than one different value ({unique_labels})")
+    wrong_types = [l for l in unique_labels if not np.issubdtype(type(l), np.integer)]
+    if wrong_types:
+        raise ValueError(f"Labels in 'labels' should be of type int, but contain: {set(map(type, wrong_types))}")
+    return labels
+
+
+def check_match_X_labels(X=None, labels=None):
+    """Check that the number of samples in X matches the number of labels."""
+    n_samples, n_features = X.shape
+    if n_samples != len(labels):
+        raise ValueError(f"n_samples does not match for 'X' ({n_samples}) and 'labels' ({len(labels)}).")
 
 # df checking functions
 def check_col_in_df(df=None, name_df=None, col=None, col_type=None, accept_nan=False, error_if_exists=False):
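The refactor replaces the monolithic check_feat_matrix with single-purpose checkers that can be chained. A minimal sketch of how they compose, assuming the import path shown in the diff above (the toy data is made up for illustration):

    import numpy as np
    # Assumed import path, taken from the file name in this diff
    from aaanalysis._utils._check_data import (check_X, check_X_unique_samples,
                                               check_labels, check_match_X_labels)

    X = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])  # 3 samples, 2 features
    labels = [0, 0, 1]

    # Each checker validates one concern and returns the converted input
    X = check_X(X, min_n_samples=3, min_n_features=2)      # shape and dtype checks
    X = check_X_unique_samples(X, min_n_unique_samples=3)  # duplicate-sample check
    labels = check_labels(labels)                          # integer labels, >1 class
    check_match_X_labels(X=X, labels=labels)               # sample/label agreement

Each function raises a ValueError on failure, so callers can validate inputs step by step instead of through one catch-all routine.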
2 changes: 1 addition & 1 deletion aaanalysis/_utils/utils_cpp.py
@@ -54,7 +54,7 @@ def check_y_categorical(df=None, y=None):
                          "of 'df': {}".format(list_cat_columns))
 
 
-def check_labels(labels=None, df=None, name_df=None):
+def check_labels_(labels=None, df=None, name_df=None):
     """Check that 'labels' is not None and contains only 0 and 1"""
     if labels is None:
         raise ValueError("'labels' should not be None")
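With check_labels now also defined in _check_data.py, the trailing underscore added here presumably avoids the two helpers sharing one name; the diff itself does not state the motivation.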
Binary file modified aaanalysis/aaclust/__pycache__/_aaclust_bic.cpython-39.pyc
Binary file modified aaanalysis/aaclust/__pycache__/aaclust.cpython-39.pyc
41 changes: 25 additions & 16 deletions aaanalysis/aaclust/_aaclust_bic.py
@@ -14,38 +14,47 @@
 def bic_score(X, labels=None):
     """Computes the BIC metric for given clusters.
 
     Returns
     -------
     BIC : float
         BIC value between -inf and inf. Greater values indicate better clustering.
 
     See also
     --------
     https://stats.stackexchange.com/questions/90769/using-bic-to-estimate-the-number-of-k-in-kmeans
     """
+    epsilon = 1e-10  # prevent division by zero
+
     # Check if labels match the number of clusters
     n_classes = len(set(labels))
     n_samples, n_features = X.shape
     if n_classes >= n_samples:
         raise ValueError(f"Number of classes in 'labels' ({n_classes}) must be smaller than n_samples ({n_samples})")
     if n_features == 0:
         raise ValueError(f"'n_features' should not be 0")
 
     # Map labels to an increasing order starting with 0
     unique_labels, inverse = np.unique(labels, return_inverse=True)
     labels = inverse
     centers, center_labels = compute_centers(X, labels=labels)
     size_clusters = np.bincount(labels)
 
-    # Compute variance over all clusters beforehand
-    list_masks = [[i == label for i in labels] for label in center_labels]
-    sum_squared_dist = sum(
-        [sum(distance.cdist(X[list_masks[i]], [centers[i]], 'euclidean') ** 2) for i in range(n_classes)])
+    # Compute variance over all clusters
+    list_masks = [labels == label for label in center_labels]
+    sum_squared_dist = sum([sum(distance.cdist(X[mask], [center], 'euclidean') ** 2)
+                            for mask, center in zip(list_masks, centers)])
 
-    # Compute between-cluster variance
-    denominator = (n_samples - n_classes) * n_features
-    bet_clu_var = (1.0 / denominator) * sum_squared_dist
-    if bet_clu_var == 0:
-        raise ValueError("The between-cluster variance should not be 0")
-    # Compute BIC
+    # Compute the pooled within-cluster variance estimate (guarded by epsilon)
+    denominator = max((n_samples - n_classes) * n_features, epsilon)
+    bet_clu_var = max((1.0 / denominator) * sum_squared_dist, epsilon)
 
     # Compute BIC components
     const_term = 0.5 * n_classes * np.log(n_samples) * (n_features + 1)
-    bic_components = []
-    for i in range(n_classes):
-        component = (size_clusters[i] * np.log(size_clusters[i])
-                     - size_clusters[i] * np.log(n_samples)
-                     - ((size_clusters[i] * n_features) / 2) * np.log(2 * np.pi * bet_clu_var)
-                     - ((size_clusters[i] - 1) * n_features / 2))
-        bic_components.append(component)
+    log_size_clusters = np.log(size_clusters + epsilon)
+    log_n_samples = np.log(n_samples + epsilon)
+    log_bcv = np.log(2 * np.pi * bet_clu_var)
+    bic_components = (size_clusters * (log_size_clusters - log_n_samples)
+                      - 0.5 * size_clusters * n_features * log_bcv
+                      - 0.5 * (size_clusters - 1) * n_features)
     bic = np.sum(bic_components) - const_term
     return bic
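For reference, the vectorized computation above corresponds to the following BIC formula (my reading of the code, ignoring the epsilon guards): with k clusters of sizes n_i, N samples, d features, and the pooled within-cluster variance estimate that the code stores in bet_clu_var,

    \hat{\sigma}^{2} = \frac{\mathrm{SSE}}{(N - k)\,d},
    \qquad
    \mathrm{BIC} = \sum_{i=1}^{k} \left[ n_i \ln\frac{n_i}{N}
        - \frac{n_i d}{2} \ln\!\bigl(2\pi\hat{\sigma}^{2}\bigr)
        - \frac{(n_i - 1)\,d}{2} \right]
        - \frac{k\,(d + 1)}{2} \ln N

which matches the formulation in the Stack Exchange answer linked in the docstring.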
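A quick smoke test, assuming bic_score is importable from aaanalysis.aaclust._aaclust_bic (the path is taken from the diff; the data are synthetic):

    import numpy as np
    from sklearn.datasets import make_blobs
    # Assumed import path, taken from the file name in this diff
    from aaanalysis.aaclust._aaclust_bic import bic_score

    X, labels = make_blobs(n_samples=60, centers=3, n_features=2, random_state=0)
    good = bic_score(X, labels=labels)  # well-separated clustering
    bad = bic_score(X, labels=np.random.default_rng(0).integers(0, 3, 60))  # random labels
    print(good > bad)  # expected: True, since greater BIC indicates better clustering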