Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tapisv3 #1

Merged
merged 21 commits into from
Nov 22, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP: Generate job description for MPM from Tapis App
kks32 committed Sep 30, 2024
commit 070290273865b854eec17e571f43f762b781586f
163 changes: 84 additions & 79 deletions dapi/jobs/jobs.py
Original file line number Diff line number Diff line change
@@ -2,58 +2,113 @@
from datetime import datetime, timedelta, timezone
from tqdm import tqdm
import logging
import json
from typing import Dict, Any, Optional

# Configuring the logging system
# logging.basicConfig(
# level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
# )


def get_status(ag, job_id, time_lapse=15):
def generate_job_description(
t: Any, # Tapis client
app_name: str,
input_uri: str,
input_file: str,
job_name: str = None,
max_minutes: Optional[int] = None,
node_count: Optional[int] = None,
cores_per_node: Optional[int] = None,
queue: Optional[str] = None,
allocation: Optional[str] = None,
) -> Dict[str, Any]:
"""
Retrieves and monitors the status of a job from Agave.

This function initially waits for the job to start, displaying its progress using
a tqdm progress bar. Once the job starts, it monitors the job's status up to
a maximum duration specified by the job's "maxHours". If the job completes or fails
before reaching this maximum duration, it returns the job's final status.
Generates a job description dictionary based on the provided application name, job name, input URI, input file, and optional allocation.

Args:
ag (object): The Agave job object used to interact with the job.
job_id (str): The unique identifier of the job to monitor.
time_lapse (int, optional): Time interval, in seconds, to wait between status
checks. Defaults to 15 seconds.
t (object): The Tapis API client object.
app_name (str): The name of the application to use for the job.
input_uri (str): The URI of the input data for the job.
input_file (str): The local file path to the input data for the job.
job_name (str, optional): The name of the job to be created. Defaults to None.
max_minutes (int, optional): The maximum number of minutes the job can run. Defaults to None.
node_count (int, optional): The number of nodes to use for the job. Defaults to None.
cores_per_node (int, optional): The number of cores per node for the job. Defaults to None.
queue (str, optional): The queue to use for the job. Defaults to None.
allocation (str, optional): The allocation to use for the job. Defaults to None.

Returns:
str: The final status of the job. Typical values include "FINISHED", "FAILED",
and "STOPPED".
dict: The job description dictionary.
"""

Raises:
No exceptions are explicitly raised, but potential exceptions raised by the Agave
job object or other called functions/methods will propagate.
# Fetch the latest app information
app_info = t.apps.getAppLatestVersion(appId=app_name)

# If job_name is not provided, use the app name and date
if not job_name:
job_name = f"{app_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

# Create the base job description
job_description = {
"name": job_name,
"appId": app_info.id,
"appVersion": app_info.version,
"execSystemId": app_info.jobAttributes.execSystemId,
"maxMinutes": max_minutes or app_info.jobAttributes.maxMinutes,
"archiveOnAppError": app_info.jobAttributes.archiveOnAppError,
"fileInputs": [{"name": "Input Directory", "sourceUrl": input_uri}],
"execSystemLogicalQueue": queue
or app_info.jobAttributes.execSystemLogicalQueue,
"nodeCount": node_count or 1, # Default to 1 if not specified
"coresPerNode": cores_per_node or 1, # Default to 1 if not specified
"parameterSet": {
"appArgs": [{"name": "Input Script", "arg": input_file}],
"schedulerOptions": [],
},
}

# Add TACC allocation if provided
if allocation:
job_description["parameterSet"]["schedulerOptions"].append(
{"name": "TACC Allocation", "arg": f"-A {allocation}"}
)

return job_description


def get_status(t, mjobUuid, tlapse=15):
"""
Retrieves and monitors the status of a job using Tapis API.
This function waits for the job to start, then monitors it for up to maxMinutes.

Args:
t (object): The Tapis API client object.
mjobUuid (str): The unique identifier of the job to monitor.
tlapse (int, optional): Time interval, in seconds, to wait between status checks. Defaults to 15 seconds.

Returns:
str: The final status of the job (FINISHED, FAILED, or STOPPED).
"""
previous_status = None
# Initially check if the job is already running
status = ag.jobs.getStatus(jobId=job_id)["status"]

job_details = ag.jobs.get(jobId=job_id)
max_hours = job_details["maxHours"]
status = t.jobs.getJobStatus(jobUuid=mjobUuid).status
max_minutes = t.jobs.getJob(jobUuid=mjobUuid).maxMinutes

# Using tqdm to provide visual feedback while waiting for job to start
with tqdm(desc="Waiting for job to start", dynamic_ncols=True) as pbar:
while status not in ["RUNNING", "FINISHED", "FAILED", "STOPPED"]:
time.sleep(time_lapse)
status = ag.jobs.getStatus(jobId=job_id)["status"]
time.sleep(tlapse)
status = t.jobs.getJobStatus(jobUuid=mjobUuid).status
pbar.update(1)
pbar.set_postfix_str(f"Status: {status}")

# Once the job is running, monitor it for up to maxHours
max_iterations = int(max_hours * 3600 // time_lapse)
# Once the job is running, monitor it for up to maxMinutes
max_iterations = int(max_minutes * 60 // tlapse)

# Using tqdm for progress bar
for _ in tqdm(range(max_iterations), desc="Monitoring job", ncols=100):
status = ag.jobs.getStatus(jobId=job_id)["status"]
status = t.jobs.getJobStatus(jobUuid=mjobUuid).status

# Print status if it has changed
if status != previous_status:
@@ -64,10 +119,12 @@ def get_status(ag, job_id, time_lapse=15):
if status in ["FINISHED", "FAILED", "STOPPED"]:
break

time.sleep(time_lapse)
time.sleep(tlapse)
else:
# This block will execute if the for loop completes without a 'break'
logging.warn("Warning: Maximum monitoring time reached!")
logging.warning(
f"Warning: Maximum monitoring time of {max_minutes} minutes reached!"
)

return status

@@ -129,58 +186,6 @@ def runtime_summary(ag, job_id, verbose=False):
print("---------------")


def generate_job_info(
ag,
appid: str,
jobname: str = "dsjob",
queue: str = "development",
nnodes: int = 1,
nprocessors: int = 1,
runtime: str = "00:10:00",
inputs=None,
parameters=None,
) -> dict:
"""Generate a job information dictionary based on provided arguments.

Args:
ag (object): The Agave object to interact with the platform.
appid (str): The application ID for the job.
jobname (str, optional): The name of the job. Defaults to 'dsjob'.
queue (str, optional): The batch queue name. Defaults to 'skx-dev'.
nnodes (int, optional): The number of nodes required. Defaults to 1.
nprocessors (int, optional): The number of processors per node. Defaults to 1.
runtime (str, optional): The maximum runtime in the format 'HH:MM:SS'. Defaults to '00:10:00'.
inputs (dict, optional): The inputs for the job. Defaults to None.
parameters (dict, optional): The parameters for the job. Defaults to None.

Returns:
dict: A dictionary containing the job information.

Raises:
ValueError: If the provided appid is not valid.
"""

try:
app = ag.apps.get(appId=appid)
except Exception:
raise ValueError(f"Invalid app ID: {appid}")

job_info = {
"appId": appid,
"name": jobname,
"batchQueue": queue,
"nodeCount": nnodes,
"processorsPerNode": nprocessors,
"memoryPerNode": "1",
"maxRunTime": runtime,
"archive": True,
"inputs": inputs,
"parameters": parameters,
}

return job_info


def get_archive_path(ag, job_id):
"""
Get the archive path for a given job ID and modifies the user directory