Skip to content

Commit

Permalink
Merge pull request #74 from redBorder/feature/ip_identifier
Browse files Browse the repository at this point in the history
PR-63: Outliers identifier
  • Loading branch information
malvads authored Nov 19, 2024
2 parents e49f5eb + cd864ff commit 5f6c48c
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 2 deletions.
141 changes: 141 additions & 0 deletions resources/src/ai/outliers_identifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright (C) 2024 Eneo Tecnologia S.L.
#
# Authors:
# Miguel Álvarez Adsuara <malvarez@redborder.com>
#
# This program is free software: you can redistribute it and/or modify it under the terms of the
# GNU Affero General Public License as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.

import json
import pandas as pd
from resources.src.logger import logger
from sklearn.ensemble import IsolationForest

class OutlierIdentifier:
def __init__(self):
self.df = None
self.model = None

def prepare_data(self, all_ips_data):
"""
Prepare the data by flattening the input data, extracting relevant features,
and computing rolling statistics.
Args:
all_ips_data (dict): Dictionary containing time-series data for each IP.
"""
flattened_data = []
for ip, ip_data in all_ips_data.items():
for entry in ip_data:
flattened_data.append({
"ip": ip,
"timestamp": entry.get("timestamp"),
"bytes": entry.get("result", {}).get("bytes", 0),
})

self.df = pd.DataFrame(flattened_data)
self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])
self.df['hour'] = self.df['timestamp'].dt.hour
self.df['minute'] = self.df['timestamp'].dt.minute
self.df['day'] = self.df['timestamp'].dt.day
self.df['dayofweek'] = self.df['timestamp'].dt.dayofweek
self.df['dayofyear'] = self.df['timestamp'].dt.dayofyear

self.df['rolling_mean'] = self.df['bytes'].rolling(window=5, min_periods=1).mean()
self.df['rolling_std'] = self.df['bytes'].rolling(window=5, min_periods=1).std()

self.df['rolling_mean'] = self.df['rolling_mean'].fillna(0)
self.df['rolling_std'] = self.df['rolling_std'].fillna(0)

self.df['low_traffic'] = self.df['bytes'] == 0

def train_model(self, X_train):
"""
Train the Isolation Forest model on the provided training data.
Args:
X_train (DataFrame): The training set features.
"""
self.model = IsolationForest(contamination=0.05, random_state=42)
self.model.fit(X_train)

def identify_implicated_ips(self, outliers):
"""
Identify IPs that contributed to the outlier events.
Args:
outliers (list): A list of outlier events with timestamps and expected values.
Returns:
dict: A dictionary with implicated IPs for each outlier event.
"""
self.df['outlier'] = self.model.predict(self.df[['hour', 'minute', 'day', 'dayofweek', 'dayofyear', 'rolling_mean', 'rolling_std', 'low_traffic']])
self.df['outlier'] = self.df['outlier'].apply(lambda x: 'anomaly' if x == -1 else 'normal')

implicated_ips = {"ips": []}
for outlier in outliers:
timestamp = outlier["timestamp"]
outlier_data = self.df[self.df['timestamp'] == timestamp]

implicated_ips["ips"].append({
"caused_by": list(outlier_data[outlier_data['outlier'] == 'anomaly']['ip'])
})

return implicated_ips

def execute(self, outliers, all_ips_data):
"""
Execute the full pipeline for detecting outliers and identifying implicated IPs.
Args:
outliers (list): A list of outlier events.
all_ips_data (dict): Dictionary containing time-series data for each IP.
Returns:
json: A JSON string with the implicated IPs and outlier information.
"""
self.prepare_data(all_ips_data)
self.train_model(self.df[['hour', 'minute', 'day', 'dayofweek', 'dayofyear', 'rolling_mean', 'rolling_std', 'low_traffic']])

implicated_ips = self.identify_implicated_ips(outliers)

logger.logger.error(implicated_ips)

return json.dumps(implicated_ips) if implicated_ips else {"ips": []}

