Commit
Improved anomaly detection so it doesn't always detect an anomaly
Pablo Rodríguez Flores committed May 31, 2024
1 parent 0942d37 commit a9f7f3a
Showing 1 changed file with 34 additions and 8 deletions.
42 changes: 34 additions & 8 deletions resources/src/ai/shallow_outliers.py
@@ -67,16 +67,16 @@ def predict(self, arr):
         smooth_arr = np.convolve(padded_arr, kernel, mode='valid')
         return smooth_arr
 
-    def get_outliers(self, arr, smoothed_arr):
+    def get_outliers(self, arr, smoothed_arr, other=None):
         """
         Given an array of data points and an approximation of it, return a boolean array
         with the same shape as the original array which is True when the data point is
         an outlier and False otherwise.
         The method used for outlier detection is an isolation forest, which will look for
-        the 0.3% most isolated points when taking into account the original value, the
-        smoothed value, the difference between them (error) and the squared difference
-        between them.
+        the 1% most isolated points when taking into account the original value, the
+        smoothed value, the absolute difference between them (MAE) and the sign of the
+        difference between them.
         Args:
             arr (numpy.ndarray): 1D numpy array where the outliers shall be detected.
@@ -87,13 +87,38 @@ def get_outliers(self, arr, smoothed_arr):
             numpy.ndarray: 1D boolean numpy array, True where the data point is an outlier.
         """
         error = arr-smoothed_arr
-        loss = error**2
-        data = np.stack((arr,smoothed_arr,error,loss), axis = 1)
-        model = IsolationForest(n_estimators=100, contamination=0.003)
+        sign = np.sign(error)
+        data = np.stack((smoothed_arr, np.abs(error), sign), axis=1)
+        if other is not None:
+            data = np.concatenate([data, other], axis=1)
+        model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)
         model.fit(data)
         model.offset_=-0.05+0.95*model.offset_
         outliers = model.predict(data)==-1
         return outliers
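
A minimal standalone sketch (not part of the commit; synthetic data, illustrative variable names, and it only assumes numpy and scikit-learn are installed) of how the reworked isolation-forest step behaves: with contamination=0.01 roughly 1% of points are scored as candidates, and the offset_ shift then tightens the decision threshold so only clearly isolated points stay flagged.

import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
arr = rng.normal(0, 1, 1000)
arr[::100] += 8                                    # inject a handful of obvious spikes
smoothed = np.convolve(arr, np.ones(5) / 5, mode='same')
error = arr - smoothed

# same feature layout as the new get_outliers: smoothed value, |error|, sign(error)
features = np.stack((smoothed, np.abs(error), np.sign(error)), axis=1)

model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)
model.fit(features)
# offset_ is negative after fitting; 0.95*offset_ - 0.05 lowers it further, which
# should make the cut stricter and drop borderline points from the anomaly set
model.offset_ = -0.05 + 0.95 * model.offset_
outliers = model.predict(features) == -1
print(outliers.sum(), "points flagged out of", len(arr))

The offset tweak appears to be what keeps the detector from always flagging a fixed fraction of every batch, which matches the intent stated in the commit message.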

+    def encode_timestamp(self, timestamp):
+        """
+        Takes a pandas Series of timestamps and returns a numpy array with a sine encoding for the
+        hour of day and a cosine encoding for the day of the week. This encoding helps the model to
+        learn periodic patterns in the data while maintaining simplicity.
+        Parameters:
+            timestamp (pd.Series): A Pandas Series of timestamps.
+        Returns:
+            numpy.ndarray: An array of shape (n, 2) with the daily sine and weekly cosine encodings.
+        """
+        if not isinstance(timestamp, pd.Series):
+            raise ValueError("Input must be a Pandas Series")
+        timestamp = pd.to_datetime(timestamp)
+        hour_of_day = timestamp.dt.hour + timestamp.dt.minute/60
+        day_of_week = timestamp.dt.dayofweek + hour_of_day/24
+        daily_sin = np.sin(2*np.pi*hour_of_day/24)
+        weekly_cos = np.cos(2*np.pi*day_of_week/7)
+        encoded = np.stack((daily_sin, weekly_cos), axis=1)
+        return encoded
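
A short usage sketch (not part of the commit; assumes only numpy and pandas) of the same cyclic encoding, showing that timestamps just before and after midnight get nearly the same daily-sine value, while 06:00 maps to 1.0:

import numpy as np
import pandas as pd

timestamps = pd.Series(pd.to_datetime([
    "2024-05-27 23:55",   # Monday, just before midnight
    "2024-05-28 00:05",   # Tuesday, just after midnight
    "2024-05-28 06:00",
]))
hour_of_day = timestamps.dt.hour + timestamps.dt.minute / 60
day_of_week = timestamps.dt.dayofweek + hour_of_day / 24
encoded = np.stack((np.sin(2 * np.pi * hour_of_day / 24),
                    np.cos(2 * np.pi * day_of_week / 7)), axis=1)
print(encoded)   # rows 0 and 1 are close in both columns; row 2 has daily sine = 1.0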

     def compute_json(self, raw_json):
         """
         Main method used for anomaly detection.
@@ -111,7 +136,8 @@ def compute_json(self, raw_json):
         data = pd.json_normalize(raw_json)
         arr = data.iloc[:, 1].values
         smoothed_arr = self.predict(arr)
-        outliers = self.get_outliers(arr, smoothed_arr)
+        encoded_timestamp = self.encode_timestamp(data["timestamp"])
+        outliers = self.get_outliers(arr, smoothed_arr, other=encoded_timestamp)
         data["smooth"] = smoothed_arr
         predicted = data[["timestamp","smooth"]].rename(columns={"smooth":"forecast"})
         anomalies = data[["timestamp","smooth"]].rename(columns={"smooth":"expected"}).loc[outliers]
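
For context, a hypothetical end-to-end call; the ShallowOutliers class name and the "timestamp"/"value" field names are assumptions based on the file name and the hunk above, not something the diff confirms:

# Hypothetical usage sketch; class and field names are assumed, and a real call
# would pass a much longer series than this toy payload.
raw_json = [
    {"timestamp": "2024-05-31T10:00:00Z", "value": 10.2},
    {"timestamp": "2024-05-31T10:01:00Z", "value": 10.4},
    {"timestamp": "2024-05-31T10:02:00Z", "value": 55.0},   # an obvious spike
]
model = ShallowOutliers()                 # assumed class defined in shallow_outliers.py
result = model.compute_json(raw_json)     # per the hunk: smooths, encodes timestamps, flags outliers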