Updated implementation of cross-cluster computing and examples
Darren Govoni committed Mar 1, 2022
1 parent 54bc631 commit edc44d5
Showing 11 changed files with 308 additions and 156 deletions.
33 changes: 33 additions & 0 deletions README.md
@@ -188,3 +188,36 @@ In addition there are other primitives to help manipulate lists of tasks or data

- **where** - Filter a list of tasks or data elements based on a function or lambda
- **select** - Apply a function to each list element and return the result (see the sketch below)
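
As a quick illustration of the filter/map semantics these primitives describe, here is a minimal sketch written with the standalone `pipe` package that blazer builds on; the exact import path for blazer's own `where`/`select` is an assumption and may differ.

```python
from pipe import where, select  # blazer's where/select follow the same filter/map pattern

data = range(10)

# Keep even elements, then square each survivor
result = list(data | where(lambda x: x % 2 == 0) | select(lambda x: x * x))
print(result)  # [0, 4, 16, 36, 64]
```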


### Cross-Cluster Supercomputing

Blazer comes with a built-in design pattern for performing cross-cluster HPC. This is useful if you want to allocate compute resources on different supercomputers and then build a pipeline of jobs across them. Here is a simple example using ALCF's Cooley and ThetaGPU systems (support for both is built into blazer).

```python
from blazer.hpc.alcf import cooley, thetagpu
from blazer.hpc.local import parallel, pipeline, partial as p

# Log into each cluster using MAF password from MobilePASS
cooleyjob = cooley.job(user='dgovoni', n=1, q="debug", A="datascience", password=True, script="/home/dgovoni/git/blazer/testcooley.sh").login()
thetajob = thetagpu.job(user='dgovoni', n=1, q="single-gpu", A="datascience", password=True, script="/home/dgovoni/git/blazer/testthetagpu.sh").login()

def hello(data, *args):
return "Hello "+str(data)

# Mix and match cluster compute jobs with local code tasks
# in serial chaining
cooleyjob("some data").then(hello).then(thetajob).then(hello)

# Run a cross cluster compute job
result = pipeline([
p(thetajob,"some data2"),
p(cooleyjob,"some data1")
])

print("Done")
```

When each job's `.login()` method is run, it prompts for the MAF login credentials for that system and then uses them to schedule jobs on that system via SSH.
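
For a sense of what `.login()` and job submission do under the hood, here is a condensed sketch of the paramiko/getpass flow from `_cooley.py` in this commit; the hostname, username, and qsub arguments are illustrative values, not a complete implementation.

```python
import getpass
import paramiko

# Open an SSH session to the login node, authenticating with the MAF password
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pw = getpass.getpass("Cooley MAF Password: ")
ssh.connect(hostname="cooley.alcf.anl.gov", username="dgovoni", password=pw)

# Submit the job script with qsub and parse the job id from stdout
_, stdout, _ = ssh.exec_command("qsub -t 5 -n 1 -q debug /home/dgovoni/git/blazer/testcooley.sh")
job_id = None
for line in stdout.read().splitlines():
    parts = line.split()
    if len(parts) == 1:
        job_id = int(parts[0])
print("Submitted job", job_id)
ssh.close()
```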

Notice the use of the `pipeline` primitive above: it is the same primitive you would use to build your local compute workflows! Composable tasks and composable supercomputers.
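
Building on that idea, here is a hedged sketch of fanning work out to both clusters at once and reducing the results locally; it assumes `parallel` accepts a list of partials the same way `pipeline` does, which is an assumption about the API rather than something shown in this commit.

```python
# Assumed API: parallel(...) mirrors pipeline(...) but runs the partials concurrently
results = parallel([
    p(thetajob, "input for thetagpu"),
    p(cooleyjob, "input for cooley")
])

# Reduce the per-cluster results with a local task
print(hello(results))
```
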
5 changes: 5 additions & 0 deletions blazer/__init__.py
@@ -1,3 +1,8 @@
import logging

logging.basicConfig(
format="%(asctime)s : %(levelname)s : %(message)s", level=logging.DEBUG
)
from .hpc.mpi.primitives import stop, begin, skip, MASTER as ROOT, mprint as print

__all__ = (
43 changes: 0 additions & 43 deletions blazer/examples/example8.py

This file was deleted.

25 changes: 17 additions & 8 deletions blazer/examples/example9.py
@@ -1,12 +1,21 @@
from blazer.hpc.alcf import cooley, thetagpu
from blazer.hpc.local import parallel, pipeline, partial as p

job1 = cooley.job(n=2, q="debug", A="datascience", password=True, script="/home/dgovoni/git/blazer/testcooley.sh")
job2 = thetagpu.job(n=1, q="single-gpu", A="datascience", password=True, script="/home/dgovoni/git/blazer/testthetagpu.sh")
# Log into each cluster using MAF password from MobilePASS
cooleyjob = cooley.job(user='dgovoni', n=1, q="debug", A="datascience", password=True, script="/home/dgovoni/git/blazer/testcooley.sh").login()
thetajob = thetagpu.job(user='dgovoni', n=1, q="single-gpu", A="datascience", password=True, script="/home/dgovoni/git/blazer/testthetagpu.sh").login()

result = "start" | job1 | job2
print(result)
def hello(data, *args):
return "Hello "+str(data)

'''
job2 = thetagpu.job(n=1, q="single-gpu", A="datascience", venv="/home/dgovoni/git/blazer/venv/bin/python", password=True,
code="/home/dgovoni/git/blazer/blazer/examples/example7.py")
'''
# Mix and match cluster compute jobs with local code tasks
# in serial chaining
cooleyjob("some data").then(hello).then(thetajob).then(hello)

# Run a cross cluster compute job
result = pipeline([
p(thetajob,"some data2"),
p(cooleyjob,"some data1")
])

print("Done")
3 changes: 2 additions & 1 deletion blazer/hpc/alcf/__init__.py
@@ -1,7 +1,8 @@
from . import _cooley as cooley
from . import _thetagpu as thetagpu
from pipe import Pipe

__all__ = (
'cooley',
'thetagpu'
)
)
115 changes: 68 additions & 47 deletions blazer/hpc/alcf/_cooley.py
@@ -1,56 +1,77 @@
import logging

from contextlib import contextmanager
import traceback
from pipe import Pipe
import paramiko
import getpass
from .then import Then

@Pipe
def job(data, q="debug", n=1, t=5, A='datascience', venv=None, script=None, code=None, password=False):
import time
import datetime

print("COOLEY DATA",data)

_ssh = paramiko.SSHClient()
_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

if password:
pw = getpass.getpass("Password: ")
_ssh.connect(hostname="cooley.alcf.anl.gov", username="dgovoni", password=pw)
else:
_ssh.connect(hostname="cooley.alcf.anl.gov", username="dgovoni") # sshkey?

if script:
command = f"qsub -t {t} -n {n} -q {q} {script}"
else:
command = f"qsub -t {t} -n {n} -q {q} {venv} {code}"

_, stdout, _ = _ssh.exec_command(command)

job_id = None
for line in stdout.read().splitlines():
parts = line.split()
if len(parts) == 1:
job_id = int(parts[0])

start = datetime.datetime.now()
not_finished = True
while not_finished:
command = f"qstat -u dgovoni | grep {job_id}"
_, stdout, _ = _ssh.exec_command(command)
for line in stdout.read().splitlines():
if str(line).find("exiting") > -1:
not_finished = False

now = datetime.datetime.now()
if now - start > datetime.timedelta(minutes=t):
not_finished = False
time.sleep(1)

print("Cooley running...")
time.sleep(3)
@Then
def job(user='nobody', q="debug", n=1, t=5, A='datascience', venv=None, script=None, code=None, password=False):

class Job:

def __init__(self):
logging.debug("Job init")
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def login(self):
logging.debug("Logging in from Job")
pw = getpass.getpass("Cooley MAF Password: ")
self._ssh.connect(hostname="cooley.alcf.anl.gov", username=user, password=pw)

return self

def __call__(self, *args, **kwargs):
logging.debug("Job call %s %s",args, kwargs)
import time
import datetime

data = "stubbed"
logging.debug("COOLEY DATA %s",data)

if script:
command = f"qsub -t {t} -n {n} -q {q} {script}"
else:
command = f"qsub -t {t} -n {n} -q {q} {venv} {code}"

_, stdout, _ = self._ssh.exec_command(command)

job_id = None
for line in stdout.read().splitlines():
parts = line.split()
if len(parts) == 1:
job_id = int(parts[0])

start = datetime.datetime.now()
not_finished = True
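# Poll qstat until the job reports "exiting" or the walltime budget t (minutes) elapses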
while not_finished:
command = f"qstat -u {user} | grep {job_id}"
_, stdout, _ = self._ssh.exec_command(command)
for line in stdout.read().splitlines():
if str(line).find("exiting") > -1:
not_finished = False

# Determine if job succeeded or failed.
# If failed, throw exception
if False:
raise Exception()

now = datetime.datetime.now()
if now - start > datetime.timedelta(minutes=t):
not_finished = False
time.sleep(1)

logging.debug("Cooley exiting...")

self._ssh.close()
return "cooley" + str(data)

return "cooley" + data
logging.debug("Returning job from cooley")
return Job()

class run(object):

@@ -59,10 +80,10 @@ def __exit__(self, _type, value, _traceback):
stack = traceback.extract_stack()
file, end = self._get_origin_info(stack,'__exit__')
self.end = end
print("FILE",self.file,self.start,self.end)
logging.debug("FILE",self.file,self.start,self.end)
with open(file) as code:
lines = code.readlines()
print("COOLEY CODE:",lines[self.start:self.end])
logging.debug("COOLEY CODE: %s",lines[self.start:self.end])

def __enter__(self):
stack = traceback.extract_stack()