def train_and_execute_model(self, outliers, all_ips_data):
"""
Wrapper function to handle errors during model training and execution.
Args:
outliers (list): A list of outliers to process.
all_ips_data (dict): Dictionary of IP data.
Returns:
json: A JSON response with the result or error message.
"""
try:
return self.execute(outliers, all_ips_data)
except Exception as e:
logger.logger.error("Could not execute anomaly detection")
return self.return_error(e)

def return_error(self, error="error"):
"""
Return a JSON formatted error message.
Args:
error (str): The error message to return.
Returns:
dict: A dictionary containing the error status and message.
"""
return { "status": "error", "msg": error }
32 changes: 30 additions & 2 deletions resources/src/server/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
from flask import Flask, jsonify, request

from resources.src.redborder.s3 import S3
from resources.src.ai import outliers, shallow_outliers
from resources.src.ai import outliers, shallow_outliers, outliers_identifier
from resources.src.druid import client, query_builder
from resources.src.logger import logger
from resources.src.config import configmanager


'''
Init local variables
'''
Expand Down Expand Up @@ -63,11 +62,13 @@ def __init__(self):
self.start_s3_sync_thread()
self.app = Flask(__name__)
self.app.add_url_rule('/api/v1/outliers', view_func=self.calculate, methods=['POST'])
self.app.add_url_rule('/api/v1/ip_identifier', view_func=self.identify_ip, methods=['POST'])
self.exit_code = 0
self.shallow = shallow_outliers.ShallowOutliers(
sensitivity = config.get("ShallowOutliers", "sensitivity"),
contamination = config.get("ShallowOutliers", "contamination")
)
self.identifier = outliers_identifier.OutlierIdentifier()
self.ai_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ai")
self.deep_models={}

Expand Down Expand Up @@ -116,6 +117,33 @@ def calculate(self):
logger.logger.info("Starting outliers execution")
return self.execute_model(data, config.get("Outliers","metric"), model)

def identify_ip(self):
"""
Process the incoming request to identify implicated IPs based on outlier data.
Returns:
Response: A JSON response with implicated IPs or an error message.
"""
try:
payload = json.loads(request.form.get('payload', '{}'))

outliers = payload.get('outliers', [])
all_ips_data = payload.get('all_ips_data', {})

if not isinstance(outliers, list) or not isinstance(all_ips_data, dict):
return jsonify({"error": "Invalid data format"}), 400

result = self.identifier.train_and_execute_model(outliers, all_ips_data)

logger.logger.error(result)

return jsonify(result), 200

except Exception as e:
logger.logger.error(f"Exception in identify_ip: {e}")
return jsonify({"error": "An internal error has occurred!"}), 500


