diff --git a/ats/evaluators.py b/ats/evaluators.py index c85c98a..6a354e0 100644 --- a/ats/evaluators.py +++ b/ats/evaluators.py @@ -46,29 +46,35 @@ def evaluate_anomaly_detector(evaluated_timeseries_df, anomaly_labels, details=F return evaluation_results -def _calculate_model_scores(single_model_evaluation={},granularity='data_point'): +def _calculate_model_scores(single_model_evaluation={}): dataset_anomalies = set() for sample in single_model_evaluation.keys(): sample_anomalies = set(single_model_evaluation[sample].keys()) dataset_anomalies.update(sample_anomalies) - anomaly_scores = {} - for anomaly in dataset_anomalies: - anomaly_scores[anomaly] = 0 - if 'false_positives' not in dataset_anomalies: - anomaly_scores['false_positives'] = 0.0 - - for anomaly in dataset_anomalies: - for sample in single_model_evaluation.keys(): - if anomaly in single_model_evaluation[sample].keys(): - anomaly_scores[anomaly] += single_model_evaluation[sample][anomaly] - - if granularity == 'series': - samples_n = len(single_model_evaluation) - for key in anomaly_scores.keys(): - anomaly_scores[key] /= samples_n + model_scores = {} + anomalies_count = 0 + false_positives_count = 0 + anomalies_ratio = 0 + false_positives_ratio = 0 + anomalous_series_n = 0 + for sample in single_model_evaluation.keys(): + anomalies_count += single_model_evaluation[sample]['anomalies_count'] + if single_model_evaluation[sample]['anomalies_ratio'] is not None: + anomalies_ratio += single_model_evaluation[sample]['anomalies_ratio'] + anomalous_series_n += 1 + false_positives_count += single_model_evaluation[sample]['false_positives_count'] + false_positives_ratio += single_model_evaluation[sample]['false_positives_ratio'] + + model_scores['anomalies_count'] = anomalies_count + if anomalous_series_n: + model_scores['anomalies_ratio'] = anomalies_ratio/anomalous_series_n + else: + model_scores['anomalies_ratio'] = None + model_scores['false_positives_count'] = false_positives_count + model_scores['false_positives_ratio'] = false_positives_ratio/len(single_model_evaluation) - return anomaly_scores + return model_scores class Evaluator(): @@ -82,7 +88,9 @@ def _copy_dataset(self,dataset,models): dataset_copies.append(dataset_copy) return dataset_copies - def evaluate(self,models={},granularity='data_point'): + def evaluate(self,models={},granularity='point',strategy='flags'): + if strategy != 'flags': + raise NotImplementedError(f'Evaluation strategy {strategy} is not implemented') if not models: raise ValueError('There are no models to evaluate') if not self.test_data: @@ -103,14 +111,16 @@ def evaluate(self,models={},granularity='data_point'): single_model_evaluation = {} flagged_dataset = _get_model_output(dataset_copies[j],model) for i,sample_df in enumerate(flagged_dataset): - if granularity == 'data_point': + if granularity == 'point': single_model_evaluation[f'sample_{i+1}'] = _point_granularity_evaluation(sample_df,anomaly_labels_list[i]) - if granularity == 'variable': + elif granularity == 'variable': single_model_evaluation[f'sample_{i+1}'] = _variable_granularity_evaluation(sample_df,anomaly_labels_list[i]) - if granularity == 'series': + elif granularity == 'series': single_model_evaluation[f'sample_{i+1}'] = _series_granularity_evaluation(sample_df,anomaly_labels_list[i]) + else: + raise ValueError(f'Unknown granularity {granularity}') - models_scores[model_name] = _calculate_model_scores(single_model_evaluation,granularity=granularity) + models_scores[model_name] = _calculate_model_scores(single_model_evaluation) j+=1 return models_scores @@ -140,22 +150,41 @@ def _variable_granularity_evaluation(flagged_timeseries_df,anomaly_labels_df): raise ValueError('Variable granularity is not for this model') normalization_factor = variables_n * len(flagged_timeseries_df) + total_inserted_anomalies_n = 0 + total_detected_anomalies_n = 0 + detection_counts_by_anomaly_type = {} for anomaly,frequency in anomaly_labels_df.value_counts(dropna=False).items(): + if anomaly is not None: + total_inserted_anomalies_n += frequency anomaly_count = 0 for timestamp in flagged_timeseries_df.index: if anomaly_labels_df[timestamp] == anomaly: for column in flagged_timeseries_df.filter(like='anomaly').columns: anomaly_count += flagged_timeseries_df.loc[timestamp,column] - one_series_evaluation_result[anomaly] = anomaly_count / normalization_factor - - one_series_evaluation_result['false_positives'] = one_series_evaluation_result.pop(None) + if anomaly is not None: + total_detected_anomalies_n += anomaly_count + detection_counts_by_anomaly_type[anomaly] = anomaly_count + + total_inserted_anomalies_n *= variables_n + one_series_evaluation_result['false_positives_count'] = detection_counts_by_anomaly_type.pop(None) + one_series_evaluation_result['false_positives_ratio'] = one_series_evaluation_result['false_positives_count']/normalization_factor + one_series_evaluation_result['anomalies_count'] = total_detected_anomalies_n + if total_inserted_anomalies_n: + one_series_evaluation_result['anomalies_ratio'] = total_detected_anomalies_n/total_inserted_anomalies_n + else: + one_series_evaluation_result['anomalies_ratio'] = None return one_series_evaluation_result def _point_granularity_evaluation(flagged_timeseries_df,anomaly_labels_df): one_series_evaluation_result = {} normalization_factor = len(flagged_timeseries_df) + total_inserted_anomalies_n = 0 + total_detected_anomalies_n = 0 + detection_counts_by_anomaly_type = {} for anomaly,frequency in anomaly_labels_df.value_counts(dropna=False).items(): + if anomaly is not None: + total_inserted_anomalies_n += frequency anomaly_count = 0 for timestamp in flagged_timeseries_df.index: if anomaly_labels_df[timestamp] == anomaly: @@ -163,9 +192,18 @@ def _point_granularity_evaluation(flagged_timeseries_df,anomaly_labels_df): if flagged_timeseries_df.loc[timestamp,column]: anomaly_count += 1 break + if anomaly is not None: + total_detected_anomalies_n += anomaly_count + detection_counts_by_anomaly_type[anomaly] = anomaly_count one_series_evaluation_result[anomaly] = anomaly_count / normalization_factor - one_series_evaluation_result['false_positives'] = one_series_evaluation_result.pop(None) + one_series_evaluation_result['false_positives_count'] = detection_counts_by_anomaly_type.pop(None) + one_series_evaluation_result['false_positives_ratio'] = one_series_evaluation_result['false_positives_count']/normalization_factor + one_series_evaluation_result['anomalies_count'] = total_detected_anomalies_n + if total_inserted_anomalies_n: + one_series_evaluation_result['anomalies_ratio'] = total_detected_anomalies_n/total_inserted_anomalies_n + else: + one_series_evaluation_result['anomalies_ratio'] = None return one_series_evaluation_result def _series_granularity_evaluation(flagged_timeseries_df,anomaly_labels_df): @@ -173,9 +211,6 @@ def _series_granularity_evaluation(flagged_timeseries_df,anomaly_labels_df): for anomaly,frequency in anomaly_labels_df.value_counts(dropna=False).items(): if anomaly is not None: anomalies.append(anomaly) - anomalies_n = len(anomalies) - if anomalies_n > 1: - raise ValueError('Evaluation with series granularity supports series with only one anomaly') one_series_evaluation_result = {} is_series_anomalous = 0 @@ -184,11 +219,9 @@ def _series_granularity_evaluation(flagged_timeseries_df,anomaly_labels_df): if flagged_timeseries_df.loc[timestamp,column]: is_series_anomalous = 1 break - if is_series_anomalous and not anomalies: - one_series_evaluation_result['false_positives'] = 1 - elif is_series_anomalous and anomalies: - one_series_evaluation_result[anomalies[0]] = 1 - elif not is_series_anomalous and anomalies: - one_series_evaluation_result[anomalies[0]] = 0 + one_series_evaluation_result['false_positives_count'] = 1 if is_series_anomalous and not anomalies else 0 + one_series_evaluation_result['false_positives_ratio'] = one_series_evaluation_result['false_positives_count'] + one_series_evaluation_result['anomalies_count'] = 1 if is_series_anomalous and anomalies else 0 + one_series_evaluation_result['anomalies_ratio'] = one_series_evaluation_result['anomalies_count'] if anomalies else None return one_series_evaluation_result \ No newline at end of file diff --git a/ats/tests/test_evaluators.py b/ats/tests/test_evaluators.py index d8c9d86..e6a9de5 100644 --- a/ats/tests/test_evaluators.py +++ b/ats/tests/test_evaluators.py @@ -26,6 +26,29 @@ def setUp(self): rnd.seed(123) np.random.seed(123) + self.series1 = generate_timeseries_df(entries=5, variables=2) + self.series1['anomaly_label'] = [None, 'anomaly_2', 'anomaly_1', None, 'anomaly_1'] + # series1 + # timestamp value_1 value_2 anomaly_label + # 2025-06-10 14:00:00+00:00 0.000000 0.707107 None + # 2025-06-10 15:00:00+00:00 0.841471 0.977061 anomaly_2 + # 2025-06-10 16:00:00+00:00 0.909297 0.348710 anomaly_1 + # 2025-06-10 17:00:00+00:00 0.141120 -0.600243 None + # 2025-06-10 18:00:00+00:00 -0.756802 -0.997336 anomaly_1 + self.series2 = generate_timeseries_df(entries=7, variables=2) + self.series2['anomaly_label'] = ['anomaly_1', 'anomaly_2', 'anomaly_1', None, 'anomaly_1', None, None] + # series2 + # timestamp value_1 value_2 anomaly_label + # 2025-06-10 14:00:00+00:00 0.000000 0.707107 anomaly_1 + # 2025-06-10 15:00:00+00:00 0.841471 0.977061 anomaly_2 + # 2025-06-10 16:00:00+00:00 0.909297 0.348710 anomaly_1 + # 2025-06-10 17:00:00+00:00 0.141120 -0.600243 None + # 2025-06-10 18:00:00+00:00 -0.756802 -0.997336 anomaly_1 + # 2025-06-10 19:00:00+00:00 -0.958924 -0.477482 None + # 2025-06-10 20:00:00+00:00 -0.279415 0.481366 None + self.series3 = generate_timeseries_df(entries=3, variables=2) + self.series3['anomaly_label'] = [None, None, None] + def test_evaluate_anomaly_detector(self): min_max_anomaly_detector = MinMaxAnomalyDetector() @@ -240,73 +263,30 @@ def test_get_model_output(self): self.assertIn('humidity_anomaly',list(flagged_dataset[1].columns)) def test_calculate_model_scores(self): - single_model_evaluation = { - 'sample_1': { - 'anomaly_1': 0.5, - 'anomaly_2': 0.2, - 'false_positives': 0.6 - }, - 'sample_2': { - 'anomaly_1': 0.3, - 'anomaly_2': 0.4, - 'anomaly_3': 0.1, - 'false_positives': 0.2 - }, - } - model_scores = _calculate_model_scores(single_model_evaluation,granularity='data_point') - # model_scores: - # { 'anomaly_3': 0.1, - # 'anomaly_1': 0.8, - # 'false_positives': 0.8, - # 'anomaly_2': 0.6 - # } - self.assertEqual(len(model_scores),4) - self.assertIsInstance(model_scores,dict) - self.assertIn('anomaly_1',model_scores.keys()) - self.assertIn('anomaly_2',model_scores.keys()) - self.assertIn('anomaly_3',model_scores.keys()) - self.assertIn('false_positives',model_scores.keys()) - self.assertAlmostEqual(model_scores['anomaly_1'],0.8) - self.assertAlmostEqual(model_scores['anomaly_2'],0.6) - self.assertAlmostEqual(model_scores['anomaly_3'],0.1) - self.assertAlmostEqual(model_scores['false_positives'],0.8) - - def test_calculate_model_score_series_granularity(self): - single_model_evaluation = { - 'sample_1': { - 'anomaly_1': 1, - }, - 'sample_2': { - 'false_positives': 1 - }, - 'sample_3': { - 'anomaly_2': 1 - } - } - model_scores = _calculate_model_scores(single_model_evaluation,granularity='series') - # model_scores: - # { 'anomaly_1': 0.3333333333333333, - # 'false_positives': 0.3333333333333333, - # 'anomaly_2': 0.3333333333333333 - # } - self.assertEqual(len(model_scores),3) + single_model_evaluation = { 'sample_1': {'anomalies_count': 3, 'anomalies_ratio': 1.5, + 'false_positives_count': 1, + 'false_positives_ratio': 0.14}, + 'sample_2': {'anomalies_count': 3, 'anomalies_ratio': 1.5, + 'false_positives_count': 1, + 'false_positives_ratio': 0.14}, + 'sample_3': {'anomalies_count': 3, 'anomalies_ratio': 1.5, + 'false_positives_count': 1, + 'false_positives_ratio': 0.14} + } + model_scores = _calculate_model_scores(single_model_evaluation) self.assertIsInstance(model_scores,dict) - self.assertIn('anomaly_1',model_scores.keys()) - self.assertIn('anomaly_2',model_scores.keys()) - self.assertIn('false_positives',model_scores.keys()) - self.assertAlmostEqual(model_scores['anomaly_1'],0.3333333333333333) - self.assertAlmostEqual(model_scores['anomaly_2'],0.3333333333333333) - self.assertAlmostEqual(model_scores['false_positives'],0.333333333333333) + self.assertIn('anomalies_count',model_scores.keys()) + self.assertIn('anomalies_ratio',model_scores.keys()) + self.assertIn('false_positives_count',model_scores.keys()) + self.assertIn('false_positives_ratio',model_scores.keys()) + + self.assertAlmostEqual(model_scores['anomalies_count'],9) + self.assertAlmostEqual(model_scores['anomalies_ratio'],1.5) + self.assertAlmostEqual(model_scores['false_positives_count'],3) + self.assertAlmostEqual(model_scores['false_positives_ratio'],0.14) def test_evaluate_point_granularity(self): - anomalies = ['step_uv'] - effects = [] - # series with 2880 data points - series_generator = HumiTempTimeseriesGenerator() - series1 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects) - series2 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects) - dataset = [series1,series2] - evaluator = Evaluator(test_data=dataset) + dataset = [self.series1, self.series2, self.series3] minmax1 = MinMaxAnomalyDetector() minmax2 = MinMaxAnomalyDetector() minmax3 = MinMaxAnomalyDetector() @@ -314,28 +294,15 @@ def test_evaluate_point_granularity(self): 'detector_2': minmax2, 'detector_3': minmax3 } - evaluation_results = evaluator.evaluate(models=models,granularity='data_point') - # Evaluation_results: - # detector_1: {'step_uv': 0.000694444444444444, 'false_positives': 0.000694444444444444} - # detector_2: {'step_uv': 0.000694444444444444, 'false_positives': 0.000694444444444444} - # detector_3: {'step_uv': 0.000694444444444444, 'false_positives': 0.000694444444444444} - self.assertIsInstance(evaluation_results,dict) - self.assertEqual(len(evaluation_results),3) - self.assertEqual(len(evaluation_results['detector_1']),2) - self.assertEqual(len(evaluation_results['detector_2']),2) - self.assertEqual(len(evaluation_results['detector_3']),2) - self.assertAlmostEqual(evaluation_results['detector_1']['step_uv'],0.000694444444444444) - self.assertAlmostEqual(evaluation_results['detector_1']['false_positives'],0.000694444444444444) + evaluator = Evaluator(test_data=dataset) + evaluation_results = evaluator.evaluate(models=models,granularity='point') + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],6) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],7/8) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],4) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],8/21) def test_evaluate_variable_granularity(self): - anomalies = ['step_uv'] - effects = [] - # series with 2880 data points - series_generator = HumiTempTimeseriesGenerator() - series1 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects) - series2 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects) - dataset = [series1,series2] - evaluator = Evaluator(test_data=dataset) + dataset = [self.series1, self.series2, self.series3] minmax1 = MinMaxAnomalyDetector() minmax2 = MinMaxAnomalyDetector() minmax3 = MinMaxAnomalyDetector() @@ -343,30 +310,15 @@ def test_evaluate_variable_granularity(self): 'detector_2': minmax2, 'detector_3': minmax3 } + evaluator = Evaluator(test_data=dataset) evaluation_results = evaluator.evaluate(models=models,granularity='variable') - # Evaluation_results: - # detector_1: {'step_uv': 0.000694444444444444, 'false_positives': 0.000694444444444444} - # detector_2: {'step_uv': 0.000694444444444444, 'false_positives': 0.000694444444444444} - # detector_3: {'step_uv': 0.000694444444444444, 'false_positives': 0.000694444444444444} - - self.assertIsInstance(evaluation_results,dict) - self.assertEqual(len(evaluation_results),3) - self.assertEqual(len(evaluation_results['detector_1']),2) - self.assertEqual(len(evaluation_results['detector_2']),2) - self.assertEqual(len(evaluation_results['detector_3']),2) - self.assertAlmostEqual(evaluation_results['detector_1']['step_uv'],0.000694444444444444) - self.assertAlmostEqual(evaluation_results['detector_1']['false_positives'],0.000694444444444444) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],7) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],25/48) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],5) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],31/126) def test_evaluate_series_granularity(self): - anomalies = ['step_uv'] - effects = [] - series_generator = HumiTempTimeseriesGenerator() - # series1 will be a true anomaly for the minmax - series1 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects) - # series2 will be a false positive for minmax (it sees always 2 anomalous data points for each variable) - series2 = series_generator.generate(include_effect_label=True, anomalies=[],effects=effects) - dataset = [series1,series2] - evaluator = Evaluator(test_data=dataset) + dataset = [self.series1, self.series2, self.series3] minmax1 = MinMaxAnomalyDetector() minmax2 = MinMaxAnomalyDetector() minmax3 = MinMaxAnomalyDetector() @@ -374,67 +326,18 @@ def test_evaluate_series_granularity(self): 'detector_2': minmax2, 'detector_3': minmax3 } - evaluation_results = evaluator.evaluate(models=models,granularity='series') - # Evaluation_results: - # detector_1: {'step_uv': 0.5, 'false_positives': 0.5} - # detector_2: {'step_uv': 0.5, 'false_positives': 0.5} - # detector_3: {'step_uv': 0.5, 'false_positives': 0.5} - - self.assertIsInstance(evaluation_results,dict) - self.assertEqual(len(evaluation_results),3) - self.assertEqual(len(evaluation_results['detector_1']),2) - self.assertEqual(len(evaluation_results['detector_2']),2) - self.assertEqual(len(evaluation_results['detector_3']),2) - self.assertAlmostEqual(evaluation_results['detector_1']['step_uv'],0.5) - self.assertAlmostEqual(evaluation_results['detector_1']['false_positives'],0.5) - - def test_series_granularity_eval_with_non_detected_anomalies(self): - effects = [] - series_generator = HumiTempTimeseriesGenerator() - # series1 will be a true anomaly for the minmax - series1 = series_generator.generate(include_effect_label=True, anomalies=['step_uv'],effects=effects) - # series2 will be a false positive for minmax (it sees always 2 anomalous data points for each variable) - series2 = series_generator.generate(include_effect_label=True, anomalies=['pattern_uv'],effects=effects) - dataset = [series1,series2] evaluator = Evaluator(test_data=dataset) - minmax1 = MinMaxAnomalyDetector() - minmax2 = MinMaxAnomalyDetector() - minmax3 = MinMaxAnomalyDetector() - models={'detector_1': minmax1, - 'detector_2': minmax2, - 'detector_3': minmax3 - } evaluation_results = evaluator.evaluate(models=models,granularity='series') - # Evaluation_results: - # detector_1: {'step_uv': 0.5, 'pattern_uv': 0.5, 'false_positives': 0.0} - # detector_2: {'step_uv': 0.5, 'pattern_uv': 0.5, 'false_positives': 0.0} - # detector_3: {'step_uv': 0.5, 'pattern_uv': 0.5, 'false_positives': 0.0} - self.assertIsInstance(evaluation_results,dict) - self.assertEqual(len(evaluation_results),3) - self.assertEqual(len(evaluation_results['detector_1']),3) - self.assertEqual(len(evaluation_results['detector_2']),3) - self.assertEqual(len(evaluation_results['detector_3']),3) - self.assertAlmostEqual(evaluation_results['detector_1']['step_uv'],0.5) - self.assertAlmostEqual(evaluation_results['detector_1']['pattern_uv'],0.5) - self.assertAlmostEqual(evaluation_results['detector_1']['false_positives'],0.0) - - def test_raised_error_evaluation_series_granularity(self): - anomalies = ['step_uv','spike_uv'] - series_generator = HumiTempTimeseriesGenerator() - series = series_generator.generate(include_effect_label=True, anomalies=anomalies) - dataset = [series] - minmax = MinMaxAnomalyDetector() - evaluator = Evaluator(test_data=dataset) - try: - evaluation_result = evaluator.evaluate(models={'detector':minmax},granularity='series') - except Exception as e: - self.assertIsInstance(e,ValueError) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],2) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],2/2) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],1) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],1/3) def test_copy_dataset(self): series_generator = HumiTempTimeseriesGenerator() - series1 = series_generator.generate(include_effect_label=True, effects=['noise']) - series2 = series_generator.generate(include_effect_label=True, effects=['noise']) - dataset = [series1,series2] + series_1 = series_generator.generate(include_effect_label=True, effects=['noise']) + series_2 = series_generator.generate(include_effect_label=True, effects=['noise']) + dataset = [series_1,series_2] evaluator = Evaluator(test_data=dataset) minmax1 = MinMaxAnomalyDetector() minmax2 = MinMaxAnomalyDetector() @@ -447,76 +350,92 @@ def test_copy_dataset(self): self.assertEqual(len(dataset_copies[1]),2) def test_variable_granularity_evaluation(self): - series_generator = HumiTempTimeseriesGenerator() - series = series_generator.generate(include_effect_label=True, anomalies=['step_uv']) - minmax = MinMaxAnomalyDetector() - formatted_series,anomaly_labels = _format_for_anomaly_detector(series,synthetic=True) - flagged_series = minmax.apply(formatted_series) - evaluation_result = _variable_granularity_evaluation(flagged_series,anomaly_labels) - # evaluation_result: - # { 'step_uv': 0.00034722222222222224 - # 'false_positives': 0.00034722222222222224 - # } - self.assertEqual(len(evaluation_result),2) - self.assertAlmostEqual(evaluation_result['step_uv'],1/len(series)) - self.assertAlmostEqual(evaluation_result['false_positives'],1/len(series)) + dataset = [self.series1] + evaluator = Evaluator(test_data=dataset) + minmax1 = MinMaxAnomalyDetector() + models={'detector_1': minmax1} + evaluator = Evaluator(test_data=dataset) + evaluation_results = evaluator.evaluate(models=models,granularity='variable') + self.assertIn('detector_1',evaluation_results.keys()) + self.assertIn('anomalies_count',evaluation_results['detector_1'].keys()) + self.assertIn('anomalies_ratio',evaluation_results['detector_1'].keys()) + self.assertIn('false_positives_count',evaluation_results['detector_1'].keys()) + self.assertIn('false_positives_ratio',evaluation_results['detector_1'].keys()) + + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],4) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],4/6) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],0) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],0) + + dataset1 = [self.series2] + evaluator1 = Evaluator(test_data=dataset1) + evaluation_results = evaluator1.evaluate(models=models,granularity='variable') + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],3) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],3/8) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],1) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],1/(7*2)) def test_point_granularity_evaluation(self): - series_generator = HumiTempTimeseriesGenerator() - series = series_generator.generate(include_effect_label=True, anomalies=['step_uv']) - minmax = MinMaxAnomalyDetector() - formatted_series,anomaly_labels = _format_for_anomaly_detector(series,synthetic=True) - flagged_series = minmax.apply(formatted_series) - evaluation_result = _point_granularity_evaluation(flagged_series,anomaly_labels) - # evaluation_result: - # { 'step_uv': 0.0006944444444444445 - # 'false_positives': 0.0006944444444444445 - # } - self.assertEqual(len(evaluation_result),2) - self.assertAlmostEqual(evaluation_result['step_uv'],2/len(series)) - self.assertAlmostEqual(evaluation_result['false_positives'],2/len(series)) + dataset = [self.series1] + evaluator = Evaluator(test_data=dataset) + minmax1 = MinMaxAnomalyDetector() + models={'detector_1': minmax1} + evaluator = Evaluator(test_data=dataset) + evaluation_results = evaluator.evaluate(models=models,granularity='point') + self.assertIn('detector_1',evaluation_results.keys()) + self.assertIn('anomalies_count',evaluation_results['detector_1'].keys()) + self.assertIn('anomalies_ratio',evaluation_results['detector_1'].keys()) + self.assertIn('false_positives_count',evaluation_results['detector_1'].keys()) + self.assertIn('false_positives_ratio',evaluation_results['detector_1'].keys()) + + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],3) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],3/3) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],0) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],0) + + dataset1 = [self.series2] + evaluator1 = Evaluator(test_data=dataset1) + evaluation_results = evaluator1.evaluate(models=models,granularity='point') + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],3) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],3/4) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],1) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],1/7) def test_series_granularity_evaluation(self): - series_generator = HumiTempTimeseriesGenerator() - series = series_generator.generate(include_effect_label=True, anomalies=['step_uv']) - minmax = MinMaxAnomalyDetector() - formatted_series,anomaly_labels = _format_for_anomaly_detector(series,synthetic=True) - flagged_series = minmax.apply(formatted_series) - evaluation_result = _series_granularity_evaluation(flagged_series,anomaly_labels) - # evaluation_result: - # { 'step_uv': 1 - # } - self.assertEqual(len(evaluation_result),1) - self.assertAlmostEqual(evaluation_result['step_uv'],1) - - series1 = series_generator.generate(include_effect_label=True, anomalies=[]) + dataset = [self.series1] + evaluator = Evaluator(test_data=dataset) minmax1 = MinMaxAnomalyDetector() - formatted_series1,anomaly_labels1 = _format_for_anomaly_detector(series1,synthetic=True) - flagged_series1 = minmax.apply(formatted_series1) - evaluation_result1 = _series_granularity_evaluation(flagged_series1,anomaly_labels1) - self.assertEqual(len(evaluation_result1),1) - self.assertAlmostEqual(evaluation_result1['false_positives'],1) - # evaluation_result1: - # { 'false_positives': 1 - # } - - try: - series2 = series_generator.generate(include_effect_label=True, anomalies=['spike_uv','step_uv']) - formatted_series2,anomaly_labels2 = _format_for_anomaly_detector(series2,synthetic=True) - flagged_series2 = minmax.apply(formatted_series2) - evaluation_result2 = _series_granularity_evaluation(flagged_series2,anomaly_labels2) - except Exception as e: - self.assertIsInstance(e,ValueError) + models={'detector_1': minmax1} + evaluator = Evaluator(test_data=dataset) + evaluation_results = evaluator.evaluate(models=models,granularity='series') + self.assertIn('detector_1',evaluation_results.keys()) + self.assertIn('anomalies_count',evaluation_results['detector_1'].keys()) + self.assertIn('anomalies_ratio',evaluation_results['detector_1'].keys()) + self.assertIn('false_positives_count',evaluation_results['detector_1'].keys()) + self.assertIn('false_positives_ratio',evaluation_results['detector_1'].keys()) + + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],1) + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_ratio'],1) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],0) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],0) + + dataset1 = [self.series3] + evaluator1 = Evaluator(test_data=dataset1) + evaluation_results = evaluator1.evaluate(models=models,granularity='series') + self.assertAlmostEqual(evaluation_results['detector_1']['anomalies_count'],0) + self.assertIsNone(evaluation_results['detector_1']['anomalies_ratio']) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],1) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],1) def test_double_evaluator(self): anomalies = ['step_uv'] effects = [] series_generator = HumiTempTimeseriesGenerator() - # series1 will be a true anomaly for the minmax - series1 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects) - # series2 will be a false positive for minmax (it sees always 2 anomalous data points for each variable) - series2 = series_generator.generate(include_effect_label=True, anomalies=[],effects=effects) - dataset = [series1,series2] + # series_1 will be a true anomaly for the minmax + series_1 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects) + # series_2 will be a false positive for minmax (it sees always 2 anomalous data points for each variable) + series_2 = series_generator.generate(include_effect_label=True, anomalies=[],effects=effects) + dataset = [series_1,series_2] evaluator = Evaluator(test_data=dataset) minmax1 = MinMaxAnomalyDetector() minmax2 = MinMaxAnomalyDetector()