diff --git a/resources/src/__main__.py b/resources/src/__main__.py
index 6abd88e..3d7cb5a 100644
--- a/resources/src/__main__.py
+++ b/resources/src/__main__.py
@@ -80,7 +80,14 @@ def run_production_server(self):
         options = {
             'bind': f"{__binding_host__}:{__binding_port__}",
             'workers': gunicorn_workers,
-            'threads': gunicorn_threads
+            'threads': gunicorn_threads,
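+            # 'gthread' serves each request on a thread inside the worker.
+            # Recycling a worker after ~100 requests (jitter staggers the
+            # restarts) bounds memory growth from repeated TensorFlow
+            # inference; assumed rationale, not stated in this change.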
""" - predicted = predicted.copy() - anomalies = anomalies.copy() predicted = predicted[[metric,'timestamp']].rename(columns={metric:"forecast"}) anomalies = anomalies[[metric,'timestamp']].rename(columns={metric:"expected"}) return { @@ -345,16 +343,14 @@ def output_json(self, metric, anomalies, predicted): } @staticmethod - def execute_prediction_model(data, metric, model_file, model_config): + def execute_prediction_model(autoencoder, data, metric): try: - autoencoder = Autoencoder(model_file, model_config) - result = autoencoder.compute_json(metric, data) + return autoencoder.compute_json(metric, data) except Exception as e: - logger.logger.error("Could not execute model") - result = Autoencoder.return_error(e) + logger.logger.error("Could not execute deep learning model") + return autoencoder.return_error(e) finally: tf.keras.backend.clear_session() - return result @staticmethod def return_error(error="error"): diff --git a/resources/src/ai/shallow_outliers.py b/resources/src/ai/shallow_outliers.py index db76514..c57e6ac 100644 --- a/resources/src/ai/shallow_outliers.py +++ b/resources/src/ai/shallow_outliers.py @@ -61,15 +61,14 @@ def predict(self, arr): window_size = max(int(0.05 * len(arr)), min(len(arr), int(5 + np.log(len(arr))))) window_size += 1 if window_size % 2 == 0 else 0 - kernel = np.arange(1, window_size // 2 + 2, dtype=float) - kernel = np.concatenate((kernel, kernel[-2::-1])) - kernel[window_size // 2] = 0.25*window_size ** 2 - kernel /= np.sum(kernel) - padded_arr = np.pad(arr, (window_size // 2, window_size // 2), mode='edge') + half_size = window_size // 2 + kernel = np.linspace(1, half_size, half_size, dtype=float) + kernel = np.concatenate((kernel, [half_size**2 * 0.25], kernel[::-1])) + kernel /= kernel.sum() + padded_arr = np.pad(arr, half_size, mode='edge') smooth_arr = np.convolve(padded_arr, kernel, mode='valid') return smooth_arr - def get_outliers(self, arr, smoothed_arr): """ Given an array of data points and an aproximation of it, return a boolean array @@ -127,12 +126,10 @@ def compute_json(self, raw_json): @staticmethod def execute_prediction_model(data): try: - result = ShallowOutliers.compute_json(data) + return ShallowOutliers().compute_json(data) except Exception as e: - logger.logger.error("Could not execute model") - result = ShallowOutliers.return_error(e) - finally: - return result + logger.logger.error("Could not execute shallow model") + return ShallowOutliers.return_error(e) @staticmethod def return_error(error="error"): diff --git a/resources/src/config.ini b/resources/src/config.ini index 440b595..3fd4f10 100644 --- a/resources/src/config.ini +++ b/resources/src/config.ini @@ -1,12 +1,12 @@ [OutliersServerProduction] outliers_binding_address=0.0.0.0 -outliers_server_port=39091 +outliers_server_port=39093 outliers_server_workers=4 outliers_server_threads=20 [OutliersServerTesting] outliers_binding_address=0.0.0.0 -outliers_server_port=39091 +outliers_server_port=39093 [Outliers] metric=bytes @@ -18,7 +18,7 @@ backup_path=./backups/ model_names=traffic [Druid] -druid_endpoint=http://x.x.x.x:8080/druid/v2/ +druid_endpoint=http://10.1.209.110:8080/druid/v2/ [Logger] log_file=./outliers.log diff --git a/resources/src/server/rest.py b/resources/src/server/rest.py index e49ac83..eb950a2 100644 --- a/resources/src/server/rest.py +++ b/resources/src/server/rest.py @@ -64,60 +64,61 @@ def __init__(self): self.s3_sync_thread = None self.start_s3_sync_thread() self.app = Flask(__name__) + self.app.add_url_rule('/api/v1/outliers', 
+        kernel = np.linspace(1, half_size, half_size, dtype=float)
+        kernel = np.concatenate((kernel, [0.25 * window_size ** 2], kernel[::-1]))
+        kernel /= kernel.sum()
+        padded_arr = np.pad(arr, half_size, mode='edge')
         smooth_arr = np.convolve(padded_arr, kernel, mode='valid')
         return smooth_arr
-
     def get_outliers(self, arr, smoothed_arr):
         """
         Given an array of data points and an approximation of it, return a boolean array
@@ -127,12 +126,10 @@ def compute_json(self, raw_json):
     @staticmethod
     def execute_prediction_model(data):
         try:
-            result = ShallowOutliers.compute_json(data)
+            return ShallowOutliers().compute_json(data)
         except Exception as e:
-            logger.logger.error("Could not execute model")
-            result = ShallowOutliers.return_error(e)
-        finally:
-            return result
+            logger.logger.error("Could not execute shallow model")
+            return ShallowOutliers.return_error(e)
 
     @staticmethod
     def return_error(error="error"):
diff --git a/resources/src/config.ini b/resources/src/config.ini
index 440b595..3fd4f10 100644
--- a/resources/src/config.ini
+++ b/resources/src/config.ini
@@ -1,12 +1,12 @@
 [OutliersServerProduction]
 outliers_binding_address=0.0.0.0
-outliers_server_port=39091
+outliers_server_port=39093
 outliers_server_workers=4
 outliers_server_threads=20
 
 [OutliersServerTesting]
 outliers_binding_address=0.0.0.0
-outliers_server_port=39091
+outliers_server_port=39093
 
 [Outliers]
 metric=bytes
@@ -18,7 +18,7 @@
 backup_path=./backups/
 model_names=traffic
 
 [Druid]
-druid_endpoint=http://x.x.x.x:8080/druid/v2/
+druid_endpoint=http://10.1.209.110:8080/druid/v2/
 
 [Logger]
 log_file=./outliers.log
diff --git a/resources/src/server/rest.py b/resources/src/server/rest.py
index e49ac83..eb950a2 100644
--- a/resources/src/server/rest.py
+++ b/resources/src/server/rest.py
@@ -64,60 +64,63 @@ def __init__(self):
         self.s3_sync_thread = None
         self.start_s3_sync_thread()
         self.app = Flask(__name__)
+        self.app.add_url_rule('/api/v1/outliers', view_func=self.calculate, methods=['POST'])
         self.exit_code = 0
         self.ai_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ai")
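+        # Cache of loaded Autoencoder instances keyed by model name, so each
+        # .keras file is deserialized once per worker instead of per request.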
model: {e}") - model = 'default' - return self.execute_model(druid_query, config.get("Outliers","metric"), model) + return self.execute_model(druid_query, config.get("Outliers","metric"), model) def execute_model(self, druid_query, metric, model='default'): """ @@ -143,15 +144,21 @@ def execute_model(self, druid_query, metric, model='default'): error_message = "Could not execute druid query" logger.logger.error(error_message + ": " + str(e)) return self.return_error(error=error_message) + logger.logger.info("Druid query executed succesfully") try: - if model != 'default': - return jsonify(outliers.Autoencoder.execute_prediction_model( - data, - metric, + if model == 'default': + return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data)) + if model not in self.deep_models: + logger.logger.info(f"Creating instance of model {model}") + self.deep_models[model]=outliers.Autoencoder( os.path.join(self.ai_path, f"{model}.keras"), os.path.join(self.ai_path, f"{model}.ini") - )) - return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data)) + ) + return jsonify(self.deep_models[model].execute_prediction_model( + self.deep_models[model], + data, + metric, + )) except Exception as e: error_message = "Error while calculating prediction model" logger.logger.error(error_message + ": " + str(e)) diff --git a/resources/tests/test_outliers.py b/resources/tests/test_outliers.py index 617a7ba..0d6deef 100644 --- a/resources/tests/test_outliers.py +++ b/resources/tests/test_outliers.py @@ -34,13 +34,16 @@ def setUp(self): data_file_path = os.path.join(current_dir, "outliers_test_data.json") with open(data_file_path, "r") as data_file: self.sample_data = json.load(data_file) + self.autoencoder=Autoencoder( + os.path.join(self.main_dir, "ai", "traffic.keras"), + os.path.join(self.main_dir, "ai", "traffic.ini") + ) def test_model_execution_with_no_data(self): result = Autoencoder.execute_prediction_model( + self.autoencoder, {}, - "bytes", - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") + "bytes" ) self.assertEqual( result['status'], @@ -48,10 +51,9 @@ def test_model_execution_with_no_data(self): ) def test_model_execution_with_no_metric(self): result = Autoencoder.execute_prediction_model( + self.autoencoder, self.sample_data, - "", - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") + "" ) self.assertEqual( result['status'], @@ -59,10 +61,9 @@ def test_model_execution_with_no_metric(self): ) def test_model_execution_with_too_little_data(self): result = Autoencoder.execute_prediction_model( + self.autoencoder, self.sample_data[:10], - "bytes", - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") + "bytes" ) self.assertEqual( result['status'], @@ -70,10 +71,9 @@ def test_model_execution_with_too_little_data(self): ) def test_model_execution_with_sample_data(self): Autoencoder.execute_prediction_model( + self.autoencoder, self.sample_data, "bytes", - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") ) def test_invalid_model(self): with self.assertRaises(FileNotFoundError): @@ -107,15 +107,18 @@ def test_load_empty_conifg(self): ) def test_flatten_slice_identity(self): - TestAutoencoder = Autoencoder( - os.path.join(self.main_dir, "ai", "traffic.keras"), - os.path.join(self.main_dir, "ai", "traffic.ini") - ) np.random.seed(0) rand_data = np.random.rand(32, 3) - 
+            if model not in self.deep_models:
+                logger.logger.info(f"Creating instance of model {model}")
+                self.deep_models[model] = outliers.Autoencoder(
                     os.path.join(self.ai_path, f"{model}.keras"),
                     os.path.join(self.ai_path, f"{model}.ini")
-                ))
-            return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data))
+                )
+            return jsonify(outliers.Autoencoder.execute_prediction_model(
+                self.deep_models[model],
+                data,
+                metric,
+            ))
         except Exception as e:
             error_message = "Error while calculating prediction model"
             logger.logger.error(error_message + ": " + str(e))
diff --git a/resources/tests/test_outliers.py b/resources/tests/test_outliers.py
index 617a7ba..0d6deef 100644
--- a/resources/tests/test_outliers.py
+++ b/resources/tests/test_outliers.py
@@ -34,13 +34,16 @@ def setUp(self):
         data_file_path = os.path.join(current_dir, "outliers_test_data.json")
         with open(data_file_path, "r") as data_file:
             self.sample_data = json.load(data_file)
+        self.autoencoder = Autoencoder(
+            os.path.join(self.main_dir, "ai", "traffic.keras"),
+            os.path.join(self.main_dir, "ai", "traffic.ini")
+        )
 
     def test_model_execution_with_no_data(self):
         result = Autoencoder.execute_prediction_model(
+            self.autoencoder,
             {},
-            "bytes",
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
+            "bytes"
         )
         self.assertEqual(
             result['status'],
@@ -48,10 +51,9 @@
         )
 
     def test_model_execution_with_no_metric(self):
         result = Autoencoder.execute_prediction_model(
+            self.autoencoder,
             self.sample_data,
-            "",
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
+            ""
         )
         self.assertEqual(
             result['status'],
@@ -59,10 +61,9 @@
         )
 
     def test_model_execution_with_too_little_data(self):
         result = Autoencoder.execute_prediction_model(
+            self.autoencoder,
             self.sample_data[:10],
-            "bytes",
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
+            "bytes"
         )
         self.assertEqual(
             result['status'],
@@ -70,10 +71,9 @@
         )
 
     def test_model_execution_with_sample_data(self):
         Autoencoder.execute_prediction_model(
+            self.autoencoder,
             self.sample_data,
             "bytes",
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
         )
 
     def test_invalid_model(self):
         with self.assertRaises(FileNotFoundError):
@@ -107,15 +107,18 @@ def test_load_empty_conifg(self):
         )
 
     def test_flatten_slice_identity(self):
-        TestAutoencoder = Autoencoder(
-            os.path.join(self.main_dir, "ai", "traffic.keras"),
-            os.path.join(self.main_dir, "ai", "traffic.ini")
-        )
         np.random.seed(0)
         rand_data = np.random.rand(32, 3)
-        sliced_data = TestAutoencoder.slice(rand_data)
-        flattened_data = TestAutoencoder.flatten(sliced_data)
+        sliced_data = self.autoencoder.slice(rand_data)
+        flattened_data = self.autoencoder.flatten(sliced_data)
         self.assertTrue(np.allclose(flattened_data, rand_data))
 
+    def test_scale_descale_identity(self):
+        np.random.seed(0)
+        rand_data = np.random.rand(32, len(self.autoencoder.columns))
+        rescaled_data = self.autoencoder.rescale(rand_data.copy())
+        descaled_data = self.autoencoder.descale(rescaled_data)
+        self.assertTrue(np.allclose(descaled_data, rand_data))
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/resources/tests/test_rest.py b/resources/tests/test_rest.py
index a15e16f..a73dd1f 100644
--- a/resources/tests/test_rest.py
+++ b/resources/tests/test_rest.py
@@ -119,7 +119,7 @@ def test_calculate_endpoint_valid_model(self, mock_isfile, mock_execute_model, m
         mock_execute_model.return_value = self.output_data
         mock_query.return_value = {}
         mock_isfile.return_value = True
-        data = {'model':'YXNkZg==', 'query':'eyJhc2RmIjoiYXNkZiJ9'}
+        data = {'model':'dHJhZmZpYw==', 'query':'eyJhc2RmIjoiYXNkZiJ9'}
         with self.api_server.app.test_client().post('/api/v1/outliers', data=data) as response:
             self.assertEqual(response.status_code, 200)
             self.assertEqual(response.get_json(), self.output_data)