diff --git a/blazer/cli/cli.py b/blazer/cli/cli.py index 7f180f3..9649bff 100644 --- a/blazer/cli/cli.py +++ b/blazer/cli/cli.py @@ -27,9 +27,58 @@ def cli(context, debug): format="%(asctime)s : %(name)s %(levelname)s : %(message)s", level=logging.INFO ) - logger.info("Blazer CLI!") - logger.debug("Blazer DEBUG!") + if len(sys.argv) == 1: + click.echo(context.get_help()) + + +@cli.command(name="run", help="Run a shell command") +@click.option("-s", "--shell", is_flag=True, default=False, help="Run command in shell") +@click.option("-m", "--mpi", is_flag=True, default=False, help="Enable MPI") +@click.option("-a", "--args", is_flag=True, default=False, help="Add jobid and uuid as args") +@click.option("-n","--numjobs", default=1, help="Number of times to run") +@click.option("-c","--command", default="echo hello", help="Command to run in \"'s") +@click.pass_context +def run(context, shell, mpi, args, numjobs, command): + from uuid import uuid4 + import blazer + from blazer.hpc.mpi import rank, scatter + + cmds = [] + for i in range(0,numjobs): + uuid = str(uuid4()) + + if args: + _command = f"{command} {i} {uuid}" + else: + _command = command + cmds += [{'command':_command, 'jobid':i, 'uuid':uuid}] + + def run_cmd(cmd): + import os + import subprocess + + if not cmd: + return 0 + + logging.info("run_cmd: %s",cmd) + if rank: + logging.info("Running job Rank[%s] %s: jobid [%s] uuid [%s]",rank,cmd['command'],cmd['jobid'],cmd['uuid']) + else: + logging.info("Running job %s: jobid [%s] uuid [%s]",cmd['command'],cmd['jobid'],cmd['uuid']) + + result = subprocess.run(cmd['command'].split(' '), stdout=subprocess.PIPE) + + return result.stdout.decode('utf-8').strip() + + if mpi: + + with blazer.begin(): + results = scatter(cmds, run_cmd) + blazer.print("RESULTS:",results) + else: + result = [run_cmd(cmd) for cmd in cmds] + print("RESULTS:",result) + + - if len(sys.argv) == 1: - click.echo(context.get_help()) \ No newline at end of file diff --git a/blazer/hpc/mpi/primitives.py b/blazer/hpc/mpi/primitives.py index cff2b91..e673645 100644 --- a/blazer/hpc/mpi/primitives.py +++ b/blazer/hpc/mpi/primitives.py @@ -253,14 +253,25 @@ def chunker(generator, chunksize): chunked_data = chunker(data, size) results = [] + for i, chunk in enumrate(chunked_data): + + # Pad data for rank size + extra_chunks = 0 if len(chunk) < size: - chunk += [None for i in range(len(chunk), size)] + chunk_list = [None for i in range(len(chunk), size)] + extra_chunks = len(chunk_list) + chunk += chunk_list data = comm.scatter(chunk, root=0) _data = func(data) logging.debug("[%s] scatter[%s, %s]: Chunk %s %s, Func is %s Data is %s Result is %s", host, rank,host,i, chunk, func, data, _data) newData = comm.gather(_data, root=0) + + # Unpad data + if newData and extra_chunks > 0: + newData = newData[:-extra_chunks] + results += [newData] return flatten(results)