From 906bdb2379ffb1bc958776c52de1298742bdcc68 Mon Sep 17 00:00:00 2001 From: jeremiah-corrado <62707311+jeremiah-corrado@users.noreply.github.com> Date: Fri, 6 Sep 2024 10:15:28 -0600 Subject: [PATCH] Option for running each benchmark in its own server instance (#3730) * set up option for running each benchmark in its own server instance Signed-off-by: Jeremiah Corrado * fix command formatting Signed-off-by: Jeremiah Corrado * separate option for launching server from within slurm allocation Signed-off-by: Jeremiah Corrado * apply black formatting Signed-off-by: Jeremiah Corrado * parse new run_benchmarks arguments as bool Signed-off-by: Jeremiah Corrado * parse 'within_slrum_alloc' arg as bool Signed-off-by: Jeremiah Corrado * fix default arguments Signed-off-by: Jeremiah Corrado --------- Signed-off-by: Jeremiah Corrado --- benchmarks/run_benchmarks.py | 69 ++++++++++++++++++++++------ server_util/test/server_test_util.py | 65 +++++++++++++++++++++++--- 2 files changed, 115 insertions(+), 19 deletions(-) diff --git a/benchmarks/run_benchmarks.py b/benchmarks/run_benchmarks.py index 22914cbf96..1560a5efd0 100755 --- a/benchmarks/run_benchmarks.py +++ b/benchmarks/run_benchmarks.py @@ -11,8 +11,13 @@ import os import subprocess import sys -from server_util.test.server_test_util import get_arkouda_numlocales, start_arkouda_server,\ - run_client, stop_arkouda_server +from server_util.test.server_test_util import ( + get_arkouda_numlocales, + start_arkouda_server, + run_client, + stop_arkouda_server, +) + benchmark_dir = os.path.dirname(__file__) util_dir = os.path.join(benchmark_dir, "..", "server_util", "test") sys.path.insert(0, os.path.abspath(util_dir)) @@ -90,12 +95,13 @@ def add_to_dat(benchmark, output, dat_dir, graph_infra): benchmark_out = "{}.exec.out.tmp".format(benchmark) with open(benchmark_out, "w") as f: f.write(output) - subprocess.check_output([computePerfStats, benchmark, dat_dir, perfkeys, benchmark_out]) + subprocess.check_output( + 
[computePerfStats, benchmark, dat_dir, perfkeys, benchmark_out] + ) os.remove(benchmark_out) def generate_graphs(args): - """ Generate graphs using the existing .dat files and graph infrastructure. """ @@ -140,20 +146,31 @@ def create_parser(): default=get_arkouda_numlocales(), help="Number of locales to use for the server", ) - parser.add_argument("-sp", "--server-port", default="5555", help="Port number to use for the server") - parser.add_argument("--server-args", action="append", help="Additional server arguments") - parser.add_argument("--numtrials", default=1, type=int, help="Number of trials to run") parser.add_argument( - "benchmarks", nargs="*", help="Basename of benchmarks to run with extension stripped" + "-sp", "--server-port", default="5555", help="Port number to use for the server" + ) + parser.add_argument( + "--server-args", action="append", help="Additional server arguments" + ) + parser.add_argument( + "--numtrials", default=1, type=int, help="Number of trials to run" + ) + parser.add_argument( + "benchmarks", + nargs="*", + help="Basename of benchmarks to run with extension stripped", ) parser.add_argument( "--save-data", default=False, action="store_true", - help="Save performance data to output files, requires $CHPL_HOME" + help="Save performance data to output files, requires $CHPL_HOME", ) parser.add_argument( - "--gen-graphs", default=False, action="store_true", help="Generate graphs, requires $CHPL_HOME" + "--gen-graphs", + default=False, + action="store_true", + help="Generate graphs, requires $CHPL_HOME", ) parser.add_argument( "--dat-dir", @@ -167,10 +184,22 @@ def create_parser(): help="Directory containing graph infrastructure", ) parser.add_argument("--platform-name", default="", help="Test platform name") - parser.add_argument("--description", default="", help="Description of this configuration") + parser.add_argument( + "--description", default="", help="Description of this configuration" + ) parser.add_argument("--annotations", 
default="", help="File containing annotations") parser.add_argument("--configs", help="comma seperate list of configurations") parser.add_argument("--start-date", help="graph start date") + parser.add_argument( + "--isolated", + default=False, + help="run each benchmark in its own server instance", + ) + parser.add_argument( + "--within-slrum-alloc", + default=False, + help="whether this script was launched from within a slurm allocation (for use with --isolated only)", + ) return parser @@ -179,22 +208,36 @@ def main(): args, client_args = parser.parse_known_args() args.graph_dir = args.graph_dir or os.path.join(args.dat_dir, "html") config_dat_dir = os.path.join(args.dat_dir, args.description) + run_isolated = bool(args.isolated) if args.save_data or args.gen_graphs: os.makedirs(config_dat_dir, exist_ok=True) - start_arkouda_server(args.num_locales, port=args.server_port, server_args=args.server_args) + if not run_isolated: + start_arkouda_server( + args.num_locales, port=args.server_port, server_args=args.server_args + ) args.benchmarks = args.benchmarks or BENCHMARKS for benchmark in args.benchmarks: + if run_isolated: + start_arkouda_server( + args.num_locales, + port=args.server_port, + server_args=args.server_args, + within_slurm_alloc=bool(args.within_slrum_alloc), + ) for trial in range(args.numtrials): benchmark_py = os.path.join(benchmark_dir, "{}.py".format(benchmark)) out = run_client(benchmark_py, client_args) if args.save_data or args.gen_graphs: add_to_dat(benchmark, out, config_dat_dir, args.graph_infra) print(out) + if run_isolated: + stop_arkouda_server() - stop_arkouda_server() + if not run_isolated: + stop_arkouda_server() if args.save_data or args.gen_graphs: comp_file = os.getenv("ARKOUDA_PRINT_PASSES_FILE", "") diff --git a/server_util/test/server_test_util.py b/server_util/test/server_test_util.py index 8995cb4088..a7f3cddf69 100644 --- a/server_util/test/server_test_util.py +++ b/server_util/test/server_test_util.py @@ -27,6 +27,7 @@ class 
TestRunningMode(Enum): """ Enum indicating the running mode of the test harness """ + CLIENT = "CLIENT" CLASS_SERVER = "CLASS_SERVER" GLOBAL_SERVER = "GLOBAL_SERVER" @@ -117,11 +118,11 @@ def read_server_and_port_from_file(server_connection_info): while True: try: with open(server_connection_info, "r") as f: - (hostname,port,connect_url) = f.readline().split(" ") + (hostname, port, connect_url) = f.readline().split(" ") port = int(port) if hostname == socket.gethostname(): hostname = "localhost" - return (hostname,port,connect_url) + return (hostname, port, connect_url) except (ValueError, FileNotFoundError) as e: time.sleep(1) continue @@ -181,7 +182,51 @@ def kill_server(server_process): server_process.kill() -def start_arkouda_server(numlocales, trace=False, port=5555, host=None, server_args=None): +def get_server_launch_cmd(numlocales): + """ + Get an srun command to launch ./arkouda_server_real directly + """ + import re + + # get the srun command for 'arkouda_server_real' + p = subprocess.Popen( + ["./arkouda_server", f"-nl{numlocales}", "--dry-run"], stdout=subprocess.PIPE + ) + srun_cmd, err = p.communicate() + srun_cmd = srun_cmd.decode() + + if err is not None: + raise RuntimeError("failed to capture arkouda srun command: ", err) + + # remove and capture the '--constraint=' argument if present + constraint_setting = None + m = re.search(r"--constraint=[\w,]*\s", srun_cmd) + if m is not None: + constraint_setting = srun_cmd[m.start() : m.end()] + srun_cmd = srun_cmd[: m.start()] + srun_cmd[m.end() + 1 :] + + # extract environment variable settings specified in the command + # and include them in the executing environment + env = os.environ.copy() + max_env_idx = 0 + for match in re.finditer(r"([A-Z_]+)=(\S+)", srun_cmd): + max_env_idx = max(max_env_idx, match.end()) + env.update({match.group(1): match.group(2)}) + + # remove the environment variables from the command string + srun_cmd = srun_cmd[max_env_idx:] + + return (srun_cmd, env, constraint_setting) + 
+ def start_arkouda_server( + numlocales, + trace=False, + port=5555, + host=None, + server_args=None, + within_slurm_alloc=False, +): """ Start the Arkouda server and wait for it to start running. Connection info is written to `get_arkouda_server_info_file()`. @@ -191,6 +236,8 @@ def start_arkouda_server(numlocales, trace=False, port=5555, host=None, server_a :param int port: the desired arkouda_server port, defaults to 5555 :param str host: the desired arkouda_server host, defaults to None :param list server_args: additional arguments to pass to the server + :param within_slurm_alloc: whether the current script is running within a slurm allocation, + in which case special care needs to be taken when launching the server. :return: tuple containing server host, port, and process :rtype: ServerInfo(host, port, process) """ @@ -198,8 +245,14 @@ def start_arkouda_server(numlocales, trace=False, port=5555, host=None, server_a with contextlib.suppress(FileNotFoundError): os.remove(connection_file) - cmd = [ - get_arkouda_server(), + if within_slurm_alloc: + raw_server_cmd, env, _ = get_server_launch_cmd(numlocales) + raw_server_cmd = raw_server_cmd.strip().strip().split(" ") + else: + raw_server_cmd = [get_arkouda_server(),] + env = None + + cmd = raw_server_cmd + [ "--trace={}".format("true" if trace else "false"), "--serverConnectionInfo={}".format(connection_file), "-nl {}".format(numlocales), @@ -209,7 +262,7 @@ def start_arkouda_server(numlocales, trace=False, port=5555, host=None, server_a cmd += server_args logging.info('Starting "{}"'.format(cmd)) - process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL) + process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, env=env) atexit.register(kill_server, process) if not host: