-
Notifications
You must be signed in to change notification settings - Fork 147
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #408 from jnesfield/main
Added Numerical Range Data Quality Check
- Loading branch information
Showing
11 changed files
with
562 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Author: James Nesfield <jamesnesfield@live.com> | ||
# | ||
# License: Apache Software License 2.0 | ||
|
||
"""Package containing the Data Quality Calculators implementation.""" | ||
|
||
from .calculator import NumericalRangeCalculator | ||
from .result import Result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,263 @@ | ||
# Author: James Nesfield <jamesnesfield@live.com> | ||
# | ||
# License: Apache Software License 2.0 | ||
|
||
"""Continuous numerical variable range monitor to ensure range supplied is within training bounds.""" | ||
|
||
from typing import Any, Dict, List, Optional, Union | ||
|
||
import numpy as np | ||
import pandas as pd | ||
from pandas import MultiIndex | ||
|
||
from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type | ||
from nannyml.chunk import Chunker | ||
from nannyml.exceptions import InvalidArgumentsException | ||
from nannyml.thresholds import Threshold, calculate_threshold_values, ConstantThreshold | ||
from nannyml.usage_logging import UsageEvent, log_usage | ||
from .result import Result | ||
|
||
""" | ||
Values Out Of Range Data Quality Module. | ||
""" | ||
|
||
|
||
class NumericalRangeCalculator(AbstractCalculator): | ||
"""NumericalRangeCalculator ensures the monitoring data set numerical ranges match the reference data set ones.""" | ||
|
||
def __init__( | ||
self, | ||
column_names: Union[str, List[str]], | ||
normalize: bool = True, | ||
timestamp_column_name: Optional[str] = None, | ||
chunk_size: Optional[int] = None, | ||
chunk_number: Optional[int] = None, | ||
chunk_period: Optional[str] = None, | ||
chunker: Optional[Chunker] = None, | ||
threshold: Threshold = ConstantThreshold(lower=None, upper=0), | ||
): | ||
"""Creates a new NumericalRangeCalculator instance. | ||
Parameters | ||
---------- | ||
column_names: Union[str, List[str]] | ||
A string or list containing the names of features in the provided data set. | ||
Missing Values will be calculated for each entry in this list. | ||
normalize: bool, default=True | ||
Whether to provide the missing value ratio (True) or the absolute number of missing values (False). | ||
timestamp_column_name: str | ||
The name of the column containing the timestamp of the model prediction. | ||
chunk_size: int | ||
Splits the data into chunks containing `chunks_size` observations. | ||
Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. | ||
chunk_number: int | ||
Splits the data into `chunk_number` pieces. | ||
Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. | ||
chunk_period: str | ||
Splits the data according to the given period. | ||
Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. | ||
chunker : Chunker | ||
The `Chunker` used to split the data sets into a lists of chunks. | ||
threshold: Threshold, default=StandardDeviationThreshold | ||
The threshold you wish to evaluate values on. Defaults to a StandardDeviationThreshold with default | ||
options. The other available value is ConstantThreshold. | ||
Examples | ||
-------- | ||
>>> import nannyml as nml | ||
>>> reference_df, analysis_df, _ = nml.load_synthetic_car_price_dataset() | ||
>>> feature_column_names = [col for col in reference_df.columns if col not in [ | ||
... 'fuel','transmission','timestamp', 'y_pred', 'y_true']] | ||
>>> calc = nml.NumericalRangeCalculator( | ||
... column_names=feature_column_names, | ||
... timestamp_column_name='timestamp', | ||
... ).fit(reference_df) | ||
>>> res = calc.calculate(analysis_df) | ||
>>> res.filter(period='analysis').plot().show() | ||
""" | ||
super(NumericalRangeCalculator, self).__init__( | ||
chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name | ||
) | ||
if isinstance(column_names, str): | ||
self.column_names = [column_names] | ||
elif isinstance(column_names, list): | ||
for el in column_names: | ||
if not isinstance(el, str): | ||
raise InvalidArgumentsException( | ||
f"column_names elements should be either a column name string or a list of strings, found\n{el}" | ||
) | ||
self.column_names = column_names | ||
else: | ||
raise InvalidArgumentsException( | ||
"column_names should be either a column name string or a list of columns names strings, " | ||
"found\n{column_names}" | ||
) | ||
self.result: Optional[Result] = None | ||
|
||
# threshold strategy is the same across all columns | ||
self.threshold = threshold | ||
self._upper_alert_thresholds: Dict[str, Optional[float]] = {column_name: 0 for column_name in self.column_names} | ||
self._lower_alert_thresholds: Dict[str, Optional[float]] = {column_name: 0 for column_name in self.column_names} | ||
|
||
self.lower_threshold_value_limit: float = 0 | ||
self.upper_threshold_value_limit: Optional[float] = None | ||
self.normalize = normalize | ||
|
||
if self.normalize: | ||
self.data_quality_metric = 'out_of_range_values_rate' | ||
self.upper_threshold_value_limit = 1 | ||
else: | ||
self.data_quality_metric = 'out_of_range_values_count' | ||
self.upper_threshold_value_limit = np.nan | ||
|
||
# object tracks values as list [min,max] | ||
self._reference_value_ranges: Dict[str, list] = {column_name: list() for column_name in self.column_names} | ||
|
||
def _calculate_out_of_range_stats(self, data: pd.Series, lower_bound: float, upper_bound: float): | ||
# to do make this calc out of range stats | ||
count_tot = data.shape[0] | ||
count_out_of_range = ((data < lower_bound) | (data > upper_bound)).sum() | ||
if self.normalize: | ||
count_out_of_range = count_out_of_range / count_tot | ||
return count_out_of_range | ||
|
||
@log_usage(UsageEvent.DQ_CALC_VALUES_OUT_OF_RANGE_FIT, metadata_from_self=['normalize']) | ||
def _fit(self, reference_data: pd.DataFrame, *args, **kwargs): | ||
"""Fits the drift calculator to a set of reference data.""" | ||
if reference_data.empty: | ||
raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.') | ||
|
||
_list_missing(self.column_names, reference_data) | ||
|
||
# All provided columns must be continuous | ||
# We do not make int categorical | ||
continuous_column_names, categorical_column_names = _split_features_by_type(reference_data, self.column_names) | ||
if not set(self.column_names) == set(continuous_column_names): | ||
raise InvalidArgumentsException( | ||
f"Specified columns_names for NumericalRangeCalculator must all be continuous. " | ||
f"Categorical columns found: {categorical_column_names}" | ||
) | ||
|
||
for col in self.column_names: | ||
self._reference_value_ranges[col] = [reference_data[col].min(), reference_data[col].max()] | ||
|
||
self.result = self._calculate(data=reference_data) | ||
self.result.data[('chunk', 'period')] = 'reference' | ||
|
||
return self | ||
|
||
@log_usage(UsageEvent.DQ_CALC_VALUES_OUT_OF_RANGE_RUN, metadata_from_self=['normalize']) | ||
def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: | ||
"""Calculates methods for both categorical and continuous columns.""" | ||
if data.empty: | ||
raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.') | ||
|
||
_list_missing(self.column_names, data) | ||
|
||
chunks = self.chunker.split(data) | ||
|
||
rows = [] | ||
for chunk in chunks: | ||
row = { | ||
'key': chunk.key, | ||
'chunk_index': chunk.chunk_index, | ||
'start_index': chunk.start_index, | ||
'end_index': chunk.end_index, | ||
'start_datetime': chunk.start_datetime, | ||
'end_datetime': chunk.end_datetime, | ||
'period': 'analysis', | ||
} | ||
|
||
for column_name in self.column_names: | ||
for k, v in self._calculate_for_column(chunk.data, column_name).items(): | ||
row[f'{column_name}_{k}'] = v | ||
|
||
rows.append(row) | ||
|
||
result_index = _create_multilevel_index( | ||
column_names=self.column_names, | ||
) | ||
res = pd.DataFrame(rows) | ||
res.columns = result_index | ||
res = res.reset_index(drop=True) | ||
|
||
if self.result is None: | ||
self._set_metric_thresholds(res) | ||
res = self._populate_alert_thresholds(res) | ||
self.result = Result( | ||
results_data=res, | ||
column_names=self.column_names, | ||
data_quality_metric=self.data_quality_metric, | ||
timestamp_column_name=self.timestamp_column_name, | ||
chunker=self.chunker, | ||
) | ||
else: | ||
# TODO: review subclassing setup => superclass + '_filter' is screwing up typing. | ||
# Dropping the intermediate '_filter' and directly returning the correct 'Result' class works OK | ||
# but this causes us to lose the "common behavior" in the top level 'filter' method when overriding. | ||
# Applicable here but to many of the base classes as well (e.g. fitting and calculating) | ||
res = self._populate_alert_thresholds(res) | ||
self.result = self.result.filter(period='reference') | ||
self.result.data = pd.concat([self.result.data, res], ignore_index=True) | ||
|
||
return self.result | ||
|
||
def _calculate_for_column(self, data: pd.DataFrame, column_name: str) -> Dict[str, Any]: | ||
result = {} | ||
value_range = self._reference_value_ranges[column_name] | ||
value = self._calculate_out_of_range_stats(data[column_name], value_range[0], value_range[1]) | ||
result['value'] = value | ||
return result | ||
|
||
def _set_metric_thresholds(self, result_data: pd.DataFrame): | ||
for column_name in self.column_names: | ||
( | ||
self._lower_alert_thresholds[column_name], | ||
self._upper_alert_thresholds[column_name], | ||
) = calculate_threshold_values( # noqa: E501 | ||
threshold=self.threshold, | ||
data=result_data.loc[:, (column_name, 'value')], | ||
lower_threshold_value_limit=self.lower_threshold_value_limit, | ||
upper_threshold_value_limit=self.upper_threshold_value_limit, | ||
logger=self._logger, | ||
) | ||
|
||
def _populate_alert_thresholds(self, result_data: pd.DataFrame) -> pd.DataFrame: | ||
for column_name in self.column_names: | ||
result_data[(column_name, 'upper_threshold')] = self._upper_alert_thresholds[column_name] | ||
result_data[(column_name, 'lower_threshold')] = self._lower_alert_thresholds[column_name] | ||
result_data[(column_name, 'alert')] = result_data.apply( | ||
lambda row: True | ||
if ( | ||
row[(column_name, 'value')] | ||
> ( | ||
np.inf | ||
if row[(column_name, 'upper_threshold')] is None | ||
else row[(column_name, 'upper_threshold')] # noqa: E501 | ||
) | ||
or row[(column_name, 'value')] | ||
< ( | ||
-np.inf | ||
if row[(column_name, 'lower_threshold')] is None | ||
else row[(column_name, 'lower_threshold')] # noqa: E501 | ||
) | ||
) | ||
else False, | ||
axis=1, | ||
) | ||
return result_data | ||
|
||
|
||
def _create_multilevel_index( | ||
column_names, | ||
): | ||
chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period'] | ||
chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names] | ||
column_tuples = [ | ||
(column_name, 'value') | ||
for column_name in column_names | ||
# for el in ['value', 'upper_threshold', 'lower_threshold', 'alert'] | ||
] | ||
tuples = chunk_tuples + column_tuples | ||
return MultiIndex.from_tuples(tuples) |
Oops, something went wrong.