diff --git a/resources/src/__main__.py b/resources/src/__main__.py
index 6abd88e..3d7cb5a 100644
--- a/resources/src/__main__.py
+++ b/resources/src/__main__.py
@@ -80,7 +80,10 @@ def run_production_server(self):
         options = {
            'bind': f"{__binding_host__}:{__binding_port__}",
            'workers': gunicorn_workers,
-           'threads': gunicorn_threads
+           'threads': gunicorn_threads,
+           'worker_class': 'gthread',
+           'max_requests': 100,
+           'max_worker_lifetime': 3600
        }
        self.server = APIServer()
        self.app = GunicornApp(self.server, options)
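Reviewer note: `worker_class: 'gthread'` is what makes the `threads` option take effect, and `max_requests` recycles each worker after 100 requests, which bounds memory growth in a long-lived TensorFlow process. `GunicornApp` itself is not shown in this diff; a minimal sketch, assuming it follows gunicorn's documented custom-application recipe:

```python
# Sketch only: assumes GunicornApp mirrors gunicorn's documented
# custom-application recipe; the real class is not part of this diff.
from gunicorn.app.base import BaseApplication

class GunicornApp(BaseApplication):
    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        # Only keys present in gunicorn's settings table are applied;
        # unrecognized keys are silently dropped by this filter.
        config = {key: value for key, value in self.options.items()
                  if key in self.cfg.settings and value is not None}
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application
```

Under that recipe it is worth verifying that every option name above survives the settings filter; `max_worker_lifetime`, for instance, is not a standard gunicorn setting name.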
""" - y_true = tf.cast(y_true, tf.float16) - y_pred = tf.cast(y_pred, tf.float16) + y_true = tf.cast(y_true, tf.bfloat16) + y_pred = tf.cast(y_pred, tf.bfloat16) num_metrics = len(self.metrics) num_features = len(self.columns) is_metric = (tf.range(num_features) < num_metrics) is_minute = (tf.range(num_features) == num_metrics) - mult_true = tf.where( - is_metric, self.loss_mult_metric * y_true, - tf.where(is_minute, self.loss_mult_minute * y_true, y_true) - ) - mult_pred = tf.where( - is_metric, self.loss_mult_metric * y_pred, - tf.where(is_minute, self.loss_mult_minute * y_pred, y_pred) - ) - standard_loss = tf.math.log(tf.cosh((mult_true - mult_pred))) + mult_true = tf.where(is_metric, self.loss_mult_metric * y_true, y_true) + mult_true = tf.where(is_minute, self.loss_mult_minute * mult_true, mult_true) + mult_pred = tf.where(is_metric, self.loss_mult_metric * y_pred, y_pred) + mult_pred = tf.where(is_minute, self.loss_mult_minute * mult_pred, mult_pred) + loss = tf.math.abs(mult_true-mult_pred) + loss = loss-tf.math.log(tf.cast(2.0, tf.bfloat16))+tf.math.log1p(tf.math.exp(-2.0*loss)) if single_value: - standard_loss = tf.reduce_mean(standard_loss) - return standard_loss + loss = tf.reduce_mean(loss) + return loss - def slice(self, data, index = []): + + def slice(self, data, index=None): """ - Transform a 2D numpy array into a 3D array readable by the model. + Transform a 2D numpy array into a 3D array readable by the model, with overlapping slices. Args: data (numpy.ndarray): 2D numpy array with the data to prepare. - index (list): Index in case you want only some of the slices returned. + index (list or numpy.ndarray): Index in case you want only some of the slices returned. Returns: (numpy.ndarray): 3D numpy array that can be processed by the model. """ _l = len(data) - sliced_data = [] slice_length = self.window_size * self.num_window - if len(index) == 0: - index = np.arange(0, _l-slice_length+1 , self.window_size) - for i in index: - sliced_data.append(data[i:i+slice_length]) - return np.array(sliced_data) + if index is None: + index = np.arange(0, _l - slice_length + 1, self.window_size) + sliced_data = np.zeros((len(index), slice_length) + data.shape[1:]) + for idx, start in enumerate(index): + sliced_data[idx] = data[start:start + slice_length] + return sliced_data def flatten(self, data): """ @@ -213,18 +211,15 @@ def flatten(self, data): Returns: (numpy.ndarray): 2D numpy array with the natural format of the data. 
""" - tsr = data.copy() - num_slices, slice_len, features = tsr.shape + num_slices, slice_len, features = data.shape flattened_len = (num_slices-1)*self.window_size + slice_len flattened_tensor = np.zeros([flattened_len, features]) scaling = np.zeros(flattened_len) - for i in range(num_slices): - left_pad = i*self.window_size - right_pad = left_pad+slice_len - flattened_tensor[left_pad:right_pad] += tsr[i] - scaling[left_pad:right_pad] +=1 - flattened_tensor = flattened_tensor / scaling[:, np.newaxis] - return flattened_tensor + indices = np.arange(num_slices)[:, None]*self.window_size + np.arange(slice_len) + np.add.at(flattened_tensor, indices.ravel(), data.reshape(-1, features)) + np.add.at(scaling, indices.ravel(), 1) + scaling[scaling == 0] = 1 + return flattened_tensor / scaling[:, np.newaxis] def calculate_predictions(self, data): """ @@ -268,6 +263,12 @@ def compute_json(self, metric, raw_json): raise ValueError(error_msg) threshold = self.avg_loss+5*self.std_loss data, timestamps = self.input_json(raw_json) + if self.window_size*self.num_window > len(data): + error_msg = ("Too few datapoints for current model. The model " + f"needs at least {self.window_size*self.num_window } " + f"datapoints but only {len(data)} were inputted.") + logger.logger.error(error_msg) + raise ValueError(error_msg) predicted, loss = self.calculate_predictions(data) predicted = pd.DataFrame(predicted, columns=self.columns) predicted['timestamp'] = timestamps @@ -304,18 +305,17 @@ def input_json(self, raw_json): """ data = pd.json_normalize(raw_json) data["granularity"] = self.granularity_from_dataframe(data) - metrics_dict = {f"result.{metric}": metric for metric in self.metrics} - data.rename(columns=metrics_dict, inplace=True) - timestamps = data['timestamp'].copy() - data['timestamp'] = pd.to_datetime(data['timestamp']) - data['minute'] = data['timestamp'].dt.minute + 60 * data['timestamp'].dt.hour - data['weekday']= data['timestamp'].dt.weekday + data.rename(columns={f"result.{metric}": metric for metric in self.metrics}, inplace=True) + timestamps = data['timestamp'] + timestamp_dt = pd.to_datetime(timestamps) + data['timestamp'] = timestamp_dt + data['minute'] = timestamp_dt.dt.minute + 60 * timestamp_dt.dt.hour + data['weekday'] = timestamp_dt.dt.weekday data = pd.get_dummies(data, columns=['weekday'], prefix=['weekday'], drop_first=True) - missing_columns = set(self.columns) - set(data.columns) - data[list(missing_columns)] = 0 + for missing_column in set(self.columns) - set(data.columns): + data[missing_column] = 0 data = data[self.columns].dropna().astype('float') - data_array = data.values - return data_array, timestamps + return data.values, timestamps def output_json(self, metric, anomalies, predicted): """ @@ -330,8 +330,6 @@ def output_json(self, metric, anomalies, predicted): (dict): deserialized Json with the anomalies and predictions for the data with RedBorder prediction Json format. 
""" - predicted = predicted.copy() - anomalies = anomalies.copy() predicted = predicted[[metric,'timestamp']].rename(columns={metric:"forecast"}) anomalies = anomalies[[metric,'timestamp']].rename(columns={metric:"expected"}) return { @@ -341,13 +339,15 @@ def output_json(self, metric, anomalies, predicted): } @staticmethod - def execute_prediction_model(data, metric, model_file, model_config): + def execute_prediction_model(autoencoder, data, metric): try: - autoencoder = Autoencoder(model_file, model_config) return autoencoder.compute_json(metric, data) except Exception as e: - logger.logger.error("Couldn't execute model") - return Autoencoder.return_error(e) + logger.logger.error("Could not execute deep learning model") + return autoencoder.return_error(e) + finally: + tf.keras.backend.clear_session() + @staticmethod def return_error(error="error"): """ diff --git a/resources/src/ai/shallow_outliers.py b/resources/src/ai/shallow_outliers.py index f8c8348..c57e6ac 100644 --- a/resources/src/ai/shallow_outliers.py +++ b/resources/src/ai/shallow_outliers.py @@ -61,15 +61,14 @@ def predict(self, arr): window_size = max(int(0.05 * len(arr)), min(len(arr), int(5 + np.log(len(arr))))) window_size += 1 if window_size % 2 == 0 else 0 - kernel = np.arange(1, window_size // 2 + 2, dtype=float) - kernel = np.concatenate((kernel, kernel[-2::-1])) - kernel[window_size // 2] = 0.25*window_size ** 2 - kernel /= np.sum(kernel) - padded_arr = np.pad(arr, (window_size // 2, window_size // 2), mode='edge') + half_size = window_size // 2 + kernel = np.linspace(1, half_size, half_size, dtype=float) + kernel = np.concatenate((kernel, [half_size**2 * 0.25], kernel[::-1])) + kernel /= kernel.sum() + padded_arr = np.pad(arr, half_size, mode='edge') smooth_arr = np.convolve(padded_arr, kernel, mode='valid') return smooth_arr - def get_outliers(self, arr, smoothed_arr): """ Given an array of data points and an aproximation of it, return a boolean array @@ -127,12 +126,10 @@ def compute_json(self, raw_json): @staticmethod def execute_prediction_model(data): try: - result = ShallowOutliers.compute_json(data) + return ShallowOutliers().compute_json(data) except Exception as e: - result = ShallowOutliers.return_error(e) - raise - finally: - return result + logger.logger.error("Could not execute shallow model") + return ShallowOutliers.return_error(e) @staticmethod def return_error(error="error"): diff --git a/resources/src/server/rest.py b/resources/src/server/rest.py index 43d8fda..eb950a2 100644 --- a/resources/src/server/rest.py +++ b/resources/src/server/rest.py @@ -64,60 +64,61 @@ def __init__(self): self.s3_sync_thread = None self.start_s3_sync_thread() self.app = Flask(__name__) + self.app.add_url_rule('/api/v1/outliers', view_func=self.calculate, methods=['POST']) self.exit_code = 0 self.ai_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ai") + self.deep_models={} - @self.app.route('/api/v1/outliers', methods=['POST']) - def calculate(): - """ - Handle POST requests to '/api/v1/outliers'. - The endpoint expects parameters either in the request body or as form parameters with - the following format: - { - "query": "", - "model": "" # Optional field - } - - Where: - - `query`: A base64 encoded JSON Druid query specifying the data for analysis. - - `model` (Optional): A base64 encoded string representing the path of the predictive - model to be used. If not provided or if the specified model is not found, a default - model is used. 
diff --git a/resources/src/ai/shallow_outliers.py b/resources/src/ai/shallow_outliers.py
index f8c8348..c57e6ac 100644
--- a/resources/src/ai/shallow_outliers.py
+++ b/resources/src/ai/shallow_outliers.py
@@ -61,15 +61,14 @@ def predict(self, arr):
         window_size = max(int(0.05 * len(arr)), min(len(arr), int(5 + np.log(len(arr)))))
         window_size += 1 if window_size % 2 == 0 else 0
-        kernel = np.arange(1, window_size // 2 + 2, dtype=float)
-        kernel = np.concatenate((kernel, kernel[-2::-1]))
-        kernel[window_size // 2] = 0.25*window_size ** 2
-        kernel /= np.sum(kernel)
-        padded_arr = np.pad(arr, (window_size // 2, window_size // 2), mode='edge')
+        half_size = window_size // 2
+        kernel = np.linspace(1, half_size, half_size, dtype=float)
+        kernel = np.concatenate((kernel, [half_size**2 * 0.25], kernel[::-1]))
+        kernel /= kernel.sum()
+        padded_arr = np.pad(arr, half_size, mode='edge')
         smooth_arr = np.convolve(padded_arr, kernel, mode='valid')
         return smooth_arr
-
     def get_outliers(self, arr, smoothed_arr):
         """
         Given an array of data points and an aproximation of it, return a boolean array
@@ -127,12 +126,10 @@ def compute_json(self, raw_json):
     @staticmethod
     def execute_prediction_model(data):
         try:
-            result = ShallowOutliers.compute_json(data)
+            return ShallowOutliers().compute_json(data)
         except Exception as e:
-            result = ShallowOutliers.return_error(e)
-            raise
-        finally:
-            return result
+            logger.logger.error("Could not execute shallow model")
+            return ShallowOutliers.return_error(e)
 
     @staticmethod
     def return_error(error="error"):
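Reviewer note: the refactored kernel is a linear ramp up to `half_size`, one heavy center weight of `half_size**2 / 4`, and the mirrored ramp back down, normalized to unit mass, so the smoother is a weighted moving average whose center sample dominates as the window grows. A quick look at the weights (window size illustrative; the code derives it from `len(arr)`):

```python
import numpy as np

window_size = 9                      # illustrative; must be odd, as in predict()
half_size = window_size // 2
kernel = np.linspace(1, half_size, half_size, dtype=float)
kernel = np.concatenate((kernel, [half_size**2 * 0.25], kernel[::-1]))
kernel /= kernel.sum()

print(len(kernel), round(kernel.sum(), 6))  # 9 1.0 -> odd length, unit mass
print(kernel)  # ramps up to the center weight, then back down
```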
execute druid query" + logger.logger.error(error_message + ": " + str(e)) + return self.return_error(error=error_message) + logger.logger.info("Druid query executed succesfully") + try: + if model == 'default': + return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data)) + if model not in self.deep_models: + logger.logger.info(f"Creating instance of model {model}") + self.deep_models[model]=outliers.Autoencoder( os.path.join(self.ai_path, f"{model}.keras"), os.path.join(self.ai_path, f"{model}.ini") - )) - return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data)) + ) + return jsonify(self.deep_models[model].execute_prediction_model( + self.deep_models[model], + data, + metric, + )) except Exception as e: error_message = "Error while calculating prediction model" - logger.logger.error(error_message + " -> " + str(e)) + logger.logger.error(error_message + ": " + str(e)) return self.return_error(error=error_message) def return_error(self, error="error"): diff --git a/resources/tests/test_outliers.py b/resources/tests/test_outliers.py index 7b59861..3c1c40b 100644 --- a/resources/tests/test_outliers.py +++ b/resources/tests/test_outliers.py @@ -20,9 +20,18 @@ import unittest import os +''' +Start of important OS Variables +''' +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1' +''' +End of important OS Variables +''' import sys import json import tempfile +import numpy as np +import tensorflow as tf from resources.src.ai.outliers import Autoencoder @@ -33,13 +42,16 @@ def setUp(self): data_file_path = os.path.join(current_dir, "outliers_test_data.json") with open(data_file_path, "r") as data_file: self.sample_data = json.load(data_file) + self.autoencoder=Autoencoder( + os.path.join(self.main_dir, "ai", "traffic.keras"), + os.path.join(self.main_dir, "ai", "traffic.ini") + ) def test_model_execution_with_no_data(self): result = Autoencoder.execute_prediction_model( + self.autoencoder, {}, - "bytes", - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") + "bytes" ) self.assertEqual( result['status'], @@ -47,10 +59,19 @@ def test_model_execution_with_no_data(self): ) def test_model_execution_with_no_metric(self): result = Autoencoder.execute_prediction_model( + self.autoencoder, self.sample_data, - "", - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") + "" + ) + self.assertEqual( + result['status'], + 'error' + ) + def test_model_execution_with_too_little_data(self): + result = Autoencoder.execute_prediction_model( + self.autoencoder, + self.sample_data[:10], + "bytes" ) self.assertEqual( result['status'], @@ -58,10 +79,9 @@ def test_model_execution_with_no_metric(self): ) def test_model_execution_with_sample_data(self): Autoencoder.execute_prediction_model( + self.autoencoder, self.sample_data, "bytes", - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") ) def test_invalid_model(self): with self.assertRaises(FileNotFoundError): @@ -85,7 +105,6 @@ def test_load_empty_model(self): os.path.join(temp_file_path), os.path.join(self.main_dir, "ai", "traffic.ini") ) - def test_load_empty_conifg(self): with tempfile.NamedTemporaryFile(delete=False) as temp_file: temp_file_path = temp_file.name @@ -95,5 +114,43 @@ def test_load_empty_conifg(self): os.path.join(temp_file_path) ) + def test_flatten_slice_identity(self): + np.random.seed(0) + rand_data = np.random.rand(32, 3) + sliced_data = 
diff --git a/resources/tests/test_outliers.py b/resources/tests/test_outliers.py
index 7b59861..3c1c40b 100644
--- a/resources/tests/test_outliers.py
+++ b/resources/tests/test_outliers.py
@@ -20,9 +20,13 @@
 import unittest
 import os
+# Must be set before TensorFlow is imported for the log level to take effect.
+os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'
 import sys
 import json
 import tempfile
+import numpy as np
+import tensorflow as tf
 
 from resources.src.ai.outliers import Autoencoder
@@ -33,13 +37,16 @@ def setUp(self):
         data_file_path = os.path.join(current_dir, "outliers_test_data.json")
         with open(data_file_path, "r") as data_file:
             self.sample_data = json.load(data_file)
+        self.autoencoder = Autoencoder(
+            os.path.join(self.main_dir, "ai", "traffic.keras"),
+            os.path.join(self.main_dir, "ai", "traffic.ini")
+        )
 
     def test_model_execution_with_no_data(self):
         result = Autoencoder.execute_prediction_model(
+            self.autoencoder,
             {},
-            "bytes",
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
+            "bytes"
         )
         self.assertEqual(
             result['status'],
@@ -47,10 +54,19 @@ def test_model_execution_with_no_metric(self):
         result = Autoencoder.execute_prediction_model(
+            self.autoencoder,
             self.sample_data,
-            "",
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
+            ""
+        )
+        self.assertEqual(
+            result['status'],
+            'error'
+        )
+
+    def test_model_execution_with_too_little_data(self):
+        result = Autoencoder.execute_prediction_model(
+            self.autoencoder,
+            self.sample_data[:10],
+            "bytes"
         )
         self.assertEqual(
             result['status'],
@@ -58,10 +74,9 @@ def test_model_execution_with_sample_data(self):
         Autoencoder.execute_prediction_model(
+            self.autoencoder,
             self.sample_data,
             "bytes",
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
         )
     def test_invalid_model(self):
         with self.assertRaises(FileNotFoundError):
@@ -85,7 +100,6 @@ def test_load_empty_model(self):
             os.path.join(temp_file_path),
             os.path.join(self.main_dir, "ai", "traffic.ini")
             )
-
     def test_load_empty_conifg(self):
         with tempfile.NamedTemporaryFile(delete=False) as temp_file:
             temp_file_path = temp_file.name
@@ -95,5 +109,43 @@ def test_load_empty_conifg(self):
             os.path.join(temp_file_path)
             )
 
+    def test_flatten_slice_identity(self):
+        np.random.seed(0)
+        rand_data = np.random.rand(32, 3)
+        sliced_data = self.autoencoder.slice(rand_data)
+        flattened_data = self.autoencoder.flatten(sliced_data)
+        self.assertTrue(np.allclose(flattened_data, rand_data))
+
+    def test_scale_descale_identity(self):
+        np.random.seed(0)
+        rand_data = np.random.rand(32, len(self.autoencoder.columns))
+        rescaled_data = self.autoencoder.rescale(rand_data.copy())
+        descaled_data = self.autoencoder.descale(rescaled_data)
+        self.assertTrue(np.allclose(descaled_data, rand_data))
+
+    def test_loss_execution_single_value(self):
+        np.random.seed(0)
+        y_true = tf.random.uniform((32, len(self.autoencoder.columns)), dtype=tf.float16)
+        y_pred = tf.random.uniform((32, len(self.autoencoder.columns)), dtype=tf.float16)
+        try:
+            loss = self.autoencoder.model_loss(y_true, y_pred, single_value=True)
+            execution_success = True
+        except Exception as e:
+            execution_success = False
+            print(e)
+        self.assertTrue(execution_success, "model_loss execution failed with an exception.")
+
+    def test_loss_execution_3d_array(self):
+        np.random.seed(0)
+        y_true = tf.random.uniform((32, len(self.autoencoder.columns)), dtype=tf.float16)
+        y_pred = tf.random.uniform((32, len(self.autoencoder.columns)), dtype=tf.float16)
+        try:
+            loss = self.autoencoder.model_loss(y_true, y_pred, single_value=False)
+            execution_success = True
+        except Exception as e:
+            execution_success = False
+            print(e)
+        self.assertTrue(execution_success, "model_loss execution failed with an exception.")
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/resources/tests/test_rest.py b/resources/tests/test_rest.py
index 04900fb..a73dd1f 100644
--- a/resources/tests/test_rest.py
+++ b/resources/tests/test_rest.py
@@ -55,6 +55,15 @@ def test_calculate_endpoint_invalid_query(self):
             {'msg': 'Error decoding query', 'status': 'error'}
         )
 
+    def test_calculate_endpoint_druid_query_execution_malfunction(self):
+        data = {'model':'YXNkZg==', 'query':'eyJ0ZXN0IjoidGVzdCJ9'}
+        with self.api_server.app.test_client().post('/api/v1/outliers', data=data) as response:
+            self.assertEqual(response.status_code, 200)
+            self.assertEqual(
+                response.get_json(),
+                {'msg': 'Could not execute druid query', 'status': 'error'}
+            )
+
     @patch('resources.src.druid.client.DruidClient.execute_query')
     @patch('resources.src.ai.shallow_outliers.ShallowOutliers.execute_prediction_model')
     @patch('os.path.isfile')
@@ -110,7 +119,7 @@ def test_calculate_endpoint_valid_model(self, mock_isfile, mock_execute_model, m
         mock_execute_model.return_value = self.output_data
         mock_query.return_value = {}
         mock_isfile.return_value = True
-        data = {'model':'YXNkZg==', 'query':'eyJhc2RmIjoiYXNkZiJ9'}
+        data = {'model':'dHJhZmZpYw==', 'query':'eyJhc2RmIjoiYXNkZiJ9'}
         with self.api_server.app.test_client().post('/api/v1/outliers', data=data) as response:
             self.assertEqual(response.status_code, 200)
             self.assertEqual(response.get_json(), self.output_data)
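Reviewer note: the base64 fixtures in these tests are easier to review decoded. The valid-model test appears to switch from the dummy `YXNkZg==` to `dHJhZmZpYw==` because `execute_model` now instantiates a real `Autoencoder` for non-default models (only `os.path.isfile`, the Druid query, and the shallow model are mocked), so the named model files must actually exist:

```python
import base64

for fixture in ('YXNkZg==', 'dHJhZmZpYw==', 'eyJ0ZXN0IjoidGVzdCJ9', 'eyJhc2RmIjoiYXNkZiJ9'):
    print(fixture, '->', base64.b64decode(fixture).decode())
# YXNkZg==             -> asdf
# dHJhZmZpYw==         -> traffic
# eyJ0ZXN0IjoidGVzdCJ9 -> {"test":"test"}
# eyJhc2RmIjoiYXNkZiJ9 -> {"asdf":"asdf"}
```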