Commit 32deba8
Merge pull request #73 from isi-vista/48-pegasus-5-upgrade
Update Wrapper to Support Pegasus 5.0.0
lichtefeld authored Feb 8, 2021
2 parents 15e805c + 2455bed commit 32deba8
Showing 13 changed files with 333 additions and 368 deletions.
30 changes: 11 additions & 19 deletions README.md
@@ -1,6 +1,3 @@
<!--
[![Build status](https://ci.appveyor.com/api/projects/status/3jhdnwreqoni1492/branch/master?svg=true)](https://ci.appveyor.com/project/isi-vista/vista-pegasus-wrapper/branch/master)
-->
[![Build status](https://travis-ci.com/isi-vista/vista-pegasus-wrapper.svg?branch=master)](https://travis-ci.com/isi-vista/vista-pegasus-wrapper?branch=master)
[![codecov](https://codecov.io/gh/isi-vista/vista-pegasus-wrapper/branch/master/graph/badge.svg)](https://codecov.io/gh/isi-vista/vista-pegasus-wrapper)

@@ -28,25 +25,27 @@ This library simplifies the process of writing a profile which can be converted
Using the [WorkflowBuilder from `workflow.py`](pegasus_wrapper/workflow.py), develop a function to generate a `Workflow.dax`.
See [example_workflow](pegasus_wrapper/scripts/example_workflow_builder.py) for an extremely simple workflow which we will use to demonstrate the process.
To run the example workflow, add a `root.params` file to the parameters directory with the following content:
*Note: the directory should be in your `$HOME` and not on an NFS mount like `/nas/gaia/`, as the submission will fail for an NFS-related reason.*
```
example_root_dir: "path/to/output/dir/"
conda_environment: "pegasus-wrapper"
conda_base_path: "path/to/conda"
```
Run `python -m pegasus_wrapper.scripts.example_workflow_builder parameters/root.params` from this project's root folder.

The log output will provide the output location of the `Test.dax`. Assuming you are logged into a submit node with an active Pegasus install:

```
cd "path/to/output/dir"
pegasus-plan --conf pegasus.conf --dax Test.dax --dir "path/to/output/dir" --relative-dir exampleRun-001
pegasus-run "path/to/output/dir/"exampleRun-001
./submit.sh
```
The example workflow submits **ONLY** to `scavenge`. In an actual workflow we would recommend parameterizing the partition.
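As a hedged sketch (these parameter keys are assumptions for illustration, not taken from this repository), such a parameterization might look like:
```
partition: "ephemeral"
num_cpus: 1
num_gpus: 0
memory: "4G"
```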

Our current system places `ckpt` files to indicate that a job has finished, in case the DAX needs to be regenerated to fix a bug after an issue is found. This system is not comprehensive, as it currently requires manual control. When submitting a new job that reuses previous handles, use a new relative directory in the plan and run.
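For example, if `exampleRun-001` has already been planned and run, a re-plan after regenerating the DAX might use the commands above with a fresh relative directory:
```
pegasus-plan --conf pegasus.conf --dax Test.dax --dir "path/to/output/dir" --relative-dir exampleRun-002
```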

A [Nuke Checkpoints](scripts/nuke_checkpoints.py) script is provided for ease of removing checkpoint files. To use, pass a directory location as the launch parameter and the script will remove checkpoint files from the directory and all sub-directories.
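For example (a sketch; the script takes the directory as its only argument):
```
python scripts/nuke_checkpoints.py "path/to/output/dir"
```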

It is recommended to host a workflow under a shared directory on the NAS, e.g. `/nas/gaia`, rather than under a user's home directory such as `/nas/user/xyz`, due to space limitations on user home directories.

# FAQ
## How can I exclude some nodes?

@@ -57,11 +56,6 @@ Use the `run_on_single_node` parameter when you initialize a workflow (or a Slurm resource request); a parameter sketch follows the notes below.
* Note you cannot use this option with the **exclude_list** option.
* Note you cannot specify more than one node using this option.
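A minimal parameter sketch (the key name is assumed from the option name above; the node name is hypothetical):
```
run_on_single_node: "saga01"
```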

## What are valid root directories for the workflow?

Currently the root directory should be in your home directory and not on an NAS like `/nas/gaia/` as the submission will fail for an NFS reason.
The experiment directory can be (and ought to be) on such a drive, though.

# Common Errors

## Mismatching partition selection and max walltime
@@ -72,19 +66,17 @@ Partitions each have a max walltime associated with them. See the saga cluster w…

If you change code while a pipeline is running, the jobs will pick up the changes. This could be helpful if you notice an error and fix it before that code runs, but it can also lead to unexpected behavior.

## `No module named 'Pegasus'`

This is a weird one that usually pops up when first getting set up with Pegasus. First, if you see this please contact one of the maintainers (currently @joecummings or @spigo900). To fix this, install the following packages with these commands in this exact order - they are dependent on each other. (The URLs are quoted because they contain `&`, which the shell would otherwise interpret.)
1. `pip install "git+https://github.com/pegasus-isi/pegasus/#egg=pegasus-wms.common&subdirectory=packages/pegasus-common"`
2. `pip install "git+https://github.com/pegasus-isi/pegasus/#egg=pegasus-wms.api&subdirectory=packages/pegasus-api"`
3. `pip install "git+https://github.com/pegasus-isi/pegasus/#egg=pegasus-wms.dax&subdirectory=packages/pegasus-dax-python"`
4. `pip install "git+https://github.com/pegasus-isi/pegasus/#egg=pegasus-wms.worker&subdirectory=packages/pegasus-worker"`
5. `pip install "git+https://github.com/pegasus-isi/pegasus/#egg=pegasus-wms&subdirectory=packages/pegasus-python"`
## `No module named 'Pegasus'` (Version 4.9.3)
*This is believed to have been fixed in Pegasus Version 5. If it arises, please open an issue.*

## Debugging from `srun` fails to load `cuda` and `cudnn`

A new node obtained with `srun` does not load the Spack modules you usually have set up in your runtime scripts. You will need to load these manually if you want to work with TensorFlow or anything else requiring CUDA.
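For example, a minimal sketch (package names are assumptions; match them to your Spack setup):
```
spack load cuda
spack load cudnn
```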

# Updating the wrapper script from Pegasus 4.9.3 to Pegasus 5.0.0

No changes should be needed for any project using the previous version of the wrapper, which supported Pegasus 4.9.3.
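The main user-visible change, as the diffs below show, is the move from the `Pegasus.DAX3` module to `Pegasus.api`; for example:

```python
# Pegasus 4.9.3 (old):
# from Pegasus.DAX3 import File, Job

# Pegasus 5.0.0 (new):
from Pegasus.api import File, Job
```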

# Contributing

Run `make precommit` before committing.
2 changes: 1 addition & 1 deletion pegasus_wrapper/artifact.py
@@ -13,7 +13,7 @@
from pegasus_wrapper.locator import Locator

from more_itertools import collapse
from Pegasus.DAX3 import File, Job
from Pegasus.api import File, Job
from typing_extensions import Protocol


158 changes: 73 additions & 85 deletions pegasus_wrapper/pegasus_utils.py
@@ -1,16 +1,25 @@
from pathlib import Path
from typing import Optional

from Pegasus.api import Arch
from Pegasus.DAX3 import OS, PFN, Executable, File
from vistautils.parameters import Parameters

from Pegasus.api import (
    OS,
    Arch,
    Directory,
    FileServer,
    Operation,
    Properties,
    Site,
    SiteCatalog,
)

SUBMIT_SCRIPT = """#!/bin/bash
set -e
pegasus-plan \\
    --conf pegasus.conf \\
    --dax {dax_file} \\
    {dax_file} \\
    --conf pegasus.properties \\
    --dir {workflow_directory} \\
    --cleanup leaf \\
    --force \\
@@ -20,89 +29,68 @@
"""


def path_to_pegasus_file(
    path: Path,
    *,
    site: str = "local",
    name: Optional[str] = None,
    is_raw_input: bool = False
) -> File:
    """
    Given a *path* object return a pegasus `File` for usage in a workflow
    If the resource is not on a local machine provide the *site* string.
    Files can be used for either an input or output of a Job.
    Args:
        path: path to the file
        site: site to be used, default is local. Should be set to saga if running
            on cluster.
        name: name given to the file
        is_raw_input: indicates that the file doesn't come from the output of another
            job in the workflow, so can be safely added to the Pegasus DAX
    Returns:
        Pegasus File at the given path
    """
    rtnr = File(name if name else str(path.absolute()).replace("/", "-"))
    if is_raw_input:
        rtnr.addPFN(path_to_pfn(path, site=site))
    return rtnr


def path_to_pfn(path: Path, *, site: str = "local") -> PFN:
    return PFN(str(path.absolute()), site=site)


def script_to_pegasus_executable(
    path: Path,
    name: Optional[str] = None,
    *,
    site: str = "local",
    namespace: Optional[str] = None,
    version: Optional[str] = None,
    arch: Optional[Arch] = None,
    os: Optional[OS] = None,
    osrelease: Optional[str] = None,
    osversion: Optional[str] = None,
    glibc: Optional[str] = None,
    installed: Optional[bool] = None,
    container: Optional[str] = None
) -> Executable:
    """
    Turns a script path into a pegasus Executable
    Arguments:
        *name*: Logical name of executable
        *namespace*: Executable namespace
        *version*: Executable version
        *arch*: Architecture that this exe was compiled for
        *os*: Name of os that this exe was compiled for
        *osrelease*: Release of os that this exe was compiled for
        *osversion*: Version of os that this exe was compiled for
        *glibc*: Version of glibc this exe was compiled against
        *installed*: Is the executable installed (true), or stageable (false)
        *container*: Optional attribute to specify the container to use
    """

    rtrnr = Executable(
        path.stem + path.suffix if name is None else name,
        namespace=namespace,
        version=version,
        arch=arch,
        os=os,
        osrelease=osrelease,
        osversion=osversion,
        glibc=glibc,
        installed=installed,
        container=container,
    )
    rtrnr.addPFN(path_to_pfn(path, site=site))
    return rtrnr


def build_submit_script(path: Path, dax_file: str, workflow_directory: Path) -> None:
    path.write_text(
        SUBMIT_SCRIPT.format(workflow_directory=workflow_directory, dax_file=dax_file)
    )
    # Designate the submit script as executable
    path.chmod(0o777)


def add_local_nas_to_sites(
    sites_catalog: SiteCatalog, params: Parameters = Parameters.empty()
) -> None:
    home = params.string("home_dir", default=str(Path.home().absolute()))
    shared_scratch_dir = params.string(
        "local_shared_scratch", default=f"{home}/workflows/scratch"
    )
    local_storage_dir = params.string("local_storage", default=f"{home}/workflows/output")

    sites_catalog.add_sites(
        Site("local", arch=Arch.X86_64, os_type=OS.LINUX).add_directories(
            Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
                FileServer("file://" + shared_scratch_dir, Operation.ALL)
            ),
            Directory(Directory.LOCAL_STORAGE, local_storage_dir).add_file_servers(
                FileServer("file://" + local_storage_dir, Operation.ALL)
            ),
        )
    )


def add_saga_cluster_to_sites(
    sites_catalog: SiteCatalog, params: Parameters = Parameters.empty()
) -> None:
    home = params.string("home_dir", default=str(Path.home().absolute()))

    shared_scratch_dir = params.string(
        "saga_shared_scratch", default=f"{home}/workflows/shared-scratch"
    )

    saga = Site("saga", arch=Arch.X86_64, os_type=OS.LINUX)
    saga.add_directories(
        Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
            FileServer("file://" + shared_scratch_dir, Operation.ALL)
        )
    )

    saga.add_env(
        key="PEGASUS_HOME", value="/nas/gaia/shared/cluster/pegasus5/pegasus-5.0.0"
    )

    # Profiles
    saga.add_pegasus_profile(style="glite", auxillary_local=True)
    saga.add_condor_profile(grid_resource="batch slurm")

    sites_catalog.add_sites(saga)


def configure_saga_properities(  # pylint: disable=unused-argument
    properties: Properties, params: Parameters = Parameters.empty()
) -> None:
    properties["pegasus.data.configuration"] = "sharedfs"
    properties["pegasus.metrics.app"] = "SAGA"
    properties["dagman.retry"] = "0"

    # TODO: Implement a method to add parameters to this properties file
    # See: https://github.com/isi-vista/vista-pegasus-wrapper/issues/72
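Taken together, a minimal sketch of how these helpers might be wired up (the call sequence is an assumption for illustration; it is not shown in this diff):

```python
from pathlib import Path

from Pegasus.api import Properties, SiteCatalog
from vistautils.parameters import Parameters

# Build a site catalog covering the local machine and the saga cluster,
# then configure workflow properties and write the submit script.
catalog = SiteCatalog()
params = Parameters.empty()
add_local_nas_to_sites(catalog, params)
add_saga_cluster_to_sites(catalog, params)

properties = Properties()
configure_saga_properities(properties, params)

build_submit_script(Path("submit.sh"), "Test.dax", Path.home() / "workflows")
```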
40 changes: 19 additions & 21 deletions pegasus_wrapper/resource_request.py
@@ -9,7 +9,7 @@
from vistautils.parameters import Parameters
from vistautils.range import Range

from Pegasus.DAX3 import Job, Namespace, Profile
from Pegasus.api import Job
from saga_tools.slurm import to_slurm_memory_string
from typing_extensions import Protocol

@@ -159,7 +159,7 @@ def from_parameters(params: Parameters) -> ResourceRequest:

    def unify(self, other: ResourceRequest) -> ResourceRequest:
        if isinstance(other, SlurmResourceRequest):
            partition = other.partition or self.partition
            partition = other.partition
        else:
            partition = self.partition

@@ -186,19 +186,11 @@ def apply_to_job(self, job: Job, *, job_name: str) -> None:
f"Partition '{self.partition.name}' has a max walltime of {self.partition.max_walltime} mins, which is less than the time given ({self.job_time_in_minutes} mins) for job: {job_name}."
)

        qos_or_account = (
            f"qos {self.partition.name}"
            if self.partition.name in (SCAVENGE, EPHEMERAL)
            else f"account {self.partition.name}"
        )
        slurm_resource_content = SLURM_RESOURCE_STRING.format(
            qos_or_account=qos_or_account,
            partition=self.partition.name,
            num_cpus=self.num_cpus or 1,
            num_gpus=self.num_gpus if self.num_gpus is not None else 0,
            job_name=job_name,
            mem_str=to_slurm_memory_string(self.memory or _SLURM_DEFAULT_MEMORY),
            time=self.convert_time_to_slurm_format(self.job_time_in_minutes),
        )

        if (
@@ -216,18 +208,24 @@ def apply_to_job(self, job: Job, *, job_name: str) -> None:
        if self.run_on_single_node:
            slurm_resource_content += f" --nodelist={self.run_on_single_node}"

        logging.debug(
            "Slurm Resource Request for %s: %s", job_name, slurm_resource_content
        )
        job.addProfile(
            Profile(Namespace.PEGASUS, "glite.arguments", slurm_resource_content)
        if self.partition.name in (SCAVENGE, EPHEMERAL):
            slurm_resource_content += f" --qos={self.partition.name}"

        job.add_pegasus_profile(
            runtime=self.job_time_in_minutes,
            queue=str(self.partition.name),
            project=None
            if self.partition.name in (EPHEMERAL, SCAVENGE)
            else self.partition.name,
            glite_arguments=slurm_resource_content,
        )
        category_profile = Profile(Namespace.DAGMAN, "category", self.partition)
        if not job.hasProfile(category_profile):
            job.addProfile(category_profile)

        if (
            "dagman" not in job.profiles.keys()
            or "CATEGORY" not in job.profiles["dagman"].keys()
        ):
            job.add_dagman_profile(category=str(self.partition))


SLURM_RESOURCE_STRING = """--{qos_or_account} --partition {partition} --ntasks 1
--cpus-per-task {num_cpus} --gpus-per-task {num_gpus} --job-name {job_name} --mem {mem_str}
--time {time}"""
SLURM_RESOURCE_STRING = """--ntasks=1 --cpus-per-task={num_cpus} --gpus-per-task={num_gpus} --job-name={job_name} --mem={mem_str}"""
_BACKEND_PARAM = "backend"
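For illustration, a hedged sketch of what the new `SLURM_RESOURCE_STRING` renders to and how `apply_to_job` extends it (the values are hypothetical):

```python
# SLURM_RESOURCE_STRING.format(num_cpus=2, num_gpus=1, job_name="train", mem_str="8G")
# yields: "--ntasks=1 --cpus-per-task=2 --gpus-per-task=1 --job-name=train --mem=8G"
# apply_to_job() may then append " --nodelist=<node>" or " --qos=scavenge"
# before passing the string as the glite_arguments Pegasus profile.
```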
20 changes: 0 additions & 20 deletions pegasus_wrapper/resources/pegasus.conf

This file was deleted.

