Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 111 additions & 29 deletions ats/anomaly_detectors/base.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,106 @@
# -*- coding: utf-8 -*-
"""Anomaly detectors"""

import copy
import functools
import pandas as pd
from ats.utils import convert_timeseries_df_to_timeseries, convert_timeseries_to_timeseries_df
from ..utils import convert_timeseries_df_to_timeseries, convert_timeseries_to_timeseries_df

# Setup logging
import logging
logger = logging.getLogger(__name__)

class AnomalyDetector():

def fit(self, data, *args, **kwargs):
#========================
# Helpers
#========================
def set_apply_params(self, **kwargs):
    """
    Store the default keyword arguments for the apply method.

    The given keyword arguments replace, as a whole, whatever was stored
    by a previous call; they are kept in ``self.apply_params``.

    Args:
        **kwargs: the keyword arguments to store.
    """
    self.apply_params = kwargs

Args:
data (pd.DataFrame or list[pd.DataFrame]): A single time series (as a pandas DataFrame) or a list of time series (each as a pandas DataFrame).
The index of each DataFrame must be named "timestamp", and each column should represent a variable.
def get_apply_params(self):
    """
    Get the default keyword arguments stored for the apply method.

    Returns:
        dict: the dictionary held in ``self.apply_params`` (the stored
        object itself, not a copy: copy it before changing it), or a new
        empty dictionary if no apply parameters were ever set.
    """
    # Only a missing attribute means "nothing set": any other error must surface.
    return getattr(self, 'apply_params', {})

def set_fit_params(self, **kwargs):
    """
    Store the default keyword arguments for the fit method.

    The given keyword arguments replace, as a whole, whatever was stored
    by a previous call; they are kept in ``self.fit_params``.

    Args:
        **kwargs: the keyword arguments to store.
    """
    self.fit_params = kwargs

def get_fit_params(self):
    """
    Get the default keyword arguments stored for the fit method.

    Returns:
        dict: the dictionary held in ``self.fit_params`` (the stored
        object itself, not a copy: copy it before changing it), or a new
        empty dictionary if no fit parameters were ever set.
    """
    # Only a missing attribute means "nothing set": any other error must surface.
    return getattr(self, 'fit_params', {})


#========================
# Decorators
#========================

@staticmethod
def fit_method(fit_method):
    """
    Decorator for the fit method of an anomaly detector.

    The wrapped fit is called with the parameters returned by
    ``get_fit_params()``, overridden by the keyword arguments of the call,
    and the detector is marked as fitted (``self.fitted = True``) afterwards.
    The wrapper returns None, whatever the wrapped fit returns.

    Args:
        fit_method (callable): the fit implementation, called as
            ``fit_method(self, data, **params)``.

    Returns:
        callable: the wrapping ``do_fit(self, data, **kwargs)`` function.

    :meta private:
    """
    @functools.wraps(fit_method)
    def do_fit(self, data, **kwargs):

        # Merge into a new dict: updating the dict returned by get_fit_params()
        # in place would make the overrides of this call permanent defaults.
        fit_params = {**self.get_fit_params(), **kwargs}

        # Call fit logic
        fit_method(self, data, **fit_params)

        # Mark as fitted (only reached if the fit logic did not raise)
        self.fitted = True

    return do_fit

@staticmethod
def apply_method(apply_method):
    """
    Decorator for the apply method of an anomaly detector.

    The wrapped apply is called with the parameters returned by
    ``get_apply_params()``, overridden by the keyword arguments of the call.
    If the detector is not marked as fitted, a deep copy of it is fitted on
    the very same data (using ``get_fit_params()``) and the apply logic runs
    on that copy, leaving the original detector unfitted. If fitting raises
    NotImplementedError, the apply logic runs on the detector as it is.

    Args:
        apply_method (callable): the apply implementation, called as
            ``apply_method(model, data, **params)``.

    Returns:
        callable: the wrapping ``do_apply(self, data, **kwargs)`` function,
        which returns whatever the wrapped apply returns.

    :meta private:
    """
    @functools.wraps(apply_method)
    def do_apply(self, data, **kwargs):

        # Merge into a new dict: updating the dict returned by get_apply_params()
        # in place would make the overrides of this call permanent defaults.
        apply_params = {**self.get_apply_params(), **kwargs}

        # Should we auto-fit? A missing "fitted" attribute counts as not fitted.
        if getattr(self, 'fitted', False):
            apply_model = self
        else:
            # Model is not fitted, try to auto-fit (on a copy) if supported
            try:
                apply_model = copy.deepcopy(self)
                apply_model.fit(data, **self.get_fit_params())
            except NotImplementedError:
                # No fit logic available: apply with the detector as it is
                apply_model = self
            else:
                logger.info('Auto-fitted the anomaly detector')

        return apply_method(apply_model, data, **apply_params)

    return do_apply


#========================
# Interface
#========================

def apply(self, data, *args, **kwargs):

"""
Apply the anomaly detector on some (time series) data.

Expand All @@ -34,57 +113,60 @@ def apply(self, data, *args, **kwargs):
"""
raise NotImplementedError()


def fit(self, data, *args, **kwargs):
    """
    Fit the anomaly detector on some (time series) data.

    This base implementation only defines the interface and always raises.

    Args:
        data (pd.DataFrame or list[pd.DataFrame]): either a single time series
            (as a pandas DataFrame) or a list of time series (each as a pandas
            DataFrame). The index of each DataFrame must be named "timestamp",
            and each column should represent a variable.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()



class TimeseriaAnomalyDetector(AnomalyDetector):
"""
Base class for anomaly detectors wrapped from the timeseria library.
"""
"""

