
Commit

Merge pull request galaxyproject#18579 from mvdbeek/container_job_metrics

Record container id and type in core job metrics
jmchilton authored Jul 29, 2024
2 parents c1ce4d4 + d4ae33a commit 9d975b5
Showing 4 changed files with 56 additions and 11 deletions.
18 changes: 18 additions & 0 deletions lib/galaxy/job_metrics/instrumenters/core.py
@@ -1,5 +1,6 @@
"""The module describes the ``core`` job metrics plugin."""

import json
import logging
import time
from typing import (
@@ -23,10 +24,16 @@
START_EPOCH_KEY = "start_epoch"
END_EPOCH_KEY = "end_epoch"
RUNTIME_SECONDS_KEY = "runtime_seconds"
CONTAINER_ID = "container_id"
CONTAINER_TYPE = "container_type"


class CorePluginFormatter(JobMetricFormatter):
def format(self, key: str, value: Any) -> FormattedMetric:
if key == CONTAINER_ID:
return FormattedMetric("Container ID", value)
if key == CONTAINER_TYPE:
return FormattedMetric("Container Type", value)
value = int(value)
if key == GALAXY_SLOTS_KEY:
return FormattedMetric("Cores Allocated", "%d" % value)
@@ -73,12 +80,23 @@ def job_properties(self, job_id, job_directory: str) -> Dict[str, Any]:
properties[GALAXY_MEMORY_MB_KEY] = self.__read_integer(galaxy_memory_mb_file)
start = self.__read_seconds_since_epoch(job_directory, "start")
end = self.__read_seconds_since_epoch(job_directory, "end")
properties.update(self.__read_container_details(job_directory))
if start is not None and end is not None:
properties[START_EPOCH_KEY] = start
properties[END_EPOCH_KEY] = end
properties[RUNTIME_SECONDS_KEY] = end - start
return properties

def get_container_file_path(self, job_directory):
return self._instrument_file_path(job_directory, "container")

def __read_container_details(self, job_directory) -> Dict[str, str]:
try:
with open(self.get_container_file_path(job_directory)) as fh:
return json.load(fh)
except FileNotFoundError:
return {}

def __record_galaxy_slots_command(self, job_directory):
galaxy_slots_file = self.__galaxy_slots_file(job_directory)
return f"""echo "$GALAXY_SLOTS" > '{galaxy_slots_file}' """
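For orientation, the collection side added here is a best-effort JSON read: if the container instrument file exists in the job directory it is parsed and merged into the core job properties, and a missing file (a job that never ran in a container) simply contributes nothing. A minimal standalone sketch of that pattern, using an illustrative file name "container.json" (the real location comes from _instrument_file_path(job_directory, "container") and is not spelled out in this diff):

import json
import os
from typing import Any, Dict

def read_container_details(job_directory: str, file_name: str = "container.json") -> Dict[str, str]:
    """Best-effort read of the container details recorded when the job was set up."""
    try:
        with open(os.path.join(job_directory, file_name)) as fh:
            return json.load(fh)
    except FileNotFoundError:
        # Jobs that never ran in a container have no file; that is not an error.
        return {}

def collect_properties(job_directory: str) -> Dict[str, Any]:
    properties: Dict[str, Any] = {}
    # container_id / container_type are merged in alongside the other core metrics.
    properties.update(read_container_details(job_directory))
    return properties
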
15 changes: 13 additions & 2 deletions lib/galaxy/jobs/command_factory.py
@@ -1,7 +1,10 @@
import json
import typing
from logging import getLogger
from os import getcwd
from os import (
getcwd,
makedirs,
)
from os.path import (
abspath,
join,
@@ -81,8 +84,16 @@ def build_command(
__handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params)

__handle_task_splitting(commands_builder, job_wrapper)

for_pulsar = "pulsar_version" in remote_command_params
if container:
if core_job_metric_plugin := runner.app.job_metrics.default_job_instrumenter.get_configured_plugin("core"):
directory = join(job_wrapper.working_directory, "metadata") if for_pulsar else job_wrapper.working_directory
makedirs(directory, exist_ok=True)
container_file_path = core_job_metric_plugin.get_container_file_path(directory)
with open(container_file_path, "w") as container_file:
container_file.write(
json.dumps({"container_id": container.container_id, "container_type": container.container_type})
)
if (container and modify_command_for_container) or job_wrapper.commands_in_new_shell:
if container and modify_command_for_container:
# Many Docker containers do not have /bin/bash.
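The producer side mirrors that read: once a container has been resolved for the job, the command factory serializes its id and type to the core plugin's instrument file before the command line is assembled. A hedged sketch of that write path, with a made-up ContainerInfo dataclass standing in for the real container object and "container.json" again standing in for the plugin-provided path:

import json
import os
from dataclasses import dataclass

@dataclass
class ContainerInfo:
    container_id: str
    container_type: str  # e.g. "docker" or "singularity"

def record_container_details(container: ContainerInfo, working_directory: str, for_pulsar: bool = False) -> str:
    # As in the diff above, Pulsar-staged jobs keep the file under a "metadata" subdirectory.
    directory = os.path.join(working_directory, "metadata") if for_pulsar else working_directory
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, "container.json")  # illustrative name, see note above
    with open(path, "w") as fh:
        json.dump({"container_id": container.container_id, "container_type": container.container_type}, fh)
    return path

One apparent benefit of recording the resolved container at command-build time is that the metric stays runner-agnostic: the core instrumenter only needs to find the file afterwards, regardless of where the job actually executed.
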
33 changes: 24 additions & 9 deletions test/integration/test_containerized_jobs.py
@@ -30,32 +30,47 @@ class MulledJobTestCases:
"""

dataset_populator: DatasetPopulator
container_type: str

def _run_and_get_contents(self, tool_id: str, history_id: str):
run_response = self.dataset_populator.run_tool(tool_id, {}, history_id)
job_id = run_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id=job_id, assert_ok=True, timeout=EXTENDED_TIMEOUT)
job_metrics = self.dataset_populator._get(f"/api/jobs/{job_id}/metrics").json()
# would be nice if it wasn't just a list of unpredictable order ...
container_id = None
container_type = None
for metric in job_metrics:
if metric["name"] == "container_id":
container_id = metric["value"]
if metric["name"] == "container_type":
container_type = metric["value"]
assert container_id, "Job metrics did not include container_id"
assert container_type, "Job metrics did not include container_type"
assert container_type == self.container_type
return self.dataset_populator.get_history_dataset_content(
history_id, content_id=run_response["outputs"][0]["id"]
)

def test_explicit(self, history_id: str) -> None:
"""
tool having one package + one explicit container requirement
"""
self.dataset_populator.run_tool("mulled_example_explicit", {}, history_id)
self.dataset_populator.wait_for_history(history_id, assert_ok=True, timeout=EXTENDED_TIMEOUT)
output = self.dataset_populator.get_history_dataset_content(history_id)
output = self._run_and_get_contents("mulled_example_explicit", history_id)
assert "0.7.15-r1140" in output

def test_mulled_simple(self, history_id: str) -> None:
"""
tool having one package requirement
"""
self.dataset_populator.run_tool("mulled_example_simple", {}, history_id)
self.dataset_populator.wait_for_history(history_id, assert_ok=True, timeout=EXTENDED_TIMEOUT)
output = self.dataset_populator.get_history_dataset_content(history_id)
output = self._run_and_get_contents("mulled_example_simple", history_id)
assert "0.7.15-r1140" in output

def test_mulled_explicit_invalid_case(self, history_id: str) -> None:
"""
tool having one package + one (invalid? due to capitalization) explicit container requirement
"""
self.dataset_populator.run_tool("mulled_example_invalid_case", {}, history_id)
self.dataset_populator.wait_for_history(history_id, assert_ok=True, timeout=EXTENDED_TIMEOUT)
output = self.dataset_populator.get_history_dataset_content(history_id)
output = self._run_and_get_contents("mulled_example_invalid_case", history_id)
assert "0.7.15-r1140" in output


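The reworked test helper leans on the job metrics API returning a flat, unordered list of name/value entries (hence the comment about unpredictable order), so it scans for the two new names rather than indexing into the list. A small sketch of that extraction, with the response shape assumed from the test above and placeholder values:

from typing import Dict, List, Optional

def extract_container_metrics(job_metrics: List[Dict[str, str]]) -> Dict[str, Optional[str]]:
    """Pull container_id / container_type out of an unordered metrics list."""
    wanted: Dict[str, Optional[str]] = {"container_id": None, "container_type": None}
    for metric in job_metrics:
        if metric["name"] in wanted:
            wanted[metric["name"]] = metric["value"]
    return wanted

# Placeholder list shaped like the response the integration test consumes:
metrics = [
    {"name": "runtime_seconds", "value": "4"},
    {"name": "container_type", "value": "docker"},
    {"name": "container_id", "value": "example-container-id"},
]
assert extract_container_metrics(metrics)["container_type"] == "docker"
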
1 change: 1 addition & 0 deletions test/integration/test_kubernetes_runner.py
@@ -191,6 +191,7 @@ class TestKubernetesIntegration(BaseJobEnvironmentIntegrationTestCase, MulledJob
jobs_directory: str
persistent_volume_claims: List[KubeSetupConfigTuple]
persistent_volumes: List[KubeSetupConfigTuple]
container_type = "docker"

def setUp(self) -> None:
super().setUp()
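MulledJobTestCases only declares container_type as an annotation, so each concrete integration test class has to supply the value it expects the metrics to report; the Kubernetes case pins it to "docker" here, and other runner-specific subclasses presumably do the same. A toy sketch of that mixin pattern, with hypothetical class names:

class ContainerAssertionsMixin:
    container_type: str  # concrete test classes must set this

    def check(self, reported_type: str) -> None:
        assert reported_type == self.container_type

class DockerSuite(ContainerAssertionsMixin):
    container_type = "docker"

DockerSuite().check("docker")
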
