class OutlierIdentifier:
    """Find which IPs behaved anomalously at the timestamps of reported outliers.

    The pipeline flattens per-IP time series into a DataFrame, derives calendar
    and rolling-traffic features, fits an Isolation Forest on those features and
    reports, for every outlier event, the IPs whose sample at that timestamp was
    flagged as an anomaly.
    """

    # Columns used for both fitting and prediction. Keeping the list in one
    # place guarantees the model is always queried with the features it was
    # trained on.
    FEATURE_COLUMNS = [
        "hour",
        "minute",
        "day",
        "dayofweek",
        "dayofyear",
        "rolling_mean",
        "rolling_std",
        "low_traffic",
    ]

    # Number of consecutive samples covered by the rolling statistics.
    ROLLING_WINDOW = 5

    def __init__(self, contamination=0.05, random_state=42):
        """
        Create an identifier with no data and no trained model.

        Args:
            contamination (float): Expected share of anomalous samples, passed
                to the Isolation Forest.
            random_state (int): Seed passed to the Isolation Forest so repeated
                runs on the same data give the same result.
        """
        self.df = None
        self.model = None
        self.contamination = contamination
        self.random_state = random_state

    def prepare_data(self, all_ips_data):
        """
        Flatten the per-IP series into ``self.df`` and compute model features.

        Timestamps are parsed as UTC so that naive and offset-qualified inputs
        end up comparable. Rolling statistics are computed independently for
        each IP; the first sample of an IP never sees another IP's traffic.

        Args:
            all_ips_data (dict): Maps each IP to a list of entries. Each entry
                is a dict that may carry a "timestamp" and a "result" dict
                holding "bytes" (a missing or null "result", or a missing
                "bytes", counts as 0).
        """
        records = [
            {
                "ip": ip,
                "timestamp": entry.get("timestamp"),
                # `or {}` also covers an explicit null "result".
                "bytes": (entry.get("result") or {}).get("bytes", 0),
            }
            for ip, ip_data in all_ips_data.items()
            for entry in ip_data
        ]

        # Explicit columns keep the frame well-formed when there are no records;
        # without them an empty input has no "timestamp" column at all.
        frame = pd.DataFrame(records, columns=["ip", "timestamp", "bytes"])
        frame["timestamp"] = pd.to_datetime(frame["timestamp"], utc=True)

        stamps = frame["timestamp"].dt
        frame["hour"] = stamps.hour
        frame["minute"] = stamps.minute
        frame["day"] = stamps.day
        frame["dayofweek"] = stamps.dayofweek
        frame["dayofyear"] = stamps.dayofyear

        if frame.empty:
            # Nothing to aggregate: just materialise the (empty) feature columns.
            frame["rolling_mean"] = frame["bytes"].astype("float64")
            frame["rolling_std"] = frame["bytes"].astype("float64")
        else:
            window = self.ROLLING_WINDOW
            per_ip_bytes = frame.groupby("ip", sort=False)["bytes"]
            frame["rolling_mean"] = per_ip_bytes.transform(
                lambda series: series.rolling(window=window, min_periods=1).mean()
            ).fillna(0)
            # The std of a single sample is NaN; treat it as "no variation".
            frame["rolling_std"] = per_ip_bytes.transform(
                lambda series: series.rolling(window=window, min_periods=1).std()
            ).fillna(0)

        frame["low_traffic"] = frame["bytes"] == 0
        self.df = frame

    def train_model(self, X_train):
        """
        Train the Isolation Forest model on the provided training data.

        Args:
            X_train (DataFrame): The training set features.
        """
        self.model = IsolationForest(
            contamination=self.contamination,
            random_state=self.random_state,
        )
        self.model.fit(X_train)

    def identify_implicated_ips(self, outliers):
        """
        Identify IPs that contributed to the outlier events.

        Every row of ``self.df`` is labelled 'anomaly' or 'normal' in a new
        "outlier" column, then each outlier event is matched against the rows
        sharing its timestamp.

        Args:
            outliers (list): Outlier events; each must carry a "timestamp".

        Returns:
            dict: ``{"ips": [{"caused_by": [ip, ...]}, ...]}`` with one entry
            per outlier event, in the same order as ``outliers``.
        """
        labels = self.model.predict(self.df[self.FEATURE_COLUMNS])
        # The model marks anomalies with -1.
        self.df["outlier"] = [
            "anomaly" if label == -1 else "normal" for label in labels
        ]
        anomalous = self.df[self.df["outlier"] == "anomaly"]

        implicated_ips = {"ips": []}
        for outlier in outliers:
            # Parse exactly like prepare_data does so both sides are UTC-aware;
            # comparing the raw string would miss inputs that differ only in
            # formatting or carry a timezone suffix.
            moment = pd.to_datetime(outlier["timestamp"], utc=True)
            culprits = anomalous.loc[anomalous["timestamp"] == moment, "ip"]
            implicated_ips["ips"].append({"caused_by": list(culprits)})

        return implicated_ips

    def execute(self, outliers, all_ips_data):
        """
        Execute the full pipeline for detecting outliers and identifying implicated IPs.

        Args:
            outliers (list): A list of outlier events.
            all_ips_data (dict): Dictionary containing time-series data for each IP.

        Returns:
            str: A JSON string with the implicated IPs for each outlier event.
        """
        self.prepare_data(all_ips_data)
        self.train_model(self.df[self.FEATURE_COLUMNS])

        implicated_ips = self.identify_implicated_ips(outliers)

        # A successful result is not an error; keep it out of the error log.
        logger.logger.info(implicated_ips)

        return json.dumps(implicated_ips)

    def train_and_execute_model(self, outliers, all_ips_data):
        """
        Wrapper function to handle errors during model training and execution.

        Args:
            outliers (list): A list of outliers to process.
            all_ips_data (dict): Dictionary of IP data.

        Returns:
            str | dict: The JSON string produced by ``execute`` on success, or
            the error dictionary built by ``return_error`` on failure.
        """
        try:
            return self.execute(outliers, all_ips_data)
        except Exception as e:
            logger.logger.error(f"Could not execute anomaly detection: {e}")
            return self.return_error(e)

    def return_error(self, error="error"):
        """
        Build an error payload that can be serialised to JSON.

        Args:
            error (str | Exception): The error to report. It is converted with
                ``str`` so exception objects do not break JSON encoding.

        Returns:
            dict: A dictionary containing the error status and message.
        """
        return {"status": "error", "msg": str(error)}
def identify_ip(self):
    """
    Process the incoming request to identify implicated IPs based on outlier data.

    The form field 'payload' is expected to hold a JSON object with an
    'outliers' list and an 'all_ips_data' dictionary.

    Returns:
        Response: A JSON response with the implicated IPs (200), a 400 when the
        payload is not valid JSON or has the wrong shape, or a 500 with a
        generic message when the identification fails.
    """
    try:
        try:
            payload = json.loads(request.form.get('payload', '{}'))
        except (TypeError, ValueError):
            # Malformed JSON is a client error, not an internal failure.
            return jsonify({"error": "Invalid data format"}), 400

        if not isinstance(payload, dict):
            return jsonify({"error": "Invalid data format"}), 400

        outliers = payload.get('outliers', [])
        all_ips_data = payload.get('all_ips_data', {})

        if not isinstance(outliers, list) or not isinstance(all_ips_data, dict):
            return jsonify({"error": "Invalid data format"}), 400

        result = self.identifier.train_and_execute_model(outliers, all_ips_data)

        # On success the identifier hands back an already-serialised JSON
        # string; decode it so jsonify does not encode it a second time.
        if isinstance(result, str):
            result = json.loads(result)

        if isinstance(result, dict) and result.get("status") == "error":
            # Keep internal details in the log only; the client gets the same
            # generic message as for any other internal failure.
            logger.logger.error(f"Exception in identify_ip: {result.get('msg')}")
            return jsonify({"error": "An internal error has occurred!"}), 500

        logger.logger.info(result)

        return jsonify(result), 200

    except Exception as e:
        logger.logger.error(f"Exception in identify_ip: {e}")
        return jsonify({"error": "An internal error has occurred!"}), 500
import json
import unittest
import pandas as pd
from resources.src.ai.outliers_identifier import OutlierIdentifier

# Feature columns handed to the model in every training call below.
MODEL_FEATURES = ['hour', 'minute', 'day', 'dayofweek', 'dayofyear', 'rolling_mean', 'rolling_std', 'low_traffic']

# The single IP used by all fixtures.
SAMPLE_IP = "192.168.1.1"


def build_series(*results):
    """Return all_ips_data for SAMPLE_IP with one entry per given result dict.

    Entries are stamped five minutes apart starting at 2024-11-14T12:00:00.
    """
    entries = []
    for slot, result in enumerate(results):
        entries.append({
            "timestamp": f"2024-11-14T12:{5 * slot:02d}:00",
            "result": result,
        })
    return {SAMPLE_IP: entries}


class TestOutlierIdentifier(unittest.TestCase):
    """Tests for OutlierIdentifier data preparation, training and reporting."""

    def setUp(self):
        self.identifier = OutlierIdentifier()

    def _prepare_and_train(self, series):
        """Run prepare_data followed by train_model on the model features."""
        self.identifier.prepare_data(series)
        self.identifier.train_model(self.identifier.df[MODEL_FEATURES])

    def test_prepare_data_valid_input(self):
        series = build_series({"bytes": 500}, {"bytes": 300})
        self.identifier.prepare_data(series)
        frame = self.identifier.df
        self.assertIsInstance(frame, pd.DataFrame)
        self.assertIn('timestamp', frame.columns)
        self.assertIn('bytes', frame.columns)
        self.assertEqual(len(frame), 2)

    def test_prepare_data_missing_bytes(self):
        series = build_series({}, {})
        self.identifier.prepare_data(series)
        total_bytes = self.identifier.df['bytes'].sum()
        self.assertEqual(total_bytes, 0)

    def test_prepare_data_empty_input(self):
        self.identifier.prepare_data({})
        self.assertTrue(self.identifier.df.empty)

    def test_train_model_valid_data(self):
        series = build_series(
            {"bytes": 500},
            {"bytes": 300},
            {"bytes": 1000},
            {"bytes": 700},
            {"bytes": 0},
        )
        self.identifier.prepare_data(series)
        features = None
        try:
            features = self.identifier.df[MODEL_FEATURES]
            self.identifier.train_model(features)
        except Exception as e:
            self.fail(f"Training failed with exception: {e}")

    def test_identify_implicated_ips_no_outliers(self):
        self._prepare_and_train(build_series({"bytes": 100}, {"bytes": 100}))
        report = self.identifier.identify_implicated_ips([])
        self.assertEqual(report, {"ips": []})

    def test_identify_implicated_ips_with_outliers(self):
        # The middle sample carries the anomalous traffic.
        series = build_series({"bytes": 100}, {"bytes": 1000}, {"bytes": 100})
        events = [{"timestamp": "2024-11-14T12:05:00"}]
        self._prepare_and_train(series)
        report = self.identifier.identify_implicated_ips(events)
        culprits = report["ips"][0]["caused_by"]
        self.assertIn("192.168.1.1", culprits)

    def test_execute_with_valid_input(self):
        series = build_series({"bytes": 500}, {"bytes": 300}, {"bytes": 1000})
        events = [{"timestamp": "2024-11-14T12:10:00"}]
        raw = self.identifier.execute(events, series)
        self.assertIsInstance(raw, str)
        decoded = json.loads(raw)
        self.assertIn("ips", decoded)
        self.assertEqual(len(decoded["ips"]), 1)

    def test_train_and_execute_model_error_handling(self):
        # None is not a valid all_ips_data value, so the wrapper must report an error.
        events = [{"timestamp": "2024-11-14T12:10:00"}]
        outcome = self.identifier.train_and_execute_model(events, None)
        self.assertIn("status", outcome)
        self.assertEqual(outcome["status"], "error")

    def test_return_error(self):
        text = "Test error"
        outcome = self.identifier.return_error(text)
        self.assertEqual(outcome["status"], "error")
        self.assertEqual(outcome["msg"], text)


if __name__ == "__main__":
    unittest.main()