Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource Allocation data in Redis #3430

Merged
merged 13 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion qiita_db/meta_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@
from re import sub
from json import loads, dump, dumps

from qiita_db.util import create_nested_path
from qiita_db.util import create_nested_path, retrieve_resource_data
from qiita_db.util import resource_allocation_plot
from qiita_core.qiita_settings import qiita_config, r_client
from qiita_core.configuration_manager import ConfigurationManager
import qiita_db as qdb

# Global constant list of column labels for the resource-allocation data.
# update_resource_allocation_redis() passes it to retrieve_resource_data();
# per the original note it is also used in resource_allocation_page.
COLUMNS = [
    "sName", "sVersion", "cID", "cName", "processing_job_id",
    "parameters", "samples", "columns", "input_size", "extra_info",
    "MaxRSSRaw", "ElapsedRaw", "Start", "node_name", "node_model"]


def _get_data_fpids(constructor, object_id):
"""Small function for getting filepath IDS associated with data object
Expand Down Expand Up @@ -546,3 +553,101 @@ def generate_plugin_releases():
# important to "flush" variables to avoid errors
r_client.delete(redis_key)
f(redis_key, v)


def get_software_commands(active):
    """Group command names by software name and software version.

    Parameters
    ----------
    active : bool
        Forwarded as the ``active`` argument of
        ``qdb.software.Software.iter``.

    Returns
    -------
    dict
        ``{software name: {software version: [command name, ...]}}``.
        Every level is a plain ``dict``/``list`` (no ``defaultdict``), so
        ``str()`` of the result is an ordinary literal. A software entry
        that has no commands does not appear in the result.
    """
    # Materialize the iterator before touching per-software attributes,
    # exactly as the original implementation did.
    software_list = list(qdb.software.Software.iter(active=active))
    software_commands = {}

    for software in software_list:
        sname = software.name
        sversion = software.version

        for command in software.commands:
            # setdefault keeps the containers plain dicts/lists while still
            # creating missing levels on first use.
            by_version = software_commands.setdefault(sname, {})
            by_version.setdefault(sversion, []).append(command.name)

    return software_commands


def _axis_to_png_data_uri(ax):
    """Redraw one axis on its own figure and return it as a PNG data URI.

    Parameters
    ----------
    ax : matplotlib axis
        Source axis. Its first collection is redrawn as the "data" scatter,
        its first line as the fitted curve and, when present, its second
        collection as the "failures" scatter.

    Returns
    -------
    str
        ``'data:image/png;base64,'`` followed by the URL-quoted base64
        encoding of the rendered PNG.
    """
    new_fig = plt.figure()
    try:
        new_ax = new_fig.add_subplot(111)

        scatter_data = ax.collections[0]
        offsets = scatter_data.get_offsets()
        new_ax.scatter(offsets[:, 0], offsets[:, 1],
                       s=scatter_data.get_sizes(), label="data")

        line = ax.lines[0]
        new_ax.plot(line.get_xdata(), line.get_ydata(),
                    linewidth=1, color='orange')

        if len(ax.collections) > 1:
            failure_offsets = ax.collections[1].get_offsets()
            new_ax.scatter(failure_offsets[:, 0], failure_offsets[:, 1],
                           color='red', s=3, label="failures")

        new_ax.set_xscale('log')
        new_ax.set_yscale('log')
        new_ax.set_xlabel(ax.get_xlabel())
        new_ax.set_ylabel(ax.get_ylabel())
        new_ax.legend(loc='upper left')

        new_fig.tight_layout()
        buffer = BytesIO()
        new_fig.savefig(buffer, format='png')
        return 'data:image/png;base64,' + quote(
            b64encode(buffer.getvalue()).decode('ascii'))
    finally:
        # Always release the figure, even if rendering fails.
        plt.close(new_fig)


def update_resource_allocation_redis(active=True):
    """Updates redis with plots and information about current software.

    Stores ``str()`` of the software/version/command mapping under
    ``resources:commands``. Then, for every command that has resource data,
    stores the memory and time plots (as PNG data URIs), their titles and
    the current date under keys of the form
    ``resources$#<cname>$#<sname>$#<version>$#<col_name>:<field>``.

    Parameters
    ----------
    active: boolean, optional
        Defaults to True. Should only be False when testing.

    """
    today = datetime.now().strftime('%m-%d-%y')
    scommands = get_software_commands(active)
    r_client.set('resources:commands', str(scommands))

    # Same x-axis column for every command, so it is set once up front.
    col_name = "samples * columns"

    for sname, versions in scommands.items():
        for version, commands in versions.items():
            for cname in commands:
                df = retrieve_resource_data(cname, sname, version, COLUMNS)
                if len(df) == 0:
                    continue

                fig, axs = resource_allocation_plot(df, cname, sname, col_name)
                values = {'time': today}
                try:
                    # Splitting 1 image plot into 2 separate for better
                    # layout: first axis -> memory, second axis -> time.
                    for kind, ax in zip(("mem", "time"), axs):
                        values["title_" + kind] = ax.get_title()
                        values["img_" + kind] = _axis_to_png_data_uri(ax)
                finally:
                    plt.close(fig)

                for k, v in values.items():
                    redis_key = 'resources$#%s$#%s$#%s$#%s:%s' % (
                        cname, sname, version, col_name, k)
                    # important to "flush" the key before writing it again
                    r_client.delete(redis_key)
                    r_client.set(redis_key, v)
21 changes: 21 additions & 0 deletions qiita_db/test/test_meta_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,27 @@ def test_generate_plugin_releases(self):
'-', '').replace(':', '').replace(' ', '-')
self.assertEqual(tgz_obs, [time])

def test_update_resource_allocation_redis(self):
    # After the update, the memory and time titles stored in redis must
    # mention the expected model formula for this command.
    cname = "Split libraries FASTQ"
    sname = "QIIMEq2"
    col_name = "samples * columns"
    version = "1.9.1"
    qdb.meta_util.update_resource_allocation_redis(False)

    expected_models = {
        'title_mem': "model: k * log(x) + b * log(x)^2 + a * log(x)^3",
        'title_time': "model: a + b + log(x) * k",
    }

    for field, expected in expected_models.items():
        redis_key = 'resources$#%s$#%s$#%s$#%s:%s' % (
            cname, sname, version, col_name, field)
        # str() kept from the original test; presumably r_client.get may
        # return a non-str value (TODO confirm).
        title = str(r_client.get(redis_key))
        # assertIn reports the actual title on failure, unlike
        # assertTrue(... in ...).
        self.assertIn(expected, title)


if __name__ == '__main__':
main()
21 changes: 11 additions & 10 deletions qiita_db/test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1311,8 +1311,9 @@ def test_quick_mounts_purge(self):

class ResourceAllocationPlotTests(TestCase):
def setUp(self):
self.CNAME = "Split libraries FASTQ"
self.SNAME = "QIIMEq2"
self.cname = "Split libraries FASTQ"
self.sname = "QIIMEq2"
self.version = "1.9.1"
self.col_name = 'samples * columns'
self.columns = [
"sName", "sVersion", "cID", "cName", "processing_job_id",
Expand All @@ -1321,13 +1322,13 @@ def setUp(self):

# df is a dataframe that represents a table with columns specified in
# self.columns
self.df = qdb.util._retrieve_resource_data(
self.CNAME, self.SNAME, self.columns)
self.df = qdb.util.retrieve_resource_data(
self.cname, self.sname, self.version, self.columns)

def test_plot_return(self):
# check the plot returns correct objects
fig1, axs1 = qdb.util.resource_allocation_plot(
self.df, self.CNAME, self.SNAME, self.col_name)
self.df, self.cname, self.sname, self.col_name)
self.assertIsInstance(
fig1, Figure,
"Returned object fig1 is not a Matplotlib Figure")
Expand All @@ -1338,13 +1339,13 @@ def test_plot_return(self):

def test_minimize_const(self):
self.df = self.df[
(self.df.cName == self.CNAME) & (self.df.sName == self.SNAME)]
(self.df.cName == self.cname) & (self.df.sName == self.sname)]
self.df.dropna(subset=['samples', 'columns'], inplace=True)
self.df[self.col_name] = self.df.samples * self.df['columns']
fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False)

bm, options = qdb.util._resource_allocation_plot_helper(
self.df, axs[0], self.CNAME, self.SNAME, 'MaxRSSRaw',
self.df, axs[0], self.cname, self.sname, 'MaxRSSRaw',
qdb.util.MODELS_MEM, self.col_name)
# check that the algorithm chooses correct model for MaxRSSRaw and
# has 0 failures
Expand All @@ -1366,7 +1367,7 @@ def test_minimize_const(self):
# check that the algorithm chooses correct model for ElapsedRaw and
# has 1 failure
bm, options = qdb.util._resource_allocation_plot_helper(
self.df, axs[1], self.CNAME, self.SNAME, 'ElapsedRaw',
self.df, axs[1], self.cname, self.sname, 'ElapsedRaw',
qdb.util.MODELS_TIME, self.col_name)
k, a, b = options.x
failures_df = qdb.util._resource_allocation_failures(
Expand Down Expand Up @@ -1422,8 +1423,8 @@ def test_db_update(self):
qdb.util.update_resource_allocation_table(test=test_data)

for curr_cname, ids in types.items():
updated_df = qdb.util._retrieve_resource_data(
curr_cname, self.SNAME, self.columns)
updated_df = qdb.util.retrieve_resource_data(
curr_cname, self.sname, self.version, self.columns)
updated_ids_set = set(updated_df['processing_job_id'])
previous_ids_set = set(self.df['processing_job_id'])
for id in ids:
Expand Down
44 changes: 36 additions & 8 deletions qiita_db/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@
MODELS_TIME = [time_model1, time_model2, time_model3, time_model4]


def get_model_name(model):
    """Return the formula text associated with a resource model.

    Parameters
    ----------
    model : object
        Compared by equality against mem_model1..mem_model4 and
        time_model1..time_model4, in that order.

    Returns
    -------
    str
        The formula string of the first matching model, or
        "Unknown model" when nothing matches.
    """
    formulas = (
        (mem_model1, "k * log(x) + x * a + b"),
        (mem_model2, "k * log(x) + b * log(x)^2 + a"),
        (mem_model3, "k * log(x) + b * log(x)^2 + a * log(x)^3"),
        (mem_model4, "k * log(x) + b * log(x)^2 + a * log(x)^2.5"),
        (time_model1, "a + b + log(x) * k"),
        (time_model2, "a + b * x + log(x) * k"),
        (time_model3, "a + b * log(x)^2 + log(x) * k"),
        (time_model4, "a * log(x)^3 + b * log(x)^2 + log(x) * k"),
    )

    for candidate, formula in formulas:
        if model == candidate:
            return formula

    return "Unknown model"


def scrub_data(s):
r"""Scrubs data fields of characters not allowed by PostgreSQL

Expand Down Expand Up @@ -2381,7 +2402,7 @@ def resource_allocation_plot(df, cname, sname, col_name):
return fig, axs


def _retrieve_resource_data(cname, sname, columns):
def retrieve_resource_data(cname, sname, version, columns):
with qdb.sql_connection.TRN:
sql = """
SELECT
Expand Down Expand Up @@ -2411,9 +2432,10 @@ def _retrieve_resource_data(cname, sname, columns):
ON pr.processing_job_id = sra.processing_job_id
WHERE
sc.name = %s
AND s.name = %s;
AND s.name = %s
AND s.version = %s
"""
qdb.sql_connection.TRN.add(sql, sql_args=[cname, sname])
qdb.sql_connection.TRN.add(sql, sql_args=[cname, sname, version])
res = qdb.sql_connection.TRN.execute_fetchindex()
df = pd.DataFrame(res, columns=columns)
return df
Expand Down Expand Up @@ -2482,15 +2504,18 @@ def _resource_allocation_plot_helper(
y_plot = best_model(x_plot, k, a, b)
ax.plot(x_plot, y_plot, linewidth=1, color='orange')

cmin_value = min(y_plot)
cmax_value = max(y_plot)

maxi = naturalsize(df[curr].max(), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(df[curr].max()))
cmax = naturalsize(max(y_plot), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(max(y_plot)))
cmax = naturalsize(cmax_value, gnu=True) if curr == "MaxRSSRaw" else \
str(timedelta(seconds=round(cmax_value, 2))).rstrip('0').rstrip('.')

mini = naturalsize(df[curr].min(), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(df[curr].min()))
cmin = naturalsize(min(y_plot), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(min(y_plot)))
cmin = naturalsize(cmin_value, gnu=True) if curr == "MaxRSSRaw" else \
str(timedelta(seconds=round(cmin_value, 2))).rstrip('0').rstrip('.')

x_plot = np.array(df[col_name])
failures_df = _resource_allocation_failures(
Expand All @@ -2500,7 +2525,10 @@ def _resource_allocation_plot_helper(
ax.scatter(failures_df[col_name], failures_df[curr], color='red', s=3,
label="failures")

ax.set_title(f'{cname}: {sname}\n real: {mini} || {maxi}\n'
ax.set_title(
f'k||a||b: {k}||{a}||{b}\n'
f'model: {get_model_name(best_model)}\n'
f'real: {mini} || {maxi}\n'
f'calculated: {cmin} || {cmax}\n'
f'failures: {failures}')
ax.legend(loc='upper left')
Expand Down
1 change: 1 addition & 0 deletions scripts/all-qiita-cron-job
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ qiita-cron-job empty-trash-upload-folder
qiita-cron-job generate-biom-and-metadata-release
qiita-cron-job purge-filepaths
qiita-cron-job update-redis-stats
qiita-cron-job update-resource-allocation-redis
qiita-cron-job generate-plugin-releases
qiita-cron-job purge-json-web-tokens
6 changes: 6 additions & 0 deletions scripts/qiita-cron-job
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ from qiita_db.util import (
quick_mounts_purge as qiita_quick_mounts_purge)
from qiita_db.meta_util import (
update_redis_stats as qiita_update_redis_stats,
update_resource_allocation_redis as qiita_update_resource_allocation_redis,
generate_biom_and_metadata_release as
qiita_generate_biom_and_metadata_release,
generate_plugin_releases as qiita_generate_plugin_releases)
Expand Down Expand Up @@ -48,6 +49,11 @@ def update_redis_stats():
qiita_update_redis_stats()


# Command wrapper that delegates to
# qiita_db.meta_util.update_resource_allocation_redis (imported above under
# the qiita_ prefix), called with its default arguments.
@commands.command()
def update_resource_allocation_redis():
    qiita_update_resource_allocation_redis()


@commands.command()
def generate_biom_and_metadata_release():
qiita_generate_biom_and_metadata_release('public')
Expand Down
Loading