model_class = None

def __init__(self, *args, **kwargs):
if self.model_class is None:
raise NotImplementedError('Subclasses must define a timeseria model')
self.model = self.model_class(*args, **kwargs)
self.apply_params = kwargs.get('apply_params', {})

def set_apply_params(self, **kwargs):
"""
Set parameters for the apply method of the timeseria model.
"""
self.apply_params = kwargs

def get_apply_params(self):
"""
Get parameters for the apply method of the timeseria model.
"""
return self.apply_params

def fit(self, data, *args, **kwargs):
@AnomalyDetector.fit_method
def fit(self, data, **kwargs):
"""
Fit the timeseria anomaly detector model.
"""
"""
if not isinstance(data,pd.DataFrame):
raise NotImplementedError('Not yet implemented for non DataFrame inputs')
timeseries_df = data

# Using timeseria to fit the model
timeseries = convert_timeseries_df_to_timeseries(timeseries_df)
self.model.fit(timeseries, *args, **kwargs)
self.model.fit(timeseries, **kwargs)

def apply(self, data):
@AnomalyDetector.apply_method
def apply(self, data, **kwargs):
"""
Apply the timeseria anomaly detector model.
"""
"""
if not isinstance(data,pd.DataFrame):
raise NotImplementedError('Not yet implemented for non DataFrame inputs')
timeseries_df = data

# Using timeseria to fit and apply the model
timeseries = convert_timeseries_df_to_timeseries(timeseries_df)
timeseries = self.model.apply(timeseries, **self.get_apply_params())
timeseries = self.model.apply(timeseries, **kwargs)

# Convert back to DataFrame
timeseries_df = convert_timeseries_to_timeseries_df(timeseries)
timeseries_df['anomaly'] = (timeseries_df['anomaly'].astype(float) != 0).astype(int)

return timeseries_df
return timeseries_df

3 changes: 2 additions & 1 deletion ats/anomaly_detectors/ml/ifsom.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def _compute_features_fats_printid(row, a, sel_col):

return pd.Series(a_calc.result(method='array'), index=a_calc.result(method='features'))

@AnomalyDetector.fit_method
def fit(self, data, som_size_x=9, som_size_y=9, sigma_som=1.0, learning_rate_som=0.5, random_seed_som=29,
n_iterations_som=1000, neighborhood_function_som='gaussian', n_estimators_if='100', max_samples_if='auto',
contamination_if=0.05, max_features_if=1, random_state_if=29, exclude_extra_features=None, caching=True,
Expand Down Expand Up @@ -277,7 +278,7 @@ def fit(self, data, som_size_x=9, som_size_y=9, sigma_som=1.0, learning_rate_som
self.data['df_features'] = df_features
self.data['df_features_scaled'] = df_features_scaled


@AnomalyDetector.apply_method
def apply(self, data, *args, **kwargs):

#==============================
Expand Down
1 change: 1 addition & 0 deletions ats/anomaly_detectors/naive/minmax.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

class MinMaxAnomalyDetector(AnomalyDetector):

@AnomalyDetector.apply_method
def apply(self, data, inplace=False):

logger.info(f'Applying MinMaxAnomalyDetector with inplace={inplace}')
Expand Down
5 changes: 3 additions & 2 deletions ats/anomaly_detectors/stat/robust.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import pandas as pd
from .support_functions import detect_outliers_on_data
from ...utils import list_of_timeseries_df_to_timeseries_df, timeseries_df_to_list_of_timeseries_df
from ..base import AnomalyDetector

# Setup logging
import logging
logger = logging.getLogger(__name__)


class _COMNHARAnomalyDetector:
class _COMNHARAnomalyDetector(AnomalyDetector):
"""
Statistically robust anomaly detector based on COM, HAR, and NHAR methodologies.
"""
Expand All @@ -19,7 +20,7 @@ def __init__(self, fq=2 * np.pi / 30, fw=2 * np.pi / 7, trend=2, methods=('COM',
self.trend = trend
self.methods = methods


@AnomalyDetector.apply_method
def apply(self, data, *args, **kwargs):
"""
Apply statistical anomaly detection on time series data.
Expand Down
16 changes: 16 additions & 0 deletions ats/tests/test_evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ..evaluators import _point_granularity_evaluation
from ..evaluators import _series_granularity_evaluation
from ..evaluators import _get_breakdown_info
from ..anomaly_detectors.stat.periodic_average import PeriodicAverageAnomalyDetector
import unittest
import pandas as pd
import random as rnd
Expand Down Expand Up @@ -636,3 +637,18 @@ def test_double_evaluator(self):
}
evaluation_results = evaluator.evaluate(models=models,granularity='series')
evaluation_results = evaluator.evaluate(models=models,granularity='series')

def test_evaluate_with_autofit_model(self):

anomalies = ['step_uv']
effects = []
series_generator = HumiTempTimeseriesGenerator()
# series_1 will be a true anomaly for the minmax
series_1 = series_generator.generate(include_effect_label=True, anomalies=anomalies,effects=effects)
# series_2 will be a false positive for minmax (it sees always 2 anomalous data points for each variable)
series_2 = series_generator.generate(include_effect_label=True, anomalies=[],effects=effects)
dataset = [series_1,series_2]
evaluator = Evaluator(test_data=dataset)
models={'paverage': PeriodicAverageAnomalyDetector() }
evaluation_results = evaluator.evaluate(models=models,granularity='point')