From c58c7f55067b0fa9665aa03aa1a8d63b29959939 Mon Sep 17 00:00:00 2001
From: Hakim Elakhrass
Date: Sat, 14 Oct 2023 19:45:36 +0200
Subject: [PATCH] Add PSI for continuous data

---
 nannyml/drift/univariate/calculator.py |  5 ++
 nannyml/drift/univariate/methods.py    | 83 +++++++++++++++++++++++---
 tests/drift/test_drift.py              |  1 +
 3 files changed, 81 insertions(+), 8 deletions(-)

diff --git a/nannyml/drift/univariate/calculator.py b/nannyml/drift/univariate/calculator.py
index 6f205298..07bcfe35 100644
--- a/nannyml/drift/univariate/calculator.py
+++ b/nannyml/drift/univariate/calculator.py
@@ -12,6 +12,7 @@
 - L-infinity distance (categorical)
 - Jensen-Shannon distance
 - Hellinger distance
+- Population Stability Index (continuous)
 
 For more information, check out the `tutorial`_ or the `deep dive`_.
 
@@ -50,6 +51,7 @@
     'wasserstein': StandardDeviationThreshold(std_lower_multiplier=None),
     'hellinger': ConstantThreshold(lower=None, upper=0.1),
     'l_infinity': ConstantThreshold(lower=None, upper=0.1),
+    'psi': ConstantThreshold(lower=None, upper=0.25),
 }
 
 
@@ -97,6 +99,7 @@ def __init__(
             - `kolmogorov_smirnov`
             - `hellinger`
             - `wasserstein`
+            - `psi`
         chunk_size: int
             Splits the data into chunks containing `chunks_size` observations.
             Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
@@ -118,6 +121,7 @@ def __init__(
                 'wasserstein': StandardDeviationThreshold(std_lower_multiplier=None),
                 'hellinger': ConstantThreshold(upper=0.1),
                 'l_infinity': ConstantThreshold(upper=0.1)
+                'psi': ConstantThreshold(upper=0.25)
             }
 
             A dictionary allowing users to set a custom threshold for each method. It links a `Threshold` subclass
@@ -130,6 +134,7 @@ def __init__(
             - `wasserstein`: `StandardDeviationThreshold(std_lower_multiplier=None)`
             - `hellinger`: `ConstantThreshold(upper=0.1)`
             - `l_infinity`: `ConstantThreshold(upper=0.1)`
+            - `psi`: `ConstantThreshold(upper=0.25)`
 
             The `chi2` method does not support custom thresholds for now. Additional research is required to determine
             how to transition from its current p-value based implementation.
diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py
index e165964b..4a3295f4 100644
--- a/nannyml/drift/univariate/methods.py
+++ b/nannyml/drift/univariate/methods.py
@@ -271,7 +271,7 @@ def __init__(self, **kwargs) -> None:
         column_name: str, default='jensen-shannon'
             The name used to indicate the metric in columns of a DataFrame.
         lower_threshold_limit : float, default=0
-            An optional lower threshold for the performance metric.
+            An optional lower threshold for the drift metric.
         """
         self._treat_as_type: str
         self._bins: np.ndarray
@@ -353,9 +353,9 @@ def __init__(self, **kwargs) -> None:
         column_name: str, default='kolmogorov-smirnov'
             The name used to indicate the metric in columns of a DataFrame.
         upper_threshold_limit : float, default=1.0
-            An optional upper threshold for the performance metric.
+            An optional upper threshold for the drift metric.
         lower_threshold_limit : float, default=0
-            An optional lower threshold for the performance metric.
+            An optional lower threshold for the drift metric.
         """
         self._reference_data: Optional[pd.Series] = None
         self._reference_size: float
@@ -434,9 +434,9 @@ def __init__(self, **kwargs) -> None:
         column_name: str, default='chi2'
             The name used to indicate the metric in columns of a DataFrame.
         upper_threshold_limit : float, default=1.0
-            An optional upper threshold for the performance metric.
+            An optional upper threshold for the drift metric.
         lower_threshold_limit : float, default=0
-            An optional lower threshold for the performance metric.
+            An optional lower threshold for the drift metric.
         """
         self._reference_data_vcs: pd.Series
         self._p_value: float
@@ -499,7 +499,7 @@ def __init__(self, **kwargs) -> None:
         column_name: str, default='l_infinity'
             The name used to indicate the metric in columns of a DataFrame.
         lower_threshold_limit : float, default=0
-            An optional lower threshold for the performance metric.
+            An optional lower threshold for the drift metric.
         """
         self._reference_proba: Optional[dict] = None
 
@@ -553,7 +553,7 @@ def __init__(self, **kwargs) -> None:
         column_name: str, default='wasserstein'
             The name used to indicate the metric in columns of a DataFrame.
         lower_threshold_limit : float, default=0
-            An optional lower threshold for the performance metric.
+            An optional lower threshold for the drift metric.
         """
         self._reference_data: Optional[pd.Series] = None
 
@@ -660,7 +660,7 @@ def __init__(self, **kwargs) -> None:
         column_name: str, default='hellinger'
             The name used to indicate the metric in columns of a DataFrame.
         lower_threshold_limit : float, default=0
-            An optional lower threshold for the performance metric.
+            An optional lower threshold for the drift metric.
         """
 
         self._treat_as_type: str
@@ -719,3 +719,70 @@ def _calculate(self, data: pd.Series):
             del reference_proba_in_bins
 
         return distance
+
+
+@MethodFactory.register(key='psi', feature_type=FeatureType.CONTINUOUS)
+class PSI(Method):
+    """Calculates the Population Stability Index (PSI) between two distributions."""
+
+    def __init__(self, **kwargs) -> None:
+        super().__init__(
+            display_name='Population Stability Index',
+            column_name='psi',
+            lower_threshold_limit=0,
+            **kwargs,
+        )
+        """
+        Parameters
+        ----------
+        display_name : str, default='Population Stability Index'
+            The name of the metric. Used to display in plots.
+        column_name: str, default='psi'
+            The name used to indicate the metric in columns of a DataFrame.
+        lower_threshold_limit : float, default=0
+            An optional lower threshold for the drift metric.
+        """
+
+        self._reference_bins: Optional[np.ndarray] = None
+        self._reference_proba: Optional[np.ndarray] = None
+
+    def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
+        reference_data = _remove_missing_data(reference_data)
+        data = np.asarray(reference_data)
+
+        # Pick the number of bins using the Freedman-Diaconis rule
+        iqr = np.percentile(data, 75) - np.percentile(data, 25)
+        bin_width = 2 * iqr * len(data) ** (-1 / 3)
+        # Fall back to a single bin when the data is (near-)constant
+        bin_num = int(np.ceil((data.max() - data.min()) / bin_width)) if bin_width > 0 else 1
+
+        # Calculate bin edges and frequencies for the reference data
+        self._reference_count, self._reference_bins = np.histogram(reference_data, bins=bin_num)
+        self._reference_proba = self._reference_count / len(reference_data)
+
+        return self
+
+    def _calculate(self, data: pd.Series):
+        if self._reference_bins is None:
+            raise NotFittedException(
+                "tried to call 'calculate' on an unfitted method " f"{self.display_name}. Please run 'fit' first"
+            )
+        data = _remove_missing_data(data)
+        if data.empty:
+            return np.nan
+
+        # Calculate bin frequencies for the analysis data, using the reference bin edges
+        data_counts, _ = np.histogram(data, bins=self._reference_bins)
+        data_probs = data_counts / len(data)
+
+        # Use the previously calculated bin frequencies for the reference data
+        ref_probs = self._reference_proba
+        # Skip bins where either probability is (numerically) zero to avoid log(0)
+        psi_values = [
+            (data_prob - ref_prob) * np.log(data_prob / ref_prob)
+            if ref_prob > 1e-10 and data_prob > 1e-10
+            else 0
+            for data_prob, ref_prob in zip(data_probs, ref_probs)
+        ]
+
+        return sum(psi_values)
diff --git a/tests/drift/test_drift.py b/tests/drift/test_drift.py
index 845f08fa..9cc3b72f 100644
--- a/tests/drift/test_drift.py
+++ b/tests/drift/test_drift.py
@@ -322,6 +322,7 @@ def test_univariate_drift_calculator_without_custom_thresholds():
             'hellinger': ConstantThreshold(lower=1, upper=2),
             'jensen_shannon': ConstantThreshold(lower=1, upper=2),
             'l_infinity': ConstantThreshold(lower=1, upper=2),
+            'psi': ConstantThreshold(lower=1, upper=2),
         },
     ],
 )
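
For context, the method added above compares binned reference and analysis distributions: bin edges come from the reference data (Freedman-Diaconis rule), and PSI is the sum over bins of (p_analysis - p_reference) * ln(p_analysis / p_reference). The following standalone sketch, not part of the patch, mirrors that logic with plain numpy; it uses numpy's built-in 'fd' bin estimator rather than the hand-rolled computation in the patch, so results may differ slightly at the edges.

import numpy as np

def psi(reference: np.ndarray, analysis: np.ndarray, eps: float = 1e-10) -> float:
    # Bin edges are derived from the reference data via the Freedman-Diaconis rule
    edges = np.histogram_bin_edges(reference, bins='fd')
    ref_probs = np.histogram(reference, bins=edges)[0] / len(reference)
    ana_probs = np.histogram(analysis, bins=edges)[0] / len(analysis)
    # Skip bins where either probability is (numerically) zero, as the patched method does
    mask = (ref_probs > eps) & (ana_probs > eps)
    return float(np.sum((ana_probs[mask] - ref_probs[mask]) * np.log(ana_probs[mask] / ref_probs[mask])))

rng = np.random.default_rng(0)
print(psi(rng.normal(0, 1, 10_000), rng.normal(0, 1, 10_000)))    # close to 0: no drift
print(psi(rng.normal(0, 1, 10_000), rng.normal(0.5, 1, 10_000)))  # clearly larger: drift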
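
A rough usage sketch follows, assuming the patch is applied. The feature name and synthetic data are made up for illustration; the arguments shown (column_names, continuous_methods, thresholds, chunk_number) follow the UnivariateDriftCalculator parameters documented in the patched calculator.py. The default upper threshold of 0.25 matches the common rule of thumb that PSI above roughly 0.25 signals a significant distribution shift, with 0.1 to 0.25 usually read as moderate.

import numpy as np
import pandas as pd
import nannyml as nml
from nannyml.thresholds import ConstantThreshold

rng = np.random.default_rng(42)
reference = pd.DataFrame({'feature_1': rng.normal(loc=0.0, scale=1.0, size=10_000)})
analysis = pd.DataFrame({'feature_1': rng.normal(loc=0.5, scale=1.2, size=10_000)})  # shifted

calc = nml.UnivariateDriftCalculator(
    column_names=['feature_1'],
    continuous_methods=['psi'],                          # method added by this patch
    thresholds={'psi': ConstantThreshold(upper=0.25)},   # same as the new default
    chunk_number=10,
)
calc.fit(reference)
results = calc.calculate(analysis)
print(results.to_df())  # per-chunk 'psi' values and alerts for the drifted column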