Skip to content

Commit

Permalink
Merge pull request #404 from NannyML/fix/explicit-drift
Browse files Browse the repository at this point in the history
Explicit data type conversion for univariate drift
  • Loading branch information
nnansters authored Jul 5, 2024
2 parents 8bffbcd + 336a056 commit 6517553
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 116 deletions.
6 changes: 3 additions & 3 deletions nannyml/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,10 @@ def _estimate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
raise NotImplementedError(f"'{self.__class__.__name__}' must implement the '_calculate' method")


def _split_features_by_type(data: pd.DataFrame, feature_column_names: List[str]) -> Tuple[List[str], List[str]]:
continuous_column_names = [col for col in feature_column_names if _column_is_continuous(data[col])]
def _split_features_by_type(data: pd.DataFrame, feature_column_names: Iterable[str]) -> Tuple[List[str], List[str]]:
continuous_column_names = [col for col in sorted(feature_column_names) if _column_is_continuous(data[col])]

categorical_column_names = [col for col in feature_column_names if _column_is_categorical(data[col])]
categorical_column_names = [col for col in sorted(feature_column_names) if _column_is_categorical(data[col])]

return continuous_column_names, categorical_column_names

Expand Down
56 changes: 41 additions & 15 deletions nannyml/drift/univariate/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import warnings
from logging import Logger
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -61,6 +61,7 @@ class UnivariateDriftCalculator(AbstractCalculator):
def __init__(
self,
column_names: Union[str, List[str]],
treat_as_numerical: Optional[Union[str, List[str]]] = None,
treat_as_categorical: Optional[Union[str, List[str]]] = None,
timestamp_column_name: Optional[str] = None,
categorical_methods: Optional[Union[str, List[str]]] = None,
Expand All @@ -79,6 +80,8 @@ def __init__(
column_names: Union[str, List[str]]
A string or list containing the names of features in the provided data set.
A drift score will be calculated for each entry in this list.
treat_as_numerical: Union[str, List[str]]
A single column name or list of column names to be treated as numerical by the calculator.
treat_as_categorical: Union[str, List[str]]
A single column name or list of column names to be treated as categorical by the calculator.
timestamp_column_name: str
Expand Down Expand Up @@ -204,6 +207,12 @@ def __init__(
column_names = [column_names]
self.column_names = column_names

if not treat_as_numerical:
treat_as_numerical = []
if isinstance(treat_as_numerical, str):
treat_as_numerical = [treat_as_numerical]
self.treat_as_numerical = treat_as_numerical

if not treat_as_categorical:
treat_as_categorical = []
if isinstance(treat_as_categorical, str):
Expand Down Expand Up @@ -255,22 +264,10 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> UnivariateDrift

_list_missing(self.column_names, reference_data)

self.continuous_column_names, self.categorical_column_names = _split_features_by_type(
reference_data, self.column_names
self.continuous_column_names, self.categorical_column_names = self._split_continuous_and_categorical(
reference_data
)

for column_name in self.treat_as_categorical:
if column_name not in self.column_names:
self._logger.info(
f"ignoring 'treat_as_categorical' value '{column_name}' because it was not in "
f"listed column names"
)
break
if column_name in self.continuous_column_names:
self.continuous_column_names.remove(column_name)
if column_name not in self.categorical_column_names:
self.categorical_column_names.append(column_name)

timestamps = reference_data[self.timestamp_column_name] if self.timestamp_column_name else None
for column_name in self.continuous_column_names:
methods = []
Expand Down Expand Up @@ -399,6 +396,35 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:

return self.result

def _split_continuous_and_categorical(self, data: pd.DataFrame) -> Tuple[List[str], List[str]]:
"""Splits the features in the data set into continuous and categorical features."""
treat_as_numerical_set, treat_as_categorical_set = set(self.treat_as_numerical), set(self.treat_as_categorical)
column_names_set = set(self.column_names)

invalid_continuous_column_names = treat_as_numerical_set - column_names_set
treat_as_numerical_set = treat_as_numerical_set - invalid_continuous_column_names
if invalid_continuous_column_names:
self._logger.info(
f"ignoring 'treat_as_numerical' values {list(invalid_continuous_column_names)} because "
f"they were not in listed column names"
)

invalid_categorical_column_names = treat_as_categorical_set - column_names_set
treat_as_categorical_set = treat_as_categorical_set - invalid_categorical_column_names
if invalid_categorical_column_names:
self._logger.info(
f"ignoring 'treat_as_categorical' values {list(invalid_categorical_column_names)} because "
f"they were not in listed column names"
)

unspecified_columns = column_names_set - treat_as_numerical_set - treat_as_categorical_set
continuous_column_names, categorical_column_names = _split_features_by_type(data, unspecified_columns)

continuous_column_names = continuous_column_names + list(treat_as_numerical_set)
categorical_column_names = categorical_column_names + list(treat_as_categorical_set)

return continuous_column_names, categorical_column_names


def _calculate_for_column(
data: pd.DataFrame, column_name: str, method: Method, logger: Optional[Logger] = None
Expand Down
195 changes: 128 additions & 67 deletions nannyml/drift/univariate/methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from scipy.stats import chi2_contingency, ks_2samp, wasserstein_distance

from nannyml._typing import Self
from nannyml.base import _column_is_categorical, _remove_nans
from nannyml.base import _remove_nans
from nannyml.chunk import Chunker
from nannyml.exceptions import InvalidArgumentsException, NotFittedException
from nannyml.thresholds import Threshold, calculate_threshold_values
Expand Down Expand Up @@ -247,8 +247,7 @@ def inner_wrapper(wrapped_class: Type[Method]) -> Type[Method]:


@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CONTINUOUS)
@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CATEGORICAL)
class JensenShannonDistance(Method):
class ContinuousJensenShannonDistance(Method):
"""Calculates Jensen-Shannon distance.
By default an alert will be raised if `distance > 0.1`.
Expand All @@ -272,34 +271,17 @@ def __init__(self, **kwargs) -> None:
lower_threshold_limit : float, default=0
An optional lower threshold for the performance metric.
"""
self._treat_as_type: str
self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None):
reference_data = _remove_nans(reference_data)
if _column_is_categorical(reference_data):
treat_as_type = 'cat'
else:
n_unique_values = len(np.unique(reference_data))
len_reference = len(reference_data)
if n_unique_values > 50 or n_unique_values / len_reference > 0.1:
treat_as_type = 'cont'
else:
treat_as_type = 'cat'
len_reference = len(reference_data)

if treat_as_type == 'cont':
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins
else:
reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique

self._treat_as_type = treat_as_type
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins

return self

Expand All @@ -308,15 +290,9 @@ def _calculate(self, data: pd.Series):
data = _remove_nans(data)
if data.empty:
return np.nan
if self._treat_as_type == 'cont':
len_data = len(data)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data

else:
data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)
len_data = len(data)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
Expand All @@ -325,7 +301,63 @@ def _calculate(self, data: pd.Series):

distance = jensenshannon(reference_proba_in_bins, data_proba_in_bins, base=2)

del reference_proba_in_bins
return distance


@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CATEGORICAL)
class CategoricalJensenShannonDistance(Method):
"""Calculates Jensen-Shannon distance.
By default an alert will be raised if `distance > 0.1`.
"""

def __init__(self, **kwargs) -> None:
"""Initialize Jensen-Shannon method."""
super().__init__(
display_name='Jensen-Shannon distance',
column_name='jensen_shannon',
lower_threshold_limit=0,
**kwargs,
)
"""
Parameters
----------
display_name : str, default='Jensen-Shannon distance'
The name of the metric. Used to display in plots.
column_name: str, default='jensen-shannon'
The name used to indicate the metric in columns of a DataFrame.
lower_threshold_limit : float, default=0
An optional lower threshold for the performance metric.
"""
self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None):
reference_data = _remove_nans(reference_data)
reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique

return self

def _calculate(self, data: pd.Series):
reference_proba_in_bins = copy(self._reference_proba_in_bins)
data = _remove_nans(data)
if data.empty:
return np.nan

data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
data_proba_in_bins = np.append(data_proba_in_bins, leftover)
reference_proba_in_bins = np.append(reference_proba_in_bins, 0)

distance = jensenshannon(reference_proba_in_bins, data_proba_in_bins, base=2)

return distance

Expand Down Expand Up @@ -670,8 +702,7 @@ def _ecdf(self, vec: np.ndarray):


@MethodFactory.register(key='hellinger', feature_type=FeatureType.CONTINUOUS)
@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL)
class HellingerDistance(Method):
class ContinuousHellingerDistance(Method):
"""Calculates the Hellinger Distance between two distributions."""

def __init__(self, **kwargs) -> None:
Expand All @@ -693,34 +724,70 @@ def __init__(self, **kwargs) -> None:
An optional lower threshold for the performance metric.
"""

self._treat_as_type: str
self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
reference_data = _remove_nans(reference_data)
if _column_is_categorical(reference_data):
treat_as_type = 'cat'
else:
n_unique_values = len(np.unique(reference_data))
len_reference = len(reference_data)
if n_unique_values > 50 or n_unique_values / len_reference > 0.1:
treat_as_type = 'cont'
else:
treat_as_type = 'cat'
len_reference = len(reference_data)

if treat_as_type == 'cont':
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins
else:
reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique
bins = np.histogram_bin_edges(reference_data, bins='doane')
reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference
self._bins = bins
self._reference_proba_in_bins = reference_proba_in_bins

return self

self._treat_as_type = treat_as_type
def _calculate(self, data: pd.Series):
data = _remove_nans(data)
if data.empty:
return np.nan
reference_proba_in_bins = copy(self._reference_proba_in_bins)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len(data)

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
data_proba_in_bins = np.append(data_proba_in_bins, leftover)
reference_proba_in_bins = np.append(reference_proba_in_bins, 0)

distance = np.sqrt(np.sum((np.sqrt(reference_proba_in_bins) - np.sqrt(data_proba_in_bins)) ** 2)) / np.sqrt(2)

return distance


@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL)
class CategoricalHellingerDistance(Method):
"""Calculates the Hellinger Distance between two distributions."""

def __init__(self, **kwargs) -> None:
"""Initialize Hellinger Distance method."""
super().__init__(
display_name='Hellinger distance',
column_name='hellinger',
lower_threshold_limit=0,
**kwargs,
)
"""
Parameters
----------
display_name : str, default='Hellinger distance'
The name of the metric. Used to display in plots.
column_name: str, default='hellinger'
The name used to indicate the metric in columns of a DataFrame.
lower_threshold_limit : float, default=0
An optional lower threshold for the performance metric.
"""

self._bins: np.ndarray
self._reference_proba_in_bins: np.ndarray

def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
reference_data = _remove_nans(reference_data)

reference_unique, reference_counts = np.unique(reference_data, return_counts=True)
reference_proba_per_unique = reference_counts / len(reference_data)
self._bins = reference_unique
self._reference_proba_in_bins = reference_proba_per_unique

return self

Expand All @@ -729,15 +796,11 @@ def _calculate(self, data: pd.Series):
if data.empty:
return np.nan
reference_proba_in_bins = copy(self._reference_proba_in_bins)
if self._treat_as_type == 'cont':
len_data = len(data)
data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data

else:
data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)
data_unique, data_counts = np.unique(data, return_counts=True)
data_counts_dic = dict(zip(data_unique, data_counts))
data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins]
data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data)

leftover = 1 - np.sum(data_proba_in_bins)
if leftover > 0:
Expand All @@ -746,6 +809,4 @@ def _calculate(self, data: pd.Series):

distance = np.sqrt(np.sum((np.sqrt(reference_proba_in_bins) - np.sqrt(data_proba_in_bins)) ** 2)) / np.sqrt(2)

del reference_proba_in_bins

return distance
Loading

0 comments on commit 6517553

Please sign in to comment.