Skip to content

Commit

Permalink
Added run CLI command
Browse files Browse the repository at this point in the history
  • Loading branch information
Darren Govoni committed Mar 10, 2022
1 parent 8a5bbc9 commit f803b36
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 5 deletions.
57 changes: 53 additions & 4 deletions blazer/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,58 @@ def cli(context, debug):
format="%(asctime)s : %(name)s %(levelname)s : %(message)s", level=logging.INFO
)

logger.info("Blazer CLI!")
logger.debug("Blazer DEBUG!")
if len(sys.argv) == 1:
click.echo(context.get_help())


@cli.command(name="run", help="Run a shell command")
@click.option("-s", "--shell", is_flag=True, default=False, help="Run command in shell")
@click.option("-m", "--mpi", is_flag=True, default=False, help="Enable MPI")
@click.option("-a", "--args", is_flag=True, default=False, help="Add jobid and uuid as args")
@click.option("-n","--numjobs", default=1, help="Number of times to run")
@click.option("-c","--command", default="echo hello", help="Command to run in \"'s")
@click.pass_context
def run(context, shell, mpi, args, numjobs, command):
from uuid import uuid4
import blazer
from blazer.hpc.mpi import rank, scatter

cmds = []
for i in range(0,numjobs):
uuid = str(uuid4())

if args:
_command = f"{command} {i} {uuid}"
else:
_command = command
cmds += [{'command':_command, 'jobid':i, 'uuid':uuid}]

def run_cmd(cmd):
import os
import subprocess

if not cmd:
return 0

logging.info("run_cmd: %s",cmd)
if rank:
logging.info("Running job Rank[%s] %s: jobid [%s] uuid [%s]",rank,cmd['command'],cmd['jobid'],cmd['uuid'])
else:
logging.info("Running job %s: jobid [%s] uuid [%s]",cmd['command'],cmd['jobid'],cmd['uuid'])

result = subprocess.run(cmd['command'].split(' '), stdout=subprocess.PIPE)

return result.stdout.decode('utf-8').strip()

if mpi:

with blazer.begin():
results = scatter(cmds, run_cmd)
blazer.print("RESULTS:",results)
else:
result = [run_cmd(cmd) for cmd in cmds]
print("RESULTS:",result)




if len(sys.argv) == 1:
click.echo(context.get_help())
13 changes: 12 additions & 1 deletion blazer/hpc/mpi/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,25 @@ def chunker(generator, chunksize):

chunked_data = chunker(data, size)
results = []

for i, chunk in enumrate(chunked_data):

# Pad data for rank size
extra_chunks = 0
if len(chunk) < size:
chunk += [None for i in range(len(chunk), size)]
chunk_list = [None for i in range(len(chunk), size)]
extra_chunks = len(chunk_list)
chunk += chunk_list

data = comm.scatter(chunk, root=0)
_data = func(data)
logging.debug("[%s] scatter[%s, %s]: Chunk %s %s, Func is %s Data is %s Result is %s", host, rank,host,i, chunk, func, data, _data)
newData = comm.gather(_data, root=0)

# Unpad data
if newData and extra_chunks > 0:
newData = newData[:-extra_chunks]

results += [newData]

return flatten(results)
Expand Down

0 comments on commit f803b36

Please sign in to comment.