def decode_b64_json(self, b64_json):
"""
Decode a base64 json into a python dictionary.
Expand Down
127 changes: 127 additions & 0 deletions resources/tests/test_ip_identifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright (C) 2024 Eneo Tecnologia S.L.
#
# Authors:
# Miguel Álvarez Adsuara <malvarez@redborder.com>
#
# This program is free software: you can redistribute it and/or modify it under the terms of the
# GNU Affero General Public License as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.

import json
import unittest
import pandas as pd
from resources.src.ai.outliers_identifier import OutlierIdentifier

class TestOutlierIdentifier(unittest.TestCase):

def setUp(self):
self.identifier = OutlierIdentifier()

def test_prepare_data_valid_input(self):
data = {
"192.168.1.1": [
{"timestamp": "2024-11-14T12:00:00", "result": {"bytes": 500}},
{"timestamp": "2024-11-14T12:05:00", "result": {"bytes": 300}}
]
}
self.identifier.prepare_data(data)
self.assertIsInstance(self.identifier.df, pd.DataFrame)
self.assertIn('timestamp', self.identifier.df.columns)
self.assertIn('bytes', self.identifier.df.columns)
self.assertEqual(len(self.identifier.df), 2)

def test_prepare_data_missing_bytes(self):
data = {
"192.168.1.1": [
{"timestamp": "2024-11-14T12:00:00", "result": {}},
{"timestamp": "2024-11-14T12:05:00", "result": {}}
]
}
self.identifier.prepare_data(data)
self.assertEqual(self.identifier.df['bytes'].sum(), 0)

def test_prepare_data_empty_input(self):
data = {}
self.identifier.prepare_data(data)
self.assertTrue(self.identifier.df.empty)

def test_train_model_valid_data(self):
data = {
"192.168.1.1": [
{"timestamp": "2024-11-14T12:00:00", "result": {"bytes": 500}},
{"timestamp": "2024-11-14T12:05:00", "result": {"bytes": 300}},
{"timestamp": "2024-11-14T12:10:00", "result": {"bytes": 1000}},
{"timestamp": "2024-11-14T12:15:00", "result": {"bytes": 700}},
{"timestamp": "2024-11-14T12:20:00", "result": {"bytes": 0}}
]
}
self.identifier.prepare_data(data)
try:
self.identifier.train_model(self.identifier.df[['hour', 'minute', 'day', 'dayofweek', 'dayofyear', 'rolling_mean', 'rolling_std', 'low_traffic']])
except Exception as e:
self.fail(f"Training failed with exception: {e}")

def test_identify_implicated_ips_no_outliers(self):
data = {
"192.168.1.1": [
{"timestamp": "2024-11-14T12:00:00", "result": {"bytes": 100}},
{"timestamp": "2024-11-14T12:05:00", "result": {"bytes": 100}}
]
}
outliers = []
self.identifier.prepare_data(data)
self.identifier.train_model(self.identifier.df[['hour', 'minute', 'day', 'dayofweek', 'dayofyear', 'rolling_mean', 'rolling_std', 'low_traffic']])
result = self.identifier.identify_implicated_ips(outliers)
self.assertEqual(result, {"ips": []})

def test_identify_implicated_ips_with_outliers(self):
data = {
"192.168.1.1": [
{"timestamp": "2024-11-14T12:00:00", "result": {"bytes": 100}},
{"timestamp": "2024-11-14T12:05:00", "result": {"bytes": 1000}}, # Anomalous traffic
{"timestamp": "2024-11-14T12:10:00", "result": {"bytes": 100}},
]
}
outliers = [{"timestamp": "2024-11-14T12:05:00"}]
self.identifier.prepare_data(data)
self.identifier.train_model(self.identifier.df[['hour', 'minute', 'day', 'dayofweek', 'dayofyear', 'rolling_mean', 'rolling_std', 'low_traffic']])
result = self.identifier.identify_implicated_ips(outliers)
self.assertIn("192.168.1.1", result["ips"][0]["caused_by"])

def test_execute_with_valid_input(self):
data = {
"192.168.1.1": [
{"timestamp": "2024-11-14T12:00:00", "result": {"bytes": 500}},
{"timestamp": "2024-11-14T12:05:00", "result": {"bytes": 300}},
{"timestamp": "2024-11-14T12:10:00", "result": {"bytes": 1000}}
]
}
outliers = [{"timestamp": "2024-11-14T12:10:00"}]
result = self.identifier.execute(outliers, data)
self.assertIsInstance(result, str)
parsed_result = json.loads(result)
self.assertIn("ips", parsed_result)
self.assertEqual(len(parsed_result["ips"]), 1)

def test_train_and_execute_model_error_handling(self):
data = None # Invalid data
outliers = [{"timestamp": "2024-11-14T12:10:00"}]
result = self.identifier.train_and_execute_model(outliers, data)
self.assertIn("status", result)
self.assertEqual(result["status"], "error")

def test_return_error(self):
error_message = "Test error"
result = self.identifier.return_error(error_message)
self.assertEqual(result["status"], "error")
self.assertEqual(result["msg"], error_message)

if __name__ == "__main__":
unittest.main()

0 comments on commit 5f6c48c

Please sign in to comment.