From ff7c16368c4ba98ce15d547860116b7c341bfdc4 Mon Sep 17 00:00:00 2001 From: Krishna Kumar Date: Sun, 29 Sep 2024 21:06:19 -0500 Subject: [PATCH] Add tests for generate job info and get status --- dapi/jobs/jobs.py | 14 +-- tests/jobs/test_job_gen_jobinfo.py | 120 ++++++++++++++++-------- tests/jobs/test_job_get_archive_path.py | 34 ------- tests/jobs/test_job_runtime.py | 114 ---------------------- tests/jobs/test_job_status.py | 56 +++++++---- 5 files changed, 125 insertions(+), 213 deletions(-) delete mode 100644 tests/jobs/test_job_get_archive_path.py delete mode 100644 tests/jobs/test_job_runtime.py diff --git a/dapi/jobs/jobs.py b/dapi/jobs/jobs.py index 4989a30..34971bc 100644 --- a/dapi/jobs/jobs.py +++ b/dapi/jobs/jobs.py @@ -11,7 +11,7 @@ # ) -def generate_job_description( +def generate_job_info( t: Any, # Tapis client app_name: str, input_uri: str, @@ -24,7 +24,7 @@ def generate_job_description( allocation: Optional[str] = None, ) -> Dict[str, Any]: """ - Generates a job description dictionary based on the provided application name, job name, input URI, input file, and optional allocation. + Generates a job info dictionary based on the provided application name, job name, input URI, input file, and optional allocation. Args: t (object): The Tapis API client object. @@ -39,7 +39,7 @@ def generate_job_description( allocation (str, optional): The allocation to use for the job. Defaults to None. Returns: - dict: The job description dictionary. + dict: The job info dictionary. """ # Fetch the latest app information @@ -49,8 +49,8 @@ def generate_job_description( if not job_name: job_name = f"{app_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - # Create the base job description - job_description = { + # Create the base job info + job_info = { "name": job_name, "appId": app_info.id, "appVersion": app_info.version, @@ -70,11 +70,11 @@ def generate_job_description( # Add TACC allocation if provided if allocation: - job_description["parameterSet"]["schedulerOptions"].append( + job_info["parameterSet"]["schedulerOptions"].append( {"name": "TACC Allocation", "arg": f"-A {allocation}"} ) - return job_description + return job_info def get_status(t, mjobUuid, tlapse=15): diff --git a/tests/jobs/test_job_gen_jobinfo.py b/tests/jobs/test_job_gen_jobinfo.py index 4b3779d..9128d21 100644 --- a/tests/jobs/test_job_gen_jobinfo.py +++ b/tests/jobs/test_job_gen_jobinfo.py @@ -1,51 +1,91 @@ import unittest from unittest.mock import Mock, patch from dapi.jobs import jobs +from datetime import datetime class TestGenerateJobInfo(unittest.TestCase): def setUp(self): - # Mock the ag object - self.ag_mock = Mock() - self.appid_valid = "valid-appid" - self.appid_invalid = "invalid-appid" - - # Define the behavior of the mocked method - def mock_get_method(appId): - if appId == self.appid_valid: - return True - raise Exception("Invalid app ID") - - # Set the side_effect for the mock to the defined behavior - self.ag_mock.apps.get.side_effect = mock_get_method - - def test_valid_appid(self): - """Test with a valid app ID.""" - result = jobs.generate_job_info(self.ag_mock, self.appid_valid) - self.assertEqual(result["appId"], self.appid_valid) - - def test_invalid_appid(self): - """Test with an invalid app ID.""" - try: - result = jobs.generate_job_info(self.ag_mock, self.appid_invalid) - print("Result:", result) - except Exception as e: - print("Exception raised:", e) - - with self.assertRaises(ValueError): - jobs.generate_job_info(self.ag_mock, self.appid_invalid) - - def test_default_values(self): - """Test with default values.""" - result = jobs.generate_job_info(self.ag_mock, self.appid_valid) - self.assertEqual(result["name"], "dsjob") - self.assertEqual(result["batchQueue"], "development") + self.t_mock = Mock() + self.app_name = "test-app" + self.input_uri = "tapis://test-system/input/data" + self.input_file = "input.txt" + + # Mock the getAppLatestVersion method + self.app_info_mock = Mock() + self.app_info_mock.id = self.app_name + self.app_info_mock.version = "1.0" + self.app_info_mock.jobAttributes.execSystemId = "test-exec-system" + self.app_info_mock.jobAttributes.maxMinutes = 60 + self.app_info_mock.jobAttributes.archiveOnAppError = True + self.app_info_mock.jobAttributes.execSystemLogicalQueue = "normal" + self.t_mock.apps.getAppLatestVersion.return_value = self.app_info_mock + + @patch("dapi.jobs.jobs.datetime") + def test_generate_job_info_default(self, mock_datetime): + mock_datetime.now.return_value = datetime(2023, 5, 1, 12, 0, 0) + + result = jobs.generate_job_info( + self.t_mock, self.app_name, self.input_uri, self.input_file + ) + + self.assertEqual(result["name"], f"{self.app_name}_20230501_120000") + self.assertEqual(result["appId"], self.app_name) + self.assertEqual(result["appVersion"], "1.0") + self.assertEqual(result["execSystemId"], "test-exec-system") + self.assertEqual(result["maxMinutes"], 60) + self.assertTrue(result["archiveOnAppError"]) + self.assertEqual( + result["fileInputs"], + [{"name": "Input Directory", "sourceUrl": self.input_uri}], + ) + self.assertEqual(result["execSystemLogicalQueue"], "normal") self.assertEqual(result["nodeCount"], 1) - self.assertEqual(result["processorsPerNode"], 1) - self.assertEqual(result["maxRunTime"], "00:10:00") - self.assertTrue(result["archive"]) - self.assertIsNone(result["inputs"]) - self.assertIsNone(result["parameters"]) + self.assertEqual(result["coresPerNode"], 1) + self.assertEqual( + result["parameterSet"]["appArgs"], + [{"name": "Input Script", "arg": self.input_file}], + ) + self.assertEqual(result["parameterSet"]["schedulerOptions"], []) + + def test_generate_job_info_custom(self): + custom_job_name = "custom-job" + custom_max_minutes = 120 + custom_node_count = 2 + custom_cores_per_node = 4 + custom_queue = "high-priority" + custom_allocation = "project123" + + result = jobs.generate_job_info( + self.t_mock, + self.app_name, + self.input_uri, + self.input_file, + job_name=custom_job_name, + max_minutes=custom_max_minutes, + node_count=custom_node_count, + cores_per_node=custom_cores_per_node, + queue=custom_queue, + allocation=custom_allocation, + ) + + self.assertEqual(result["name"], custom_job_name) + self.assertEqual(result["maxMinutes"], custom_max_minutes) + self.assertEqual(result["nodeCount"], custom_node_count) + self.assertEqual(result["coresPerNode"], custom_cores_per_node) + self.assertEqual(result["execSystemLogicalQueue"], custom_queue) + self.assertEqual( + result["parameterSet"]["schedulerOptions"], + [{"name": "TACC Allocation", "arg": f"-A {custom_allocation}"}], + ) + + def test_generate_job_info_invalid_app(self): + self.t_mock.apps.getAppLatestVersion.side_effect = Exception("Invalid app") + + with self.assertRaises(Exception): + jobs.generate_job_info( + self.t_mock, "invalid-app", self.input_uri, self.input_file + ) if __name__ == "__main__": diff --git a/tests/jobs/test_job_get_archive_path.py b/tests/jobs/test_job_get_archive_path.py deleted file mode 100644 index 7f2da79..0000000 --- a/tests/jobs/test_job_get_archive_path.py +++ /dev/null @@ -1,34 +0,0 @@ -import unittest -from unittest.mock import patch, Mock -from dapi.jobs import get_archive_path - - -class TestGetArchivePath(unittest.TestCase): - def test_get_archive_path(self): - # Create a mock Agave object and its method return value - mock_ag = Mock() - mock_job_info = Mock() - mock_job_info.archivePath = "user123/jobdata/somefile" - mock_ag.jobs.get.return_value = mock_job_info - - # Call the function - result = get_archive_path(mock_ag, "dummy_job_id") - - # Check the result - expected_path = "/home/jupyter/MyData/jobdata/somefile" - self.assertEqual(result, expected_path) - - def test_get_archive_path_invalid_format(self): - # Create a mock Agave object with an unexpected format return - mock_ag = Mock() - mock_job_info = Mock() - mock_job_info.archivePath = "invalid_format_path" - mock_ag.jobs.get.return_value = mock_job_info - - # Check if the function raises a ValueError as expected - with self.assertRaises(ValueError): - get_archive_path(mock_ag, "dummy_job_id") - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/jobs/test_job_runtime.py b/tests/jobs/test_job_runtime.py deleted file mode 100644 index 9be9f53..0000000 --- a/tests/jobs/test_job_runtime.py +++ /dev/null @@ -1,114 +0,0 @@ -import unittest -from unittest.mock import Mock -from io import StringIO -import sys -from datetime import datetime, timedelta - -import dapi as ds - - -class TestRuntimeSummary(unittest.TestCase): - def setUp(self): - super().setUp() - self.ag_mock = Mock() - start_time = datetime.now() - timedelta(minutes=10) - self.job_history = [ - {"status": "PENDING", "created": start_time}, - { - "status": "PROCESSING_INPUTS", - "created": start_time + timedelta(seconds=3), - }, - { - "status": "STAGING_INPUTS", - "created": start_time + timedelta(seconds=7), - }, # 3 + 4 - { - "status": "STAGED", - "created": start_time + timedelta(seconds=11), - }, # 7 + 4 - { - "status": "STAGING_JOB", - "created": start_time + timedelta(seconds=18), - }, # 11 + 7 - { - "status": "SUBMITTING", - "created": start_time + timedelta(seconds=30), - }, # 18 + 12 - { - "status": "QUEUED", - "created": start_time + timedelta(seconds=48), - }, # 30 + 18 - { - "status": "RUNNING", - "created": start_time + timedelta(minutes=1, seconds=14), - }, # 48 seconds + 26 seconds - { - "status": "CLEANING_UP", - "created": start_time + timedelta(minutes=1, seconds=22), - }, # 74 seconds + 22 seconds - { - "status": "ARCHIVING", - "created": start_time + timedelta(minutes=1, seconds=22), - }, # no change - {"status": "FINISHED", "created": start_time + timedelta(minutes=11)}, - {"status": "EMPTY", "created": start_time + timedelta(minutes=11)}, - ] - - def capture_output(self, ag_mock, job_id, verbose): - saved_stdout = sys.stdout - try: - out = StringIO() - sys.stdout = out - ds.jobs.runtime_summary(ag_mock, job_id, verbose) - return out.getvalue().strip() - finally: - sys.stdout = saved_stdout - - def test_runtime_summary_verbose_true(self): - self.ag_mock.jobs.getHistory.return_value = self.job_history - output = self.capture_output(self.ag_mock, "mock_id", True) - - # Expected output based on the mock data - expected_output = """ -Runtime Summary ---------------- -PENDING time: 0:00:03 -PROCESSING_INPUTS time: 0:00:04 -STAGING_INPUTS time: 0:00:04 -STAGED time: 0:00:07 -STAGING_JOB time: 0:00:12 -SUBMITTING time: 0:00:18 -QUEUED time: 0:00:26 -RUNNING time: 0:00:08 -CLEANING_UP time: 0:00:00 -ARCHIVING time: 0:09:38 -FINISHED time: 0:00:00 -TOTAL time: 0:11:00 ---------------- -""".strip() - - self.assertEqual(output, expected_output) - - def test_runtime_summary_verbose_false(self): - self.ag_mock.jobs.getHistory.return_value = self.job_history - output = self.capture_output(self.ag_mock, "mock_id", False) - - # Expected output based on the mock data when verbose=False - expected_output_verbose_false = """ -Runtime Summary ---------------- -PENDING time: 0:00:03 -QUEUED time: 0:00:26 -RUNNING time: 0:00:08 -FINISHED time: 0:00:00 -TOTAL time: 0:11:00 ---------------- -""".strip() - - self.assertEqual(output, expected_output_verbose_false) - - # Additional tests... - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/jobs/test_job_status.py b/tests/jobs/test_job_status.py index 542a045..4960ad8 100644 --- a/tests/jobs/test_job_status.py +++ b/tests/jobs/test_job_status.py @@ -1,4 +1,3 @@ -import time import unittest from unittest.mock import Mock, patch import dapi as ds @@ -7,32 +6,53 @@ class TestGetStatus(unittest.TestCase): @patch("time.sleep", Mock()) # Mocks the sleep function def test_get_status(self): - # Mock the Agave job object - mock_agave = Mock() - - # Define behavior for getStatus method - mock_agave.jobs.getStatus.side_effect = [ - {"status": "PENDING"}, - {"status": "PENDING"}, - {"status": "RUNNING"}, - {"status": "RUNNING"}, - {"status": "FINISHED"}, + # Mock the Tapis client object + mock_tapis = Mock() + + # Define behavior for getJobStatus method + mock_tapis.jobs.getJobStatus.side_effect = [ + Mock(status="PENDING"), + Mock(status="PENDING"), + Mock(status="RUNNING"), + Mock(status="RUNNING"), + Mock(status="FINISHED"), ] - # Define behavior for get method - # Equivalent to 36 seconds - mock_agave.jobs.get.return_value = {"maxHours": 0.01} + # Define behavior for getJob method + mock_tapis.jobs.getJob.return_value = Mock(maxMinutes=1) # Call get_status - status = ds.jobs.get_status(mock_agave, "some_job_id", time_lapse=1) + status = ds.jobs.get_status(mock_tapis, "some_job_uuid", tlapse=1) # Assert that the final status is "FINISHED" self.assertEqual(status, "FINISHED") # Assert the methods were called the expected number of times - mock_agave.jobs.getStatus.assert_called_with(jobId="some_job_id") - self.assertEqual(mock_agave.jobs.getStatus.call_count, 5) - mock_agave.jobs.get.assert_called_once_with(jobId="some_job_id") + mock_tapis.jobs.getJobStatus.assert_called_with(jobUuid="some_job_uuid") + self.assertEqual(mock_tapis.jobs.getJobStatus.call_count, 5) + mock_tapis.jobs.getJob.assert_called_once_with(jobUuid="some_job_uuid") + + @patch("time.sleep", Mock()) + def test_get_status_timeout(self): + # Mock the Tapis client object + mock_tapis = Mock() + + # Define behavior for getJobStatus method to simulate a job that doesn't finish + mock_tapis.jobs.getJobStatus.return_value = Mock(status="RUNNING") + + # Define behavior for getJob method + mock_tapis.jobs.getJob.return_value = Mock(maxMinutes=1) + + # Call get_status + status = ds.jobs.get_status(mock_tapis, "some_job_uuid", tlapse=1) + + # Assert that the final status is still "RUNNING" due to timeout + self.assertEqual(status, "RUNNING") + + # Assert the methods were called the expected number of times + expected_calls = 60 # 1 minute = 60 seconds, with tlapse=1 + self.assertGreaterEqual(mock_tapis.jobs.getJobStatus.call_count, expected_calls) + mock_tapis.jobs.getJob.assert_called_once_with(jobUuid="some_job_uuid") if __name__ == "__main__":