Merge pull request #46 from redBorder/fix_performance_issues
PR-42: Fix performance issues
malvads authored Feb 22, 2024
2 parents 46992a7 + 277113a commit 8037e82
Showing 6 changed files with 210 additions and 132 deletions.
5 changes: 4 additions & 1 deletion resources/src/__main__.py
@@ -80,7 +80,10 @@ def run_production_server(self):
options = {
'bind': f"{__binding_host__}:{__binding_port__}",
'workers': gunicorn_workers,
'threads': gunicorn_threads
'threads': gunicorn_threads,
'worker_class': 'gthread',
'max_requests': 100,
'max_worker_lifetime': 3600
}
self.server = APIServer()
self.app = GunicornApp(self.server, options)
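For context: the gthread worker class runs each Gunicorn worker as a thread pool, and max_requests recycles a worker after it has served that many requests, which bounds memory growth in long-lived inference processes. Below is a minimal sketch of Gunicorn's documented custom-application pattern that a class like GunicornApp typically follows; the body is an assumption for illustration, not this repository's implementation. Note that Gunicorn itself has no max_worker_lifetime setting (the name resembles uWSGI's max-worker-lifetime), so under this pattern that key would simply be skipped.

```python
# Minimal sketch of Gunicorn's custom-application pattern
# (an assumed illustration, not this repository's actual GunicornApp).
from gunicorn.app.base import BaseApplication

class GunicornApp(BaseApplication):
    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        # Only keys Gunicorn recognizes are applied; an unknown option such
        # as 'max_worker_lifetime' (a uWSGI-style name) is skipped here.
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key.lower(), value)

    def load(self):
        return self.application
```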
108 changes: 54 additions & 54 deletions resources/src/ai/outliers.py
@@ -74,11 +74,11 @@ def __init__(self, model_file, model_config_file):
self.columns = self.metrics + self.timestamp
general_section = model_config['General']
self.avg_loss = float(general_section.get('AVG_LOSS', 0.0))
self.std_loss = float(general_section.get('STD_LOSS', 0.0))
self.window_size = int(general_section.get('WINDOW_SIZE', 0))
self.num_window = int(general_section.get('NUM_WINDOWS', 0))
self.loss_mult_metric = float(general_section.get('LOSS_MULT_METRIC', 0))
self.loss_mult_minute = float(general_section.get('LOSS_MULT_MINUTE', 0))
self.std_loss = float(general_section.get('STD_LOSS', 1.0))
self.window_size = int(general_section.get('WINDOW_SIZE', 1))
self.num_window = int(general_section.get('NUM_WINDOWS', 1))
self.loss_mult_metric = float(general_section.get('LOSS_MULT_METRIC', 1.0))
self.loss_mult_minute = float(general_section.get('LOSS_MULT_MINUTE', 1.0))
except Exception as e:
logger.logger.error(f"Could not load model conif: {e}")
raise e
@@ -126,7 +126,7 @@ def rescale(self, data):
(numpy.ndarray): Rescaled data as a numpy array.
"""
num_metrics = len(self.metrics)
rescaled=data.copy()
rescaled=data
rescaled[..., 0:num_metrics]=np.tanh(np.log1p(rescaled[..., 0:num_metrics])/32)
rescaled[..., num_metrics]=rescaled[..., num_metrics]/1440
return rescaled
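Dropping .copy() in rescale (and descale below) avoids allocating a duplicate array per call, at the price of mutating the caller's buffer in place, which is safe only as long as callers do not reuse the original array afterwards. The two transforms are exact inverses; a standalone NumPy check of the round trip:

```python
import numpy as np

x = np.array([0.0, 1.0, 10.0, 1e4])           # raw metric values
scaled = np.tanh(np.log1p(x) / 32)            # rescale: squash into [0, 1)
restored = np.expm1(32 * np.arctanh(scaled))  # descale: inverse transform
assert np.allclose(restored, x, rtol=1e-6)    # round trip is numerically exact
```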
@@ -142,7 +142,7 @@ def descale(self, data):
(numpy.ndarray): Descaled data as a numpy array.
"""
num_metrics = len(self.metrics)
descaled = data.copy()
descaled = data
descaled = np.where(descaled > 1.0, 1.0, np.where(descaled < -1.0, -1.0, descaled))
descaled[..., 0:num_metrics] = np.expm1(32*np.arctanh(descaled[..., 0:num_metrics]))
descaled[..., num_metrics]=descaled[..., num_metrics]*1440
@@ -166,44 +166,42 @@ def model_loss(self, y_true, y_pred, single_value=True):
Returns:
(tf.Tensor): Weighted loss value or a 3D loss array.
"""
y_true = tf.cast(y_true, tf.float16)
y_pred = tf.cast(y_pred, tf.float16)
y_true = tf.cast(y_true, tf.bfloat16)
y_pred = tf.cast(y_pred, tf.bfloat16)
num_metrics = len(self.metrics)
num_features = len(self.columns)
is_metric = (tf.range(num_features) < num_metrics)
is_minute = (tf.range(num_features) == num_metrics)
mult_true = tf.where(
is_metric, self.loss_mult_metric * y_true,
tf.where(is_minute, self.loss_mult_minute * y_true, y_true)
)
mult_pred = tf.where(
is_metric, self.loss_mult_metric * y_pred,
tf.where(is_minute, self.loss_mult_minute * y_pred, y_pred)
)
standard_loss = tf.math.log(tf.cosh((mult_true - mult_pred)))
mult_true = tf.where(is_metric, self.loss_mult_metric * y_true, y_true)
mult_true = tf.where(is_minute, self.loss_mult_minute * mult_true, mult_true)
mult_pred = tf.where(is_metric, self.loss_mult_metric * y_pred, y_pred)
mult_pred = tf.where(is_minute, self.loss_mult_minute * mult_pred, mult_pred)
loss = tf.math.abs(mult_true-mult_pred)
loss = loss-tf.math.log(tf.cast(2.0, tf.bfloat16))+tf.math.log1p(tf.math.exp(-2.0*loss))
if single_value:
standard_loss = tf.reduce_mean(standard_loss)
return standard_loss
loss = tf.reduce_mean(loss)
return loss
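The rewritten loss is log-cosh in a numerically stable form: since cosh(x) = e^|x| (1 + e^(-2|x|)) / 2, it follows that log cosh(x) = |x| - log 2 + log1p(e^(-2|x|)). Unlike tf.cosh, which overflows for large |x| (especially in the 16-bit dtypes used here), every intermediate stays bounded, and bfloat16's float32-sized exponent range tolerates the exp. A standalone NumPy check of the identity:

```python
import numpy as np

x = np.linspace(-5.0, 5.0, 101)
naive = np.log(np.cosh(x))  # direct form; cosh overflows for large |x|
stable = np.abs(x) - np.log(2.0) + np.log1p(np.exp(-2.0 * np.abs(x)))
assert np.allclose(naive, stable)  # identical wherever both are representable
```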

def slice(self, data, index = []):

def slice(self, data, index=None):
"""
Transform a 2D numpy array into a 3D array readable by the model.
Transform a 2D numpy array into a 3D array readable by the model, with overlapping slices.
Args:
data (numpy.ndarray): 2D numpy array with the data to prepare.
index (list): Index in case you want only some of the slices returned.
index (list or numpy.ndarray): Index in case you want only some of the slices returned.
Returns:
(numpy.ndarray): 3D numpy array that can be processed by the model.
"""
_l = len(data)
sliced_data = []
slice_length = self.window_size * self.num_window
if len(index) == 0:
index = np.arange(0, _l-slice_length+1 , self.window_size)
for i in index:
sliced_data.append(data[i:i+slice_length])
return np.array(sliced_data)
if index is None:
index = np.arange(0, _l - slice_length + 1, self.window_size)
sliced_data = np.zeros((len(index), slice_length) + data.shape[1:])
for idx, start in enumerate(index):
sliced_data[idx] = data[start:start + slice_length]
return sliced_data
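Compared with appending to a Python list and converting at the end, preallocating the 3D output and filling rows in place avoids an extra copy of every slice; the change also replaces the mutable default argument index=[] (shared across calls in Python) with the conventional None sentinel. A standalone sketch with assumed toy parameters (window_size=2, num_window=2, so slices span 4 rows and consecutive slices overlap by 2):

```python
import numpy as np

data = np.arange(12, dtype=float).reshape(6, 2)   # 6 timesteps, 2 features
window_size, num_window = 2, 2                    # assumed toy values
slice_length = window_size * num_window
index = np.arange(0, len(data) - slice_length + 1, window_size)  # [0, 2]
sliced = np.zeros((len(index), slice_length) + data.shape[1:])
for i, start in enumerate(index):
    sliced[i] = data[start:start + slice_length]  # overlapping windows
print(sliced.shape)  # (2, 4, 2)
```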

def flatten(self, data):
"""
@@ -213,18 +211,15 @@ def flatten(self, data):
Returns:
(numpy.ndarray): 2D numpy array with the natural format of the data.
"""
tsr = data.copy()
num_slices, slice_len, features = tsr.shape
num_slices, slice_len, features = data.shape
flattened_len = (num_slices-1)*self.window_size + slice_len
flattened_tensor = np.zeros([flattened_len, features])
scaling = np.zeros(flattened_len)
for i in range(num_slices):
left_pad = i*self.window_size
right_pad = left_pad+slice_len
flattened_tensor[left_pad:right_pad] += tsr[i]
scaling[left_pad:right_pad] +=1
flattened_tensor = flattened_tensor / scaling[:, np.newaxis]
return flattened_tensor
indices = np.arange(num_slices)[:, None]*self.window_size + np.arange(slice_len)
np.add.at(flattened_tensor, indices.ravel(), data.reshape(-1, features))
np.add.at(scaling, indices.ravel(), 1)
scaling[scaling == 0] = 1
return flattened_tensor / scaling[:, np.newaxis]
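The per-slice loop is replaced by index arithmetic plus np.add.at, which performs unbuffered scatter-addition: overlapping slices target the same output rows, and a plain flattened_tensor[rows] += ... would keep only the last write per duplicated index. The scaling vector counts how many slices covered each row so overlaps are averaged, and the scaling == 0 guard avoids division by zero for rows no slice touched. A standalone round-trip check on the toy shapes above:

```python
import numpy as np

window_size, slice_len = 2, 4
data = np.arange(12, dtype=float).reshape(6, 2)    # original 2D series
slices = np.stack([data[0:4], data[2:6]])          # two overlapping slices
num_slices, _, features = slices.shape
flat_len = (num_slices - 1) * window_size + slice_len
flat = np.zeros((flat_len, features))
counts = np.zeros(flat_len)
rows = np.arange(num_slices)[:, None] * window_size + np.arange(slice_len)
np.add.at(flat, rows.ravel(), slices.reshape(-1, features))  # scatter-add overlaps
np.add.at(counts, rows.ravel(), 1)                 # coverage per output row
assert np.allclose(flat / counts[:, None], data)   # overlaps average back exactly
```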

def calculate_predictions(self, data):
"""
@@ -268,6 +263,12 @@ def compute_json(self, metric, raw_json):
raise ValueError(error_msg)
threshold = self.avg_loss+5*self.std_loss
data, timestamps = self.input_json(raw_json)
if self.window_size*self.num_window > len(data):
error_msg = ("Too few datapoints for current model. The model "
f"needs at least {self.window_size*self.num_window } "
f"datapoints but only {len(data)} were inputted.")
logger.logger.error(error_msg)
raise ValueError(error_msg)
predicted, loss = self.calculate_predictions(data)
predicted = pd.DataFrame(predicted, columns=self.columns)
predicted['timestamp'] = timestamps
@@ -304,18 +305,17 @@ def input_json(self, raw_json):
"""
data = pd.json_normalize(raw_json)
data["granularity"] = self.granularity_from_dataframe(data)
metrics_dict = {f"result.{metric}": metric for metric in self.metrics}
data.rename(columns=metrics_dict, inplace=True)
timestamps = data['timestamp'].copy()
data['timestamp'] = pd.to_datetime(data['timestamp'])
data['minute'] = data['timestamp'].dt.minute + 60 * data['timestamp'].dt.hour
data['weekday']= data['timestamp'].dt.weekday
data.rename(columns={f"result.{metric}": metric for metric in self.metrics}, inplace=True)
timestamps = data['timestamp']
timestamp_dt = pd.to_datetime(timestamps)
data['timestamp'] = timestamp_dt
data['minute'] = timestamp_dt.dt.minute + 60 * timestamp_dt.dt.hour
data['weekday'] = timestamp_dt.dt.weekday
data = pd.get_dummies(data, columns=['weekday'], prefix=['weekday'], drop_first=True)
missing_columns = set(self.columns) - set(data.columns)
data[list(missing_columns)] = 0
for missing_column in set(self.columns) - set(data.columns):
data[missing_column] = 0
data = data[self.columns].dropna().astype('float')
data_array = data.values
return data_array, timestamps
return data.values, timestamps
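Besides computing pd.to_datetime only once, the rewrite backfills one-hot columns individually rather than through a set-valued indexer. The backfill matters because pd.get_dummies only creates columns for weekday values present in the batch, while the model's column list assumes all of them; absent dummies must become explicit zero columns before reordering with data[self.columns]. A standalone illustration (column names assumed to mirror the weekday_ prefix used here):

```python
import pandas as pd

df = pd.DataFrame({"weekday": [0, 0, 1]})  # batch covering only two weekdays
df = pd.get_dummies(df, columns=["weekday"], prefix=["weekday"], drop_first=True)
expected = [f"weekday_{d}" for d in range(1, 7)]  # columns the trained model expects
for col in set(expected) - set(df.columns):
    df[col] = 0                                   # backfill absent weekdays as zeros
df = df[expected]                                 # fixed column order for the model
```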

def output_json(self, metric, anomalies, predicted):
"""
@@ -330,8 +330,6 @@ def output_json(self, metric, anomalies, predicted):
(dict): deserialized Json with the anomalies and predictions for the data with RedBorder prediction
Json format.
"""
predicted = predicted.copy()
anomalies = anomalies.copy()
predicted = predicted[[metric,'timestamp']].rename(columns={metric:"forecast"})
anomalies = anomalies[[metric,'timestamp']].rename(columns={metric:"expected"})
return {
@@ -341,13 +339,15 @@
}

@staticmethod
def execute_prediction_model(data, metric, model_file, model_config):
def execute_prediction_model(autoencoder, data, metric):
try:
autoencoder = Autoencoder(model_file, model_config)
return autoencoder.compute_json(metric, data)
except Exception as e:
logger.logger.error("Couldn't execute model")
return Autoencoder.return_error(e)
logger.logger.error("Could not execute deep learning model")
return autoencoder.return_error(e)
finally:
tf.keras.backend.clear_session()

@staticmethod
def return_error(error="error"):
"""
19 changes: 8 additions & 11 deletions resources/src/ai/shallow_outliers.py
@@ -61,15 +61,14 @@ def predict(self, arr):

window_size = max(int(0.05 * len(arr)), min(len(arr), int(5 + np.log(len(arr)))))
window_size += 1 if window_size % 2 == 0 else 0
kernel = np.arange(1, window_size // 2 + 2, dtype=float)
kernel = np.concatenate((kernel, kernel[-2::-1]))
kernel[window_size // 2] = 0.25*window_size ** 2
kernel /= np.sum(kernel)
padded_arr = np.pad(arr, (window_size // 2, window_size // 2), mode='edge')
half_size = window_size // 2
kernel = np.linspace(1, half_size, half_size, dtype=float)
kernel = np.concatenate((kernel, [half_size**2 * 0.25], kernel[::-1]))
kernel /= kernel.sum()
padded_arr = np.pad(arr, half_size, mode='edge')
smooth_arr = np.convolve(padded_arr, kernel, mode='valid')
return smooth_arr
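The refactor builds the same shape of kernel more directly: a symmetric ramp of weights 1..half_size on each side with a spike of half_size²/4 at the center (the previous code weighted the center at window_size²/4, so the new center weight is somewhat smaller), normalized to sum to 1 so the convolution is a weighted moving average. Edge padding keeps the output the same length as the input. A standalone sketch for an assumed window_size of 11:

```python
import numpy as np

window_size = 11                        # assumed value for illustration
half_size = window_size // 2
kernel = np.linspace(1, half_size, half_size, dtype=float)   # ramp 1..5
kernel = np.concatenate((kernel, [half_size**2 * 0.25], kernel[::-1]))
kernel /= kernel.sum()                  # weights sum to 1: weighted average
arr = np.ones(50); arr[25] = 10.0       # flat series with a single spike
padded = np.pad(arr, half_size, mode="edge")
smooth = np.convolve(padded, kernel, mode="valid")
assert smooth.shape == arr.shape        # smoothing preserves length
```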


def get_outliers(self, arr, smoothed_arr):
"""
Given an array of data points and an approximation of it, return a boolean array
@@ -127,12 +126,10 @@ def compute_json(self, raw_json):
@staticmethod
def execute_prediction_model(data):
try:
result = ShallowOutliers.compute_json(data)
return ShallowOutliers().compute_json(data)
except Exception as e:
result = ShallowOutliers.return_error(e)
raise
finally:
return result
logger.logger.error("Could not execute shallow model")
return ShallowOutliers.return_error(e)
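The old body was doubly broken: result could be unbound when compute_json raised before assignment, and a return inside finally unconditionally swallows any in-flight exception, so the bare raise never propagated; the fix also instantiates ShallowOutliers(), since compute_json is an instance method. A minimal standalone demonstration of the finally pitfall:

```python
def swallowed():
    try:
        raise ValueError("boom")
    finally:
        return "ok"  # returning from finally discards the in-flight ValueError

print(swallowed())   # prints "ok"; the exception never reaches the caller
```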

@staticmethod
def return_error(error="error"):
124 changes: 68 additions & 56 deletions resources/src/server/rest.py
@@ -64,60 +64,61 @@ def __init__(self):
self.s3_sync_thread = None
self.start_s3_sync_thread()
self.app = Flask(__name__)
self.app.add_url_rule('/api/v1/outliers', view_func=self.calculate, methods=['POST'])
self.exit_code = 0
self.ai_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ai")
self.deep_models={}

@self.app.route('/api/v1/outliers', methods=['POST'])
def calculate():
"""
Handle POST requests to '/api/v1/outliers'.
The endpoint expects parameters either in the request body or as form parameters with
the following format:
{
"query": "<base64_encoded_json_druid_query>",
"model": "<base64_encoded_model_name>" # Optional field
}
Where:
- `query`: A base64 encoded JSON Druid query specifying the data for analysis.
- `model` (Optional): A base64 encoded string representing the path of the predictive
model to be used. If not provided or if the specified model is not found, a default
model is used.
Returns:
A JSON response containing the prediction results or an error message.
"""

druid_query = request.form.get('query')
def calculate(self):
"""
Handle POST requests to '/api/v1/outliers'.
The endpoint expects parameters either in the request body or as form parameters with
the following format:
{
"query": "<base64_encoded_json_druid_query>",
"model": "<base64_encoded_model_name>" # Optional field
}
Where:
- `query`: A base64 encoded JSON Druid query specifying the data for analysis.
- `model` (Optional): A base64 encoded string representing the path of the predictive
model to be used. If not provided or if the specified model is not found, a default
model is used.
Returns:
A JSON response containing the prediction results or an error message.
"""

druid_query = request.form.get('query')
try:
druid_query = base64.b64decode(druid_query).decode('utf-8')
druid_query = json.loads(druid_query)
except Exception as e:
error_message = "Error decoding query"
logger.logger.error(error_message + ": " + str(e))
return self.return_error(error=error_message)
logger.logger.info("Druid query successfully decoded and loaded.")

model = request.form.get('model')
if model is None:
logger.logger.info("No model requested")
model = 'default'
else:
try:
druid_query = base64.b64decode(druid_query).decode('utf-8')
druid_query = json.loads(druid_query)
decoded_model = base64.b64decode(model).decode('utf-8')
model_path = os.path.normpath(os.path.join(self.ai_path, f"{decoded_model}.keras"))
if not model_path.startswith(os.path.normpath(self.ai_path)):
logger.logger.error(f"Attempted unauthorized file access: {decoded_model}")
model = 'default'
elif not os.path.isfile(model_path):
logger.logger.error(f"Model {decoded_model} does not exist")
model = 'default'
else:
model = decoded_model
except Exception as e:
error_message = "Error decoding query"
logger.logger.error(error_message + " -> " + str(e))
return self.return_error(error=error_message)
logger.logger.info("Druid query successfully decoded and loaded.")

model = request.form.get('model')
if model is None:
logger.logger.info("No model requested")
logger.logger.error(f"Error decoding or checking model: {e}")
model = 'default'
else:
try:
decoded_model = base64.b64decode(model).decode('utf-8')
model_path = os.path.normpath(os.path.join(self.ai_path, f"{decoded_model}.keras"))
if not model_path.startswith(os.path.normpath(self.ai_path)):
logger.logger.error(f"Attempted unauthorized file access: {decoded_model}")
model = 'default'
elif not os.path.isfile(model_path):
logger.logger.error(f"Model {decoded_model} does not exist")
model = 'default'
else:
model = decoded_model
except Exception as e:
logger.logger.error(f"Error decoding or checking model -> {e}")
model = 'default'
return self.execute_model(druid_query, config.get("Outliers","metric"), model)
return self.execute_model(druid_query, config.get("Outliers","metric"), model)
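Registering calculate via add_url_rule turns the old closure into a bound method that can reach shared state such as ai_path and the deep_models cache. The normpath/startswith pair is a path-traversal guard: a decoded model name containing ../ segments normalizes to a location outside ai_path and falls back to the default model. A standalone sketch of the check (the ai_path value is assumed):

```python
import os

ai_path = "/opt/rb/ai"  # assumed base directory for model files

def is_allowed(requested: str) -> bool:
    full = os.path.normpath(os.path.join(ai_path, f"{requested}.keras"))
    return full.startswith(os.path.normpath(ai_path))  # reject escapes

print(is_allowed("traffic"))           # True:  /opt/rb/ai/traffic.keras
print(is_allowed("../../etc/passwd"))  # False: normalizes outside ai_path
```

A stricter variant would compare against os.path.normpath(ai_path) + os.sep, since a bare prefix check also accepts sibling directories such as /opt/rb/ai_models.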

def execute_model(self, druid_query, metric, model='default'):
"""
@@ -137,19 +138,30 @@ def execute_model(self, druid_query, metric, model='default'):
druid_query = query_modifier.modify_aggregations(druid_query)
else:
logger.logger.info("Calculating predictions with default model")
data = druid_client.execute_query(druid_query)
try:
if model != 'default':
return jsonify(outliers.Autoencoder.execute_prediction_model(
data,
metric,
data = druid_client.execute_query(druid_query)
except Exception as e:
error_message = "Could not execute druid query"
logger.logger.error(error_message + ": " + str(e))
return self.return_error(error=error_message)
logger.logger.info("Druid query executed succesfully")
try:
if model == 'default':
return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data))
if model not in self.deep_models:
logger.logger.info(f"Creating instance of model {model}")
self.deep_models[model]=outliers.Autoencoder(
os.path.join(self.ai_path, f"{model}.keras"),
os.path.join(self.ai_path, f"{model}.ini")
))
return jsonify(shallow_outliers.ShallowOutliers.execute_prediction_model(data))
)
return jsonify(self.deep_models[model].execute_prediction_model(
self.deep_models[model],
data,
metric,
))
except Exception as e:
error_message = "Error while calculating prediction model"
logger.logger.error(error_message + " -> " + str(e))
logger.logger.error(error_message + ": " + str(e))
return self.return_error(error=error_message)
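deep_models is a per-process, lazily populated cache: the first request for a given model pays the Autoencoder load cost, later requests reuse the instance, and Gunicorn's max_requests recycling eventually discards the whole worker (cache included) to cap memory. A minimal sketch of the pattern with assumed names; note that under the gthread worker class several threads share this dict, so a lock (or tolerating an occasional duplicate load) may be warranted:

```python
import threading

class ModelCache:
    """Lazily load and reuse one model instance per name (assumed sketch)."""

    def __init__(self, loader):
        self._loader = loader        # callable: model name -> model instance
        self._models = {}
        self._lock = threading.Lock()

    def get(self, name):
        with self._lock:             # gthread workers serve requests concurrently
            if name not in self._models:
                self._models[name] = self._loader(name)  # first call pays load cost
            return self._models[name]
```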

def return_error(self, error="error"):
