Skip to content

Commit

Permalink
Changed REST API so it only instantiates one deep learning model of e…
Browse files Browse the repository at this point in the history
…ach type and implemented other optimizations.
  • Loading branch information
Pablo Rodríguez Flores committed Feb 1, 2024
1 parent e1c798d commit b8318c3
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 108 deletions.
5 changes: 4 additions & 1 deletion resources/src/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ def run_production_server(self):
options = {
'bind': f"{__binding_host__}:{__binding_port__}",
'workers': gunicorn_workers,
'threads': gunicorn_threads
'threads': gunicorn_threads,
'worker_class': 'gthread',
'max_requests': 100,
'max_worker_lifetime': 3600
}
self.server = APIServer()
self.app = GunicornApp(self.server, options)
Expand Down
38 changes: 17 additions & 21 deletions resources/src/ai/outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def rescale(self, data):
(numpy.ndarray): Rescaled data as a numpy array.
"""
num_metrics = len(self.metrics)
rescaled=data.copy()
rescaled=data
rescaled[..., 0:num_metrics]=np.tanh(np.log1p(rescaled[..., 0:num_metrics])/32)
rescaled[..., num_metrics]=rescaled[..., num_metrics]/1440
return rescaled
Expand All @@ -142,12 +142,13 @@ def descale(self, data):
(numpy.ndarray): Descaled data as a numpy array.
"""
num_metrics = len(self.metrics)
descaled = data.copy()
descaled = data
descaled = np.where(descaled > 1.0, 1.0, np.where(descaled < -1.0, -1.0, descaled))
descaled[..., 0:num_metrics] = np.expm1(32*np.arctanh(descaled[..., 0:num_metrics]))
descaled[..., num_metrics]=descaled[..., num_metrics]*1440
return descaled

@tf.function
def model_loss(self, y_true, y_pred, single_value=True):
"""
Calculate the weighted loss for the model.
Expand Down Expand Up @@ -267,7 +268,7 @@ def compute_json(self, metric, raw_json):
threshold = self.avg_loss+5*self.std_loss
data, timestamps = self.input_json(raw_json)
if self.window_size*self.num_window > len(data):
error_msg = (f"Too few datapoints for current model. The model "
error_msg = ("Too few datapoints for current model. The model "
f"needs at least {self.window_size*self.num_window } "
f"datapoints but only {len(data)} were inputted.")
logger.logger.error(error_msg)
Expand Down Expand Up @@ -308,18 +309,17 @@ def input_json(self, raw_json):
"""
data = pd.json_normalize(raw_json)
data["granularity"] = self.granularity_from_dataframe(data)
metrics_dict = {f"result.{metric}": metric for metric in self.metrics}
data.rename(columns=metrics_dict, inplace=True)
timestamps = data['timestamp'].copy()
data['timestamp'] = pd.to_datetime(data['timestamp'])
data['minute'] = data['timestamp'].dt.minute + 60 * data['timestamp'].dt.hour
data['weekday']= data['timestamp'].dt.weekday
data.rename(columns={f"result.{metric}": metric for metric in self.metrics}, inplace=True)
timestamps = data['timestamp']
timestamp_dt = pd.to_datetime(timestamps)
data['timestamp'] = timestamp_dt
data['minute'] = timestamp_dt.dt.minute + 60 * timestamp_dt.dt.hour
data['weekday'] = timestamp_dt.dt.weekday
data = pd.get_dummies(data, columns=['weekday'], prefix=['weekday'], drop_first=True)
missing_columns = set(self.columns) - set(data.columns)
data[list(missing_columns)] = 0
for missing_column in set(self.columns) - set(data.columns):
data[missing_column] = 0
data = data[self.columns].dropna().astype('float')
data_array = data.values
return data_array, timestamps
return data.values, timestamps

def output_json(self, metric, anomalies, predicted):
"""
Expand All @@ -334,8 +334,6 @@ def output_json(self, metric, anomalies, predicted):
(dict): deserialized Json with the anomalies and predictions for the data with RedBorder prediction
Json format.
"""
predicted = predicted.copy()
anomalies = anomalies.copy()
predicted = predicted[[metric,'timestamp']].rename(columns={metric:"forecast"})
anomalies = anomalies[[metric,'timestamp']].rename(columns={metric:"expected"})
return {
Expand All @@ -345,16 +343,14 @@ def output_json(self, metric, anomalies, predicted):
}

@staticmethod
def execute_prediction_model(data, metric, model_file, model_config):
def execute_prediction_model(autoencoder, data, metric):
try:
autoencoder = Autoencoder(model_file, model_config)
result = autoencoder.compute_json(metric, data)
return autoencoder.compute_json(metric, data)
except Exception as e:
logger.logger.error("Could not execute model")
result = Autoencoder.return_error(e)
logger.logger.error("Could not execute deep learning model")
return autoencoder.return_error(e)
finally:
tf.keras.backend.clear_session()
return result

@staticmethod
def return_error(error="error"):
Expand Down
19 changes: 8 additions & 11 deletions resources/src/ai/shallow_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ def predict(self, arr):

window_size = max(int(0.05 * len(arr)), min(len(arr), int(5 + np.log(len(arr)))))
window_size += 1 if window_size % 2 == 0 else 0
kernel = np.arange(1, window_size // 2 + 2, dtype=float)
kernel = np.concatenate((kernel, kernel[-2::-1]))
kernel[window_size // 2] = 0.25*window_size ** 2
kernel /= np.sum(kernel)
padded_arr = np.pad(arr, (window_size // 2, window_size // 2), mode='edge')
half_size = window_size // 2
kernel = np.linspace(1, half_size, half_size, dtype=float)
kernel = np.concatenate((kernel, [half_size**2 * 0.25], kernel[::-1]))
kernel /= kernel.sum()
padded_arr = np.pad(arr, half_size, mode='edge')
smooth_arr = np.convolve(padded_arr, kernel, mode='valid')
return smooth_arr


def get_outliers(self, arr, smoothed_arr):
"""
Given an array of data points and an aproximation of it, return a boolean array
Expand Down Expand Up @@ -127,12 +126,10 @@ def compute_json(self, raw_json):
@staticmethod
def execute_prediction_model(data):
try:
result = ShallowOutliers.compute_json(data)
return ShallowOutliers().compute_json(data)
except Exception as e:
logger.logger.error("Could not execute model")
result = ShallowOutliers.return_error(e)
finally:
return result
logger.logger.error("Could not execute shallow model")
return ShallowOutliers.return_error(e)

@staticmethod
def return_error(error="error"):
Expand Down
6 changes: 3 additions & 3 deletions resources/src/config.ini
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[OutliersServerProduction]
outliers_binding_address=0.0.0.0
outliers_server_port=39091
outliers_server_port=39093
outliers_server_workers=4
outliers_server_threads=20

[OutliersServerTesting]
outliers_binding_address=0.0.0.0
outliers_server_port=39091
outliers_server_port=39093

[Outliers]
metric=bytes
Expand All @@ -18,7 +18,7 @@ backup_path=./backups/
model_names=traffic

[Druid]
druid_endpoint=http://x.x.x.x:8080/druid/v2/
druid_endpoint=http://10.1.209.110:8080/druid/v2/

[Logger]
log_file=./outliers.log
Expand Down
115 changes: 61 additions & 54 deletions resources/src/server/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,60 +64,61 @@ def __init__(self):
self.s3_sync_thread = None
self.start_s3_sync_thread()
self.app = Flask(__name__)
self.app.add_url_rule('/api/v1/outliers', view_func=self.calculate, methods=['POST'])
self.exit_code = 0
self.ai_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ai")
self.deep_models={}

@self.app.route('/api/v1/outliers', methods=['POST'])
def calculate():
"""
Handle POST requests to '/api/v1/outliers'.
The endpoint expects parameters either in the request body or as form parameters with
the following format:
{
"query": "<base64_encoded_json_druid_query>",
"model": "<base64_encoded_model_name>" # Optional field
}
Where:
- `query`: A base64 encoded JSON Druid query specifying the data for analysis.
- `model` (Optional): A base64 encoded string representing the path of the predictive
model to be used. If not provided or if the specified model is not found, a default
model is used.
Returns:
A JSON response containing the prediction results or an error message.
"""

druid_query = request.form.get('query')
def calculate(self):
"""
Handle POST requests to '/api/v1/outliers'.
The endpoint expects parameters either in the request body or as form parameters with
the following format:
{
"query": "<base64_encoded_json_druid_query>",
"model": "<base64_encoded_model_name>" # Optional field
}
Where:
- `query`: A base64 encoded JSON Druid query specifying the data for analysis.
- `model` (Optional): A base64 encoded string representing the path of the predictive
model to be used. If not provided or if the specified model is not found, a default
model is used.
Returns:
A JSON response containing the prediction results or an error message.
"""

druid_query = request.form.get('query')
try:
druid_query = base64.b64decode(druid_query).decode('utf-8')
druid_query = json.loads(druid_query)
except Exception as e:
error_message = "Error decoding query"
logger.logger.error(error_message + ": " + str(e))
return self.return_error(error=error_message)
logger.logger.info("Druid query successfully decoded and loaded.")

model = request.form.get('model')
if model is None:
logger.logger.info("No model requested")
model = 'default'
else:
try:
druid_query = base64.b64decode(druid_query).decode('utf-8')
druid_query = json.loads(druid_query)
decoded_model = base64.b64decode(model).decode('utf-8')
model_path = os.path.normpath(os.path.join(self.ai_path, f"{decoded_model}.keras"))
if not model_path.startswith(os.path.normpath(self.ai_path)):
logger.logger.error(f"Attempted unauthorized file access: {decoded_model}")
model = 'default'
elif not os.path.isfile(model_path):
logger.logger.error(f"Model {decoded_model} does not exist")
model = 'default'
else:
model = decoded_model
except Exception as e:
error_message = "Error decoding query"
logger.logger.error(error_message + ": " + str(e))
return self.return_error(error=error_message)
logger.logger.info("Druid query successfully decoded and loaded.")

model = request.form.get('model')
if model is None:
logger.logger.info("No model requested")
logger.logger.error(f"Error decoding or checking model: {e}")
model = 'default'
else:
try:
decoded_model = base64.b64decode(model).decode('utf-8')
model_path = os.path.normpath(os.path.join(self.ai_path, f"{decoded_model}.keras"))
if not model_path.startswith(os.path.normpath(self.ai_path)):
logger.logger.error(f"Attempted unauthorized file access: {decoded_model}")
model = 'default'
elif not os.path.isfile(model_path):
logger.logger.error(f"Model {decoded_model} does not exist")
model = 'default'
else:
model = decoded_model
except Exception as e:
logger.logger.error(f"Error decoding or checking model: {e}")
model = 'default'
return self.execute_model(druid_query, config.get("Outliers","metric"), model)
return self.execute_model(druid_query, config.get("Outliers","metric"), model)

def execute_model(self, druid_query, metric, model='default'):
"""
Expand All @@ -143,15 +144,21 @@ def execute_model(self, druid_query, metric, model='default'):
error_message = "Could not execute druid query"
logger.logger.error(error_message + ": " + str(e))
return self.return_error(error=error_message)
logger.logger.info("Druid query executed succesfully")
try:
if model != 'default':
return jsonify(outliers.Autoencoder.execute_prediction_model(
data,
metric,
if model == 'default':
return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data))
if model not in self.deep_models:
logger.logger.info(f"Creating instance of model {model}")
self.deep_models[model]=outliers.Autoencoder(
os.path.join(self.ai_path, f"{model}.keras"),
os.path.join(self.ai_path, f"{model}.ini")
))
return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data))
)
return jsonify(self.deep_models[model].execute_prediction_model(
self.deep_models[model],
data,
metric,
))
except Exception as e:
error_message = "Error while calculating prediction model"
logger.logger.error(error_message + ": " + str(e))
Expand Down
37 changes: 20 additions & 17 deletions resources/tests/test_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,46 +34,46 @@ def setUp(self):
data_file_path = os.path.join(current_dir, "outliers_test_data.json")
with open(data_file_path, "r") as data_file:
self.sample_data = json.load(data_file)
self.autoencoder=Autoencoder(
os.path.join(self.main_dir, "ai", "traffic.keras"),
os.path.join(self.main_dir, "ai", "traffic.ini")
)

def test_model_execution_with_no_data(self):
result = Autoencoder.execute_prediction_model(
self.autoencoder,
{},
"bytes",
os.path.join(self.main_dir, "ai", "traffic.keras"),
os.path.join(self.main_dir, "ai", "traffic.ini")
"bytes"
)
self.assertEqual(
result['status'],
'error'
)
def test_model_execution_with_no_metric(self):
result = Autoencoder.execute_prediction_model(
self.autoencoder,
self.sample_data,
"",
os.path.join(self.main_dir, "ai", "traffic.keras"),
os.path.join(self.main_dir, "ai", "traffic.ini")
""
)
self.assertEqual(
result['status'],
'error'
)
def test_model_execution_with_too_little_data(self):
result = Autoencoder.execute_prediction_model(
self.autoencoder,
self.sample_data[:10],
"bytes",
os.path.join(self.main_dir, "ai", "traffic.keras"),
os.path.join(self.main_dir, "ai", "traffic.ini")
"bytes"
)
self.assertEqual(
result['status'],
'error'
)
def test_model_execution_with_sample_data(self):
Autoencoder.execute_prediction_model(
self.autoencoder,
self.sample_data,
"bytes",
os.path.join(self.main_dir, "ai", "traffic.keras"),
os.path.join(self.main_dir, "ai", "traffic.ini")
)
def test_invalid_model(self):
with self.assertRaises(FileNotFoundError):
Expand Down Expand Up @@ -107,15 +107,18 @@ def test_load_empty_conifg(self):
)

def test_flatten_slice_identity(self):
TestAutoencoder = Autoencoder(
os.path.join(self.main_dir, "ai", "traffic.keras"),
os.path.join(self.main_dir, "ai", "traffic.ini")
)
np.random.seed(0)
rand_data = np.random.rand(32, 3)
sliced_data = TestAutoencoder.slice(rand_data)
flattened_data = TestAutoencoder.flatten(sliced_data)
sliced_data = self.autoencoder.slice(rand_data)
flattened_data = self.autoencoder.flatten(sliced_data)
self.assertTrue(np.allclose(flattened_data, rand_data))

def test_scale_descale_identity(self):
np.random.seed(0)
rand_data = np.random.rand(32, len(TestAutoencoder.columns))
rescaled_data = self.autoencoder.rescale(rand_data.copy())
descaled_data = self.autoencoder.descale(rescaled_data)
self.assertTrue(np.allclose(descaled_data, rand_data))

if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion resources/tests/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def test_calculate_endpoint_valid_model(self, mock_isfile, mock_execute_model, m
mock_execute_model.return_value = self.output_data
mock_query.return_value = {}
mock_isfile.return_value = True
data = {'model':'YXNkZg==', 'query':'eyJhc2RmIjoiYXNkZiJ9'}
data = {'model':'dHJhZmZpYw==', 'query':'eyJhc2RmIjoiYXNkZiJ9'}
with self.api_server.app.test_client().post('/api/v1/outliers', data=data) as response:
self.assertEqual(response.status_code, 200)
self.assertEqual(response.get_json(), self.output_data)
Expand Down

0 comments on commit b8318c3

Please sign in to comment.