diff --git a/ats/evaluators.py b/ats/evaluators.py index d4862cc..25d9f5e 100644 --- a/ats/evaluators.py +++ b/ats/evaluators.py @@ -114,9 +114,6 @@ def _copy_dataset(self,dataset,models): return dataset_copies def evaluate(self,models={},granularity='point',strategy='flags',breakdown=False): - if strategy != 'flags': - raise NotImplementedError(f'Evaluation strategy {strategy} is not implemented') - if not models: raise ValueError('There are no models to evaluate') if not self.test_data: @@ -138,11 +135,23 @@ def evaluate(self,models={},granularity='point',strategy='flags',breakdown=False flagged_dataset = _get_model_output(dataset_copies[j],model) for i,sample_df in enumerate(flagged_dataset): if granularity == 'point': - single_model_evaluation[f'sample_{i+1}'] = _point_granularity_evaluation(sample_df,anomaly_labels_list[i],breakdown=breakdown) + if strategy == 'flags': + single_model_evaluation[f'sample_{i+1}'] = _point_granularity_evaluation(sample_df,anomaly_labels_list[i],breakdown=breakdown) + elif strategy == 'events': + single_model_evaluation[f'sample_{i+1}'] = _point_eval_with_events_strategy(sample_df,anomaly_labels_list[i],breakdown=breakdown) + elif granularity == 'variable': - single_model_evaluation[f'sample_{i+1}'] = _variable_granularity_evaluation(sample_df,anomaly_labels_list[i], breakdown = breakdown) + if strategy == 'flags': + single_model_evaluation[f'sample_{i+1}'] = _variable_granularity_evaluation(sample_df,anomaly_labels_list[i], breakdown = breakdown) + elif strategy == 'events': + single_model_evaluation[f'sample_{i+1}'] = _variable_eval_with_events_strategy(sample_df,anomaly_labels_list[i],breakdown=breakdown) + elif granularity == 'series': - single_model_evaluation[f'sample_{i+1}'] = _series_granularity_evaluation(sample_df,anomaly_labels_list[i], breakdown = breakdown) + if strategy == 'flags': + single_model_evaluation[f'sample_{i+1}'] = _series_granularity_evaluation(sample_df,anomaly_labels_list[i], breakdown = breakdown) + elif strategy == 'events': + single_model_evaluation[f'sample_{i+1}'] = _series_eval_with_events_strategy(sample_df,anomaly_labels_list[i],breakdown=breakdown) + else: raise ValueError(f'Unknown granularity {granularity}') @@ -360,3 +369,88 @@ def _series_granularity_evaluation(flagged_timeseries_df,anomaly_labels_df,break return one_series_evaluation_result | breakdown_info else: return one_series_evaluation_result + +def _variable_eval_with_events_strategy(sample_df,anomaly_labels_df,breakdown=False): + raise NotImplementedError('Evaluation with events strategy and variable granularity not implemented') + +def _point_eval_with_events_strategy(flagged_timeseries_df,anomaly_labels_df,breakdown=False): + detected_events_n = 0 + false_positives_n = 0 + total_events_n = 0 + events_n, event_type_counts, event_time_slots = _count_anomalous_events(anomaly_labels_df) + evaluation_result = {} + breakdown_info = {} + + flags_df = flagged_timeseries_df.filter(like='anomaly') + previous_point_info = {'label': 0} + for timestamp in flagged_timeseries_df.index: + anomaly_label = anomaly_labels_df.loc[timestamp] + # True if there is at least 1 variable detected as anomalous + is_anomalous = flags_df.loc[timestamp].any() + point_info = {anomaly_label: is_anomalous} + if point_info != previous_point_info and is_anomalous: + if anomaly_label is None: + false_positives_n += 1 + previous_point_info = point_info + + evaluation_result['false_positives_count'] = false_positives_n + evaluation_result['false_positives_ratio'] = false_positives_n/len(anomaly_labels_df) + + for anomaly, anomaly_time_slots in event_time_slots.items(): + anomaly_n = int(len(anomaly_time_slots))/2 + total_events_n += anomaly_n + detected_anomaly_n = 0 + for i in range(0,len(anomaly_time_slots),2): + start = anomaly_time_slots[i] + stop = anomaly_time_slots[i+1] + is_detected = flags_df.loc[start:stop].any().any() + if is_detected: + detected_anomaly_n +=1 + breakdown_info[anomaly + '_true_positives_count'] = detected_anomaly_n + breakdown_info[anomaly + '_true_positives_rate'] = detected_anomaly_n/anomaly_n + detected_events_n += detected_anomaly_n + + evaluation_result['true_positives_count'] = detected_events_n + if events_n: + evaluation_result['true_positives_rate'] = detected_events_n/total_events_n + else: + evaluation_result['true_positives_rate'] = None + + if breakdown: + return evaluation_result | breakdown_info + else: + return evaluation_result + +def _series_eval_with_events_strategy(sample_df,anomaly_labels_df,breakdown=False): + raise NotImplementedError('Evaluation with events strategy and series granularity not implemented') + +def _count_anomalous_events(anomaly_labels_df): + events_n = 0 + event_type_counts = {} + event_time_slots = {} + previous_anomaly_label = None + for timestamp in anomaly_labels_df.index: + anomaly_label = anomaly_labels_df.loc[timestamp] + if anomaly_label != previous_anomaly_label: + if anomaly_label is not None: + events_n += 1 + # To manage series with adjoining anomalies + if previous_anomaly_label is not None: + event_time_slots[key].append(previous_timestamp) + key = anomaly_label + if anomaly_label in event_type_counts.keys(): + event_type_counts[key] +=1 + event_time_slots[key].append(timestamp) + else: + event_type_counts[key] =1 + event_time_slots[key] = [timestamp] + + else: + event_time_slots[key].append(previous_timestamp) + + previous_timestamp = timestamp + previous_anomaly_label = anomaly_label + # To manage series ending with an anomaly + if previous_anomaly_label is not None: + event_time_slots[key].append(previous_timestamp) + return events_n , event_type_counts, event_time_slots diff --git a/ats/tests/test_evaluators.py b/ats/tests/test_evaluators.py index 7d293c6..ca5c80d 100644 --- a/ats/tests/test_evaluators.py +++ b/ats/tests/test_evaluators.py @@ -11,6 +11,12 @@ from ..evaluators import _series_granularity_evaluation from ..evaluators import _get_breakdown_info from ..anomaly_detectors.stat.periodic_average import PeriodicAverageAnomalyDetector +from ats.anomaly_detectors.stat.robust import NHARAnomalyDetector +from ..evaluators import _count_anomalous_events +from ..evaluators import _point_eval_with_events_strategy +from ats.dataset_generators import HumiTempDatasetGenerator +from ..utils import ensure_full_reproducibility + import unittest import pandas as pd import random as rnd @@ -24,9 +30,7 @@ class TestEvaluators(unittest.TestCase): def setUp(self): - - rnd.seed(123) - np.random.seed(123) + ensure_full_reproducibility(123) self.series1 = generate_timeseries_df(entries=5, variables=2) self.series1['anomaly_label'] = [None, 'anomaly_2', 'anomaly_1', None, 'anomaly_1'] @@ -652,3 +656,300 @@ def test_evaluate_with_autofit_model(self): models={'paverage': PeriodicAverageAnomalyDetector() } evaluation_results = evaluator.evaluate(models=models,granularity='point') + def test_count_anomalous_events(self): + humi_temp_generator = HumiTempTimeseriesGenerator() + timeseries_df = humi_temp_generator.generate(include_effect_label=False, anomalies=['step_uv']) + anomalous_events,events_by_type, event_time_slots = _count_anomalous_events(timeseries_df.loc[:,'anomaly_label']) + self.assertEqual(anomalous_events,1) + self.assertIsInstance(events_by_type,dict) + self.assertEqual(events_by_type['step_uv'],1) + '''for timestamp in timeseries_df.index: + print(f"{timestamp} {timeseries_df.loc[timestamp,'anomaly_label']}")''' + # 1973-05-25 01:17:00+00:00 - 1973-06-01 02:02:00+00:00 + self.assertEqual(event_time_slots['step_uv'][0],pd.Timestamp('1973-05-25 01:17:00+00:00')) + self.assertEqual(event_time_slots['step_uv'][1],pd.Timestamp('1973-06-01 02:02:00+00:00')) + + def test_count_anomalous_events_with_point_anomaly(self): + humi_temp_generator = HumiTempTimeseriesGenerator() + timeseries_df = humi_temp_generator.generate(include_effect_label=False, anomalies=['spike_uv']) + anomalous_events,events_by_type, event_time_slots = _count_anomalous_events(timeseries_df.loc[:,'anomaly_label']) + self.assertEqual(anomalous_events,1) + self.assertEqual(events_by_type['spike_uv'],1) + '''for timestamp in timeseries_df.index: + print(f"{timestamp} {timeseries_df.loc[timestamp,'anomaly_label']}")''' + # 1973-05-02 15:47:00+00:00 - 1973-05-02 15:47:00+00:00 + self.assertEqual(event_time_slots['spike_uv'][0],pd.Timestamp('1973-05-02 15:47:00+00:00')) + self.assertEqual(event_time_slots['spike_uv'][1],pd.Timestamp('1973-05-02 15:47:00+00:00')) + + def test_point_eval_with_events_strategy(self): + # model output + series = generate_timeseries_df(entries=6, variables=1) + series['value_anomaly'] = [0,1,1,1,1,1] + + anomaly_labels = pd.Series([None, 'anomaly_1', 'anomaly_1', None, None,'anomaly_1']) + anomaly_labels.index = series.index + evaluation_result = _point_eval_with_events_strategy(series,anomaly_labels) + self.assertAlmostEqual(evaluation_result['true_positives_count'],2) + self.assertAlmostEqual(evaluation_result['true_positives_rate'],2/2) + self.assertAlmostEqual(evaluation_result['false_positives_count'],1) + self.assertAlmostEqual(evaluation_result['false_positives_ratio'],1/6) + + def test_point_eval_with_events_strategy_and_breakdown(self): + # model output + series = generate_timeseries_df(entries=6, variables=1) + series['value_anomaly'] = [0,1,1,1,1,1] + + anomaly_labels = pd.Series([None, 'anomaly_1', 'anomaly_1', None, None,'anomaly_1']) + anomaly_labels.index = series.index + evaluation_result = _point_eval_with_events_strategy(series,anomaly_labels,breakdown=True) + self.assertIn('anomaly_1_true_positives_count',evaluation_result.keys()) + self.assertAlmostEqual(evaluation_result['anomaly_1_true_positives_count'],2) + self.assertAlmostEqual(evaluation_result['anomaly_1_true_positives_rate'],1) + + def test_eval_point_granularity_events_strategy(self): + dataset = [self.series1, self.series2, self.series3] + minmax1 = MinMaxAnomalyDetector() + minmax2 = MinMaxAnomalyDetector() + minmax3 = MinMaxAnomalyDetector() + models={'detector_1': minmax1, + 'detector_2': minmax2, + 'detector_3': minmax3 + } + evaluator = Evaluator(test_data=dataset) + evaluation_results = evaluator.evaluate(models=models,granularity='point',strategy='events') + self.assertAlmostEqual(evaluation_results['detector_1']['true_positives_count'],6) + self.assertAlmostEqual(evaluation_results['detector_1']['true_positives_rate'],7/8) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],2) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],10/(21*3)) + + def test_eval_point_granularity_events_strategy_with_breakdown(self): + dataset = [self.series1, self.series2, self.series3] + minmax = MinMaxAnomalyDetector() + models={'detector_1': minmax} + evaluator = Evaluator(test_data=dataset) + evaluation_results = evaluator.evaluate(models=models,granularity='point',strategy='events',breakdown=True) + self.assertAlmostEqual(evaluation_results['detector_1']['true_positives_count'],6) + self.assertAlmostEqual(evaluation_results['detector_1']['true_positives_rate'],7/8) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_count'],2) + self.assertAlmostEqual(evaluation_results['detector_1']['false_positives_ratio'],10/(21*3)) + self.assertAlmostEqual(evaluation_results['detector_1']['anomaly_1_true_positives_count'],4) + self.assertAlmostEqual(evaluation_results['detector_1']['anomaly_1_true_positives_rate'],5/6) + self.assertAlmostEqual(evaluation_results['detector_1']['anomaly_2_true_positives_count'],2) + self.assertAlmostEqual(evaluation_results['detector_1']['anomaly_2_true_positives_rate'],1) + + def test_correct_counting_false_positives_with_events_strategy(self): + effects = ['noise', 'clouds'] + anomalies = ['spike_mv', 'step_mv'] + generator = HumiTempDatasetGenerator(sampling_interval='60m') + evaluation_dataset = generator.generate(n_series = 1, effects = effects, anomalies = anomalies, + time_span = '90D', max_anomalies_per_series = 3, + anomalies_ratio = 1.0, auto_repeat_anomalies=True) + models = {'minmax': MinMaxAnomalyDetector(), + 'nhar': NHARAnomalyDetector(), + 'p_avg': PeriodicAverageAnomalyDetector() + } + evaluator = Evaluator(test_data = evaluation_dataset) + + series = evaluation_dataset[0] + anomalous_events_n, events_by_type_n, event_time_slots= _count_anomalous_events(series.loc[:,'anomaly_label']) + evaluation_results = evaluator.evaluate(models=models,granularity='point',strategy='events',breakdown=False) + + for model in evaluation_results.keys(): + tp_n = evaluation_results[model]['true_positives_count'] + tp_rate = evaluation_results[model]['true_positives_rate'] + if tp_rate: + self.assertAlmostEqual(anomalous_events_n, tp_n / tp_rate) + + fp_n = evaluation_results[model]['false_positives_count'] + fp_ratio = evaluation_results[model]['false_positives_ratio'] + if fp_ratio: + self.assertAlmostEqual(len(series), fp_n / fp_ratio) + + def test_count_anomalous_events_on_synth_dataset(self): + anomalies = ['step_mv','pattern_mv','spike_mv'] + generator = HumiTempDatasetGenerator(sampling_interval='60m') + evaluation_dataset = generator.generate(n_series = 3, effects = ['noise'], anomalies = anomalies, + time_span = '90D', max_anomalies_per_series = 3, + anomalies_ratio = 1.0, auto_repeat_anomalies=True) + series_1 = evaluation_dataset[0] + '''for timestamp in series_1.index: + print(series_1.loc[timestamp,'anomaly_label'])''' + # series_1 + # 1 step + # 0 pattern + # 0 spike + anomalous_events_n, events_by_type_n ,event_time_slots= _count_anomalous_events(series_1.loc[:,'anomaly_label']) + self.assertIn('step_mv',events_by_type_n.keys()) + self.assertEqual(events_by_type_n['step_mv'],1) + self.assertEqual(anomalous_events_n,1) + + series_2 = evaluation_dataset[1] + '''for timestamp in series_2.index: + print(series_2.loc[timestamp,'anomaly_label'])''' + # series_2 + # 1 step + # 0 pattern + # 0 spike + anomalous_events_n_2, events_by_type_n_2,event_time_slots_2 = _count_anomalous_events(series_2.loc[:,'anomaly_label']) + self.assertIn('step_mv',events_by_type_n_2.keys()) + self.assertEqual(events_by_type_n_2['step_mv'],1) + self.assertEqual(anomalous_events_n_2,1) + + series_3 = evaluation_dataset[2] + '''for timestamp in series_3.index: + print(f"{timestamp} {series_3.loc[timestamp,'anomaly_label']}")''' + # series_3 + # 1 step + # 1 pattern + # 1 spike + anomalous_events_n_3, events_by_type_n_3 , event_time_slots_3= _count_anomalous_events(series_3.loc[:,'anomaly_label']) + self.assertIn('step_mv',events_by_type_n_3.keys()) + self.assertIn('pattern_mv',events_by_type_n_3.keys()) + self.assertEqual(events_by_type_n_3['step_mv'],1) + self.assertEqual(events_by_type_n_3['pattern_mv'],1) + self.assertEqual(events_by_type_n_3['spike_mv'],1) + self.assertEqual(anomalous_events_n_3,3) + # spike : 1976-01-02 14:50:00+00:00 - 1976-01-02 14:50:00+00:00 + self.assertEqual(event_time_slots_3['spike_mv'][0],pd.Timestamp('1976-01-02 14:50:00+00:00')) + self.assertEqual(event_time_slots_3['spike_mv'][1],pd.Timestamp('1976-01-02 14:50:00+00:00')) + # step : 1975-12-25 16:50:00+00:00 - 1976-01-02 03:50:00+00:00 + self.assertEqual(event_time_slots_3['step_mv'][0],pd.Timestamp('1975-12-25 16:50:00+00:00')) + self.assertEqual(event_time_slots_3['step_mv'][1],pd.Timestamp('1976-01-02 03:50:00+00:00')) + # pattern : 1975-11-13 04:50:00+00:00 - 1975-11-23 08:50:00+00:00 + self.assertEqual(event_time_slots_3['pattern_mv'][0],pd.Timestamp('1975-11-13 04:50:00+00:00')) + self.assertEqual(event_time_slots_3['pattern_mv'][1],pd.Timestamp('1975-11-23 08:50:00+00:00')) + + def test_event_eval_on_p_avg(self): + anomalies = ['step_mv'] + generator = HumiTempDatasetGenerator(sampling_interval='60m') + evaluation_dataset = generator.generate(n_series = 1, effects = ['noise'], anomalies = anomalies, + time_span = '90D', max_anomalies_per_series = 1, + anomalies_ratio = 1.0, auto_repeat_anomalies=True) + series = evaluation_dataset[0] + '''for timestamp in series.index: + print(series.loc[timestamp,'anomaly_label'])''' + # series + # 1 step + # 0 pattern + # 0 spike + anomalous_events_n, events_by_type_n ,event_time_slots= _count_anomalous_events(series.loc[:,'anomaly_label']) + self.assertIn('step_mv',events_by_type_n.keys()) + self.assertEqual(events_by_type_n['step_mv'],1) + self.assertEqual(anomalous_events_n,1) + + model = PeriodicAverageAnomalyDetector() + new_series = series.drop(columns=['anomaly_label'],inplace=False) + p_avg_output = model.apply(new_series) + '''for timestamp in p_avg_output.index: + print(f"{p_avg_output.loc[timestamp,'anomaly']} {series.loc[timestamp,'anomaly_label']}")''' + evaluator = Evaluator(evaluation_dataset) + evaluation_results = evaluator.evaluate(models={'p_avg':model},granularity='point',strategy='events',breakdown=False) + self.assertEqual(evaluation_results['p_avg']['true_positives_count'],1) + self.assertEqual(evaluation_results['p_avg']['true_positives_rate'],1) + + def test_event_eval_on_nhar(self): + anomalies = ['step_mv'] + generator = HumiTempDatasetGenerator(sampling_interval='60m') + evaluation_dataset = generator.generate(n_series = 1, effects = ['noise'], anomalies = anomalies, + time_span = '90D', max_anomalies_per_series = 1, + anomalies_ratio = 1.0, auto_repeat_anomalies=True) + series = evaluation_dataset[0] + '''for timestamp in series.index: + print(series.loc[timestamp,'anomaly_label'])''' + # series + # 1 step + # 0 pattern + # 0 spike + anomalous_events_n, events_by_type_n , event_time_slots= _count_anomalous_events(series.loc[:,'anomaly_label']) + self.assertIn('step_mv',events_by_type_n.keys()) + self.assertEqual(events_by_type_n['step_mv'],1) + self.assertEqual(anomalous_events_n,1) + + model = NHARAnomalyDetector() + new_series = series.drop(columns=['anomaly_label'],inplace=False) + nhara_output = model.apply(new_series) + '''for timestamp in nhara_output.index: + print(f"{nhara_output.loc[timestamp,'anomaly']} {series.loc[timestamp,'anomaly_label']}")''' + evaluator = Evaluator(evaluation_dataset) + evaluation_results = evaluator.evaluate(models={'nhar':model},granularity='point',strategy='events',breakdown=False) + self.assertEqual(evaluation_results['nhar']['true_positives_count'],1) + self.assertEqual(evaluation_results['nhar']['true_positives_rate'],1/1) + + def test_actual_anomalies_count(self): + # model output + series = generate_timeseries_df(entries=6, variables=1) + series['value_anomaly'] = [0,1,1,1,1,1] + + anomaly_labels = pd.Series([None, 'anomaly_1', 'anomaly_1', None, None,'anomaly_1']) + anomaly_labels.index = series.index + events_n , event_type_counts, event_time_slots = _count_anomalous_events(anomaly_labels) + self.assertEqual(len(event_time_slots['anomaly_1']),4) + self.assertEqual(event_time_slots['anomaly_1'][0],pd.Timestamp('2025-06-10 15:00:00+00:00')) + self.assertEqual(event_time_slots['anomaly_1'][1],pd.Timestamp('2025-06-10 16:00:00+00:00')) + self.assertEqual(event_time_slots['anomaly_1'][2],pd.Timestamp('2025-06-10 19:00:00+00:00')) + self.assertEqual(event_time_slots['anomaly_1'][3],pd.Timestamp('2025-06-10 19:00:00+00:00')) + '''for timestamp in anomaly_labels.index: + print(f'{timestamp}') + for key,value in event_time_slots.items(): + print(f'{key}: {value}')''' + + def test_event_eval_on_dataset(self): + generator = HumiTempDatasetGenerator(sampling_interval='2h') + evaluation_dataset = generator.generate( + n_series = 2, + effects = [], + anomalies = ['spike_mv'], + time_span = '5D', + max_anomalies_per_series = 3, + anomalies_ratio = 1.0, + auto_repeat_anomalies=True, + ) + series_1 = evaluation_dataset[0] + '''for timestamp in series_1.index: + print(f"{series_1.loc[timestamp,'anomaly_label']}")''' + # Anomalies in series_1 + # 1 spike + + series_2 = evaluation_dataset[1] + '''for timestamp in series_2.index: + print(f"{series_2.loc[timestamp,'anomaly_label']}")''' + # Anomalies in series_2 + # 1 spike + anomaly_detectors = {} + anomaly_detectors['minmax'] = MinMaxAnomalyDetector() + anomaly_detectors['nhar'] = NHARAnomalyDetector() + evaluator = Evaluator(evaluation_dataset) + evaluation = evaluator.evaluate(anomaly_detectors, + granularity='point', + strategy='events', + breakdown=True) + + new_series_1 = series_1.drop(columns=['anomaly_label'],inplace=False) + new_series_2 = series_2.drop(columns=['anomaly_label'],inplace=False) + new_dataset = [new_series_1,new_series_2] + + minmax_output = _get_model_output(new_dataset,anomaly_detectors['minmax']) + '''for timestamp in minmax_output[0].index: + print(f"{series_1.loc[timestamp,'anomaly_label']} {minmax_output[0].loc[timestamp,'temperature_anomaly']} {minmax_output[0].loc[timestamp,'humidity_anomaly']}")''' + '''for timestamp in minmax_output[1].index: + print(f"{series_2.loc[timestamp,'anomaly_label']} {minmax_output[1].loc[timestamp,'temperature_anomaly']} {minmax_output[1].loc[timestamp,'humidity_anomaly']}")''' + + self.assertEqual(evaluation['minmax']['true_positives_count'],2) + self.assertEqual(evaluation['minmax']['true_positives_rate'],1) + self.assertEqual(evaluation['minmax']['false_positives_count'],2) + self.assertEqual(evaluation['minmax']['false_positives_ratio'],1/60) + self.assertEqual(evaluation['minmax']['spike_mv_true_positives_count'],2) + self.assertEqual(evaluation['minmax']['spike_mv_true_positives_rate'],1) + + nhar_output = _get_model_output(new_dataset,anomaly_detectors['nhar']) + '''for timestamp in nhar_output[0].index: + print(f"{series_1.loc[timestamp,'anomaly_label']} {nhar_output[0].loc[timestamp,'anomaly']} {timestamp}") + for timestamp in nhar_output[1].index: + print(f"{series_2.loc[timestamp,'anomaly_label']} {nhar_output[1].loc[timestamp,'anomaly']} {timestamp}")''' + + self.assertEqual(evaluation['nhar']['true_positives_count'],2) + self.assertAlmostEqual(evaluation['nhar']['true_positives_rate'],1) + self.assertEqual(evaluation['nhar']['false_positives_count'],6) + self.assertAlmostEqual(evaluation['nhar']['false_positives_ratio'],6/120) + self.assertEqual(evaluation['nhar']['spike_mv_true_positives_count'],2) + self.assertAlmostEqual(evaluation['nhar']['spike_mv_true_positives_rate'],1)