From eedbfd63bd490ac90c721a7a63f2129f40b0443d Mon Sep 17 00:00:00 2001 From: Peter Kraus Date: Sun, 14 Apr 2024 14:30:47 +0200 Subject: [PATCH] Remove `xfail` from tests. (#76) * unxfail ketchup_cancel * Adjust sleep in job.py * Debugging & logging * More tests. 02 should work. * Final un-xfail * Tighten up test_02 * argh black * wait until pickle in 02 * Buffer in running jobs * Wait until pickle in ketchup_cancel * Need some debug there. * Shunt writing into manager. * Thread has no exitcode. * Lint * snapshot uses job.jobpath * lazy pirate for job-process * Ugh platforms --- src/tomato/daemon/__init__.py | 2 +- src/tomato/daemon/cmd.py | 3 +- src/tomato/daemon/driver.py | 8 +- src/tomato/daemon/job.py | 163 ++++++++++-------- src/tomato/ketchup/__init__.py | 3 +- src/tomato/models.py | 3 + tests/common/counter_20_5.yml | 10 ++ ...{counter_15_0.1.yml => counter_60_0.1.yml} | 4 +- tests/test_02_ketchup.py | 28 ++- tests/test_03_state.py | 52 ++---- tests/test_99_example_counter.py | 4 +- tests/utils.py | 34 +++- 12 files changed, 184 insertions(+), 130 deletions(-) create mode 100644 tests/common/counter_20_5.yml rename tests/common/{counter_15_0.1.yml => counter_60_0.1.yml} (75%) diff --git a/src/tomato/daemon/__init__.py b/src/tomato/daemon/__init__.py index 1c22dffc..4fdf9b05 100644 --- a/src/tomato/daemon/__init__.py +++ b/src/tomato/daemon/__init__.py @@ -79,7 +79,7 @@ def tomato_daemon(): dmgr.start() t0 = time.process_time() while True: - socks = dict(poller.poll(100)) + socks = dict(poller.poll(1000)) if rep in socks: msg = rep.recv_pyobj() logger.debug(f"received {msg=}") diff --git a/src/tomato/daemon/cmd.py b/src/tomato/daemon/cmd.py index e4e99bff..9e3c2188 100644 --- a/src/tomato/daemon/cmd.py +++ b/src/tomato/daemon/cmd.py @@ -13,7 +13,6 @@ """ from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job -from copy import deepcopy import logging import tomato.daemon.io as io @@ -46,7 +45,7 @@ def merge_pipelines( def status(msg: dict, daemon: Daemon) -> Reply: if msg.get("with_data", False): - return Reply(success=True, msg=daemon.status, data=deepcopy(daemon)) + return Reply(success=True, msg=daemon.status, data=daemon) else: return Reply(success=True, msg=daemon.status) diff --git a/src/tomato/daemon/driver.py b/src/tomato/daemon/driver.py index 920fdc13..b80ff0ca 100644 --- a/src/tomato/daemon/driver.py +++ b/src/tomato/daemon/driver.py @@ -240,24 +240,30 @@ def manager(port: int, timeout: int = 1000): to = timeout daemon = req.recv_pyobj().data drivers_needed = {v.driver for v in daemon.devs.values()} + action_counter = 0 for driver in drivers_needed: if driver not in daemon.drvs: logger.debug(f"spawning driver {driver!r}") spawn_tomato_driver(daemon.port, driver, req) + action_counter += 1 else: drv = daemon.drvs[driver] if drv.pid is not None and not psutil.pid_exists(drv.pid): logger.warning(f"respawning crashed driver {driver!r}") spawn_tomato_driver(daemon.port, driver, req) + action_counter += 1 elif drv.pid is None and drv.spawned_at is None: logger.debug(f"spawning driver {driver!r}") spawn_tomato_driver(daemon.port, driver, req) + action_counter += 1 elif drv.pid is None: tspawn = datetime.fromisoformat(drv.spawned_at) if (datetime.now(timezone.utc) - tspawn).seconds > 10: logger.warning(f"respawning late driver {driver!r}") spawn_tomato_driver(daemon.port, driver, req) - time.sleep(timeout / 1e3) + action_counter += 1 + logger.debug("tick") + time.sleep(1 if action_counter > 0 else 0.1) logger.info("instructed to quit") req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager")) diff --git a/src/tomato/daemon/job.py b/src/tomato/daemon/job.py index bc8cd9f2..231a8bcd 100644 --- a/src/tomato/daemon/job.py +++ b/src/tomato/daemon/job.py @@ -21,9 +21,8 @@ from importlib import metadata from datetime import datetime, timezone from pathlib import Path -from threading import currentThread -from multiprocessing import Process - +from threading import currentThread, Thread +from typing import Any import zmq import psutil @@ -99,7 +98,6 @@ def manage_running_pips(daemon: Daemon, req): logger.debug(f"{running=}") for pip in running: job = daemon.jobs[pip.jobid] - logger.debug(f"{job=}") if job.pid is None: continue pidexists = psutil.pid_exists(job.pid) @@ -111,6 +109,7 @@ def manage_running_pips(daemon: Daemon, req): proc = psutil.Process(pid=job.pid) kill_tomato_job(proc) logger.info(f"job {job.id} with pid {job.pid} was terminated successfully") + merge_netcdfs(Path(job.jobpath), Path(job.respath)) reset = True params = dict(status="cd") # dead jobs marked as running (status == 'r') should be cleared @@ -244,6 +243,33 @@ def manager(port: int, timeout: int = 500): logger.info("instructed to quit") +def lazy_pirate( + pyobj: Any, retries: int, timeout: int, address: str, context: zmq.Context +) -> Any: + logger.debug("Here") + req = context.socket(zmq.REQ) + req.connect(address) + poller = zmq.Poller() + poller.register(req, zmq.POLLIN) + for _ in range(retries): + req.send_pyobj(pyobj) + events = dict(poller.poll(timeout)) + if req not in events: + logger.warning(f"could not contact tomato-daemon in {timeout/1000} s") + req.setsockopt(zmq.LINGER, 0) + req.close() + poller.unregister(req) + req = context.socket(zmq.REQ) + req.connect(address) + poller.register(req, zmq.POLLIN) + else: + break + else: + logger.error(f"number of connection retries exceeded: {retries}") + raise RuntimeError(f"Number of connection retries exceeded: {retries}") + return req.recv_pyobj() + + def tomato_job() -> None: """ The function called when `tomato-job` is executed. @@ -271,6 +297,12 @@ def tomato_job() -> None: default=1000, type=int, ) + parser.add_argument( + "--retries", + help="Number of retries for driver actions.", + default=10, + type=int, + ) parser.add_argument( "jobfile", type=Path, @@ -286,11 +318,11 @@ def tomato_job() -> None: jobid = jsdata["job"]["id"] jobpath = Path(jsdata["job"]["path"]).resolve() - logfile = jobpath / f"job-{jobid}.log" + logpath = jobpath / f"job-{jobid}.log" logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(levelname)8s - %(name)-30s - %(message)s", - handlers=[logging.FileHandler(logfile, mode="a")], + handlers=[logging.FileHandler(logpath, mode="a")], ) logger = logging.getLogger(__name__) @@ -309,98 +341,84 @@ def tomato_job() -> None: context = zmq.Context() req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{args.port}") - poller = zmq.Poller() - poller.register(req, zmq.POLLIN) + + pkwargs = dict( + address=f"tcp://127.0.0.1:{args.port}", + retries=args.retries, + timeout=args.timeout, + context=context, + ) params = dict(pid=pid, status="r", executed_at=str(datetime.now(timezone.utc))) - req.send_pyobj(dict(cmd="job", id=jobid, params=params)) - events = dict(poller.poll(args.timeout)) - if req in events: - req.recv_pyobj() - else: - logger.warning(f"could not contact tomato-daemon in {args.timeout/1000} s") + lazy_pirate(pyobj=dict(cmd="job", id=jobid, params=params), **pkwargs) output = tomato["output"] - prefix = f"results.{jobid}" if output["prefix"] is None else output["prefix"] outpath = Path(output["path"]) - snappath = outpath / f"snapshot.{jobid}.nc" logger.debug(f"output folder is {outpath}") if outpath.exists(): assert outpath.is_dir() else: logger.debug("path does not exist, creating") os.makedirs(outpath) + prefix = f"results.{jobid}" if output["prefix"] is None else output["prefix"] + respath = outpath / f"{prefix}.nc" + snappath = outpath / f"snapshot.{jobid}.nc" + params = dict(respath=str(respath), snappath=str(snappath), jobpath=str(jobpath)) + lazy_pirate(pyobj=dict(cmd="job", id=jobid, params=params), **pkwargs) logger.info("handing off to 'job_main_loop'") logger.info("==============================") - ret = job_main_loop(context, args.port, payload, pip, jobpath, snappath) + job_main_loop(context, args.port, payload, pip, jobpath, snappath, logpath) logger.info("==============================") - merge_netcdfs(jobpath, outpath / f"{prefix}.nc") + merge_netcdfs(jobpath, respath) + + logger.info("job finished successfully, attempting to set status to 'c'") + params = dict(status="c", completed_at=str(datetime.now(timezone.utc))) + ret = lazy_pirate(pyobj=dict(cmd="job", id=jobid, params=params), **pkwargs) + logger.debug(f"{ret=}") + if ret.success is False: + logger.error("could not set job status for unknown reason") + return 1 - if ret is None: - logger.info("job finished successfully, attempting to set status to 'c'") - params = dict(status="c", completed_at=str(datetime.now(timezone.utc))) - req.send_pyobj(dict(cmd="job", id=jobid, params=params)) - events = dict(poller.poll(args.timeout)) - if req not in events: - logger.warning(f"could not contact tomato-daemon in {args.timeout/1000} s") - req.setsockopt(zmq.LINGER, 0) - req.close() - poller.unregister(req) - req = context.socket(zmq.REQ) - req.connect(f"tcp://127.0.0.1:{args.port}") - else: - ret = req.recv_pyobj() - logger.debug(f"{ret=}") - if ret.success is False: - logger.error("could not set job status for unknown reason") - return 1 - else: - logger.info("job was terminated, status should be 'cd'") - logger.info("handing off to 'driver_reset'") - logger.info("==============================") - # driver_reset(pip) - logger.info("==============================") - ready = False logger.info(f"resetting pipeline {pip!r}") params = dict(jobid=None, ready=ready, name=pip) - req.send_pyobj(dict(cmd="pipeline", params=params)) - events = dict(poller.poll(args.timeout)) - if req in events: - ret = req.recv_pyobj() - logger.debug(f"{ret=}") - if not ret.success: - logger.error(f"could not reset pipeline {pip!r}") - return 1 - else: - logger.error(f"could not contact tomato-daemon in {args.timeout/1000} s") + ret = lazy_pirate(pyobj=dict(cmd="pipeline", params=params), **pkwargs) + logger.debug(f"{ret=}") + if not ret.success: + logger.error(f"could not reset pipeline {pip!r}") return 1 logger.info("exiting tomato-job") -def job_process( +def job_thread( tasks: list, component: Component, device: Device, driver: Driver, jobpath: Path, + logpath: Path, ): """ - Child process of `tomato-job`, responsible for tasks on one Component of a Pipeline. + A subthread of `tomato-job`, responsible for tasks on one Component of a Pipeline. For each task in tasks, starts the task, then monitors the Component status and polls for data, and moves on to the next task as instructed in the payload. Stores the data for that Component as a `pickle` of a :class:`xr.Dataset`. """ - sender = f"{__name__}.job_process" + sender = f"{__name__}.job_thread" + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)8s - %(name)-30s - %(message)s", + handlers=[logging.FileHandler(logpath, mode="a")], + ) logger = logging.getLogger(sender) - logger.debug(f"in job process of {component.role!r}") + logger.debug(f"in job thread of {component.role!r}") context = zmq.Context() req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{driver.port}") - logger.debug(f"job process of {component.role!r} connected to tomato-daemon") + logger.debug(f"job thread of {component.role!r} connected to tomato-daemon") kwargs = dict(address=component.address, channel=component.channel) @@ -433,7 +451,8 @@ def job_process( logger.debug(f"{ret=}") if ret.success and ret.msg == "ready": break - time.sleep(device.pollrate / 10) + time.sleep(device.pollrate - (tN - t0)) + logger.debug("tock") req.send_pyobj(dict(cmd="task_data", params={**kwargs})) ret = req.recv_pyobj() if ret.success: @@ -447,12 +466,14 @@ def job_main_loop( pipname: str, jobpath: Path, snappath: Path, + logpath: Path, ) -> None: """ The main loop function of `tomato-job`, split for better readability. """ - sender = f"{__name__}.job_worker" + sender = f"{__name__}.job_main_loop" logger = logging.getLogger(sender) + logger.debug("process started") req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{port}") @@ -481,7 +502,7 @@ def job_main_loop( logger.debug(f"{plan=}") # distribute plan into threads - processes = {} + threads = {} for role, tasks in plan.items(): component = pipeline.devs[role] logger.debug(f"{component=}") @@ -489,28 +510,26 @@ def job_main_loop( logger.debug(f"{device=}") driver = daemon.drvs[device.driver] logger.debug(f"{driver=}") - processes[role] = Process( - target=job_process, - args=(tasks, component, device, driver, jobpath), - name="job-process", + threads[role] = Thread( + target=job_thread, + args=(tasks, component, device, driver, jobpath, logpath), + name="job-thread", ) - processes[role].start() + threads[role].start() # wait until threads join or we're killed snapshot = payload["tomato"].get("snapshot", None) t0 = time.perf_counter() while True: + logger.debug("tick") tN = time.perf_counter() if snapshot is not None and tN - t0 > snapshot["frequency"]: logger.debug("creating snapshot") merge_netcdfs(jobpath, snappath) t0 += snapshot["frequency"] - joined = [proc.is_alive() is False for proc in processes.values()] + joined = [proc.is_alive() is False for proc in threads.values()] if all(joined): break else: - time.sleep(1) - logger.debug(f"{[proc.exitcode for proc in processes.values()]}") - for proc in processes.values(): - if proc.exitcode != 0: - return proc.exitcode + # We'd like to execute this loop exactly once every second + time.sleep(1.0 - tN % 1) diff --git a/src/tomato/ketchup/__init__.py b/src/tomato/ketchup/__init__.py index d273c1a4..912bf4d4 100644 --- a/src/tomato/ketchup/__init__.py +++ b/src/tomato/ketchup/__init__.py @@ -309,9 +309,8 @@ def snapshot( if jobs[jobid].status in {"q", "qw"}: return Reply(success=False, msg=f"job {jobid} is still queued") - jobdir = Path(status.data.settings["jobs"]["storage"]) for jobid in jobids: - merge_netcdfs(jobdir / str(jobid), Path(f"snapshot.{jobid}.nc")) + merge_netcdfs(Path(jobs[jobid].jobpath), Path(f"snapshot.{jobid}.nc")) return Reply(success=True, msg=f"snapshot for job(s) {jobids} created successfully") diff --git a/src/tomato/models.py b/src/tomato/models.py index e0c5676b..e27d99cb 100644 --- a/src/tomato/models.py +++ b/src/tomato/models.py @@ -45,6 +45,9 @@ class Job(BaseModel): submitted_at: Optional[str] = None executed_at: Optional[str] = None completed_at: Optional[str] = None + jobpath: Optional[str] = None + respath: Optional[str] = None + snappath: Optional[str] = None class Daemon(BaseModel, arbitrary_types_allowed=True): diff --git a/tests/common/counter_20_5.yml b/tests/common/counter_20_5.yml new file mode 100644 index 00000000..725abf07 --- /dev/null +++ b/tests/common/counter_20_5.yml @@ -0,0 +1,10 @@ +version: "0.2" +sample: + name: counter_20_5 +method: + - device: "counter" + technique: "count" + time: 20.0 + delay: 5.0 +tomato: + verbosity: "DEBUG" \ No newline at end of file diff --git a/tests/common/counter_15_0.1.yml b/tests/common/counter_60_0.1.yml similarity index 75% rename from tests/common/counter_15_0.1.yml rename to tests/common/counter_60_0.1.yml index 7de630d0..111e04d9 100644 --- a/tests/common/counter_15_0.1.yml +++ b/tests/common/counter_60_0.1.yml @@ -1,10 +1,10 @@ version: "0.2" sample: - name: counter_15_0.1 + name: counter_60_0.1 method: - device: "counter" technique: "count" - time: 15.0 + time: 60.0 delay: 0.1 tomato: verbosity: "DEBUG" \ No newline at end of file diff --git a/tests/test_02_ketchup.py b/tests/test_02_ketchup.py index 7e184479..794d6ca9 100644 --- a/tests/test_02_ketchup.py +++ b/tests/test_02_ketchup.py @@ -3,7 +3,11 @@ import zmq from tomato import ketchup, tomato -from .utils import wait_until_tomato_running, wait_until_ketchup_status +from .utils import ( + wait_until_tomato_running, + wait_until_ketchup_status, + wait_until_pickle, +) PORT = 12345 CTXT = zmq.Context() @@ -115,7 +119,7 @@ def test_ketchup_status_complete(pl, datadir, start_tomato_daemon, stop_tomato_d test_ketchup_submit_one(f"{pl}.yml", None, *args) tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid=pl) tomato.pipeline_ready(**kwargs, pipeline="pip-counter") - wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=10000) + assert wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=5000) status = tomato.status(**kwargs, with_data=True) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") @@ -124,11 +128,10 @@ def test_ketchup_status_complete(pl, datadir, start_tomato_daemon, stop_tomato_d assert os.path.exists("results.1.nc") -@pytest.mark.xfail(strict=False) @pytest.mark.parametrize( "pl", [ - "counter_15_0.1", + "counter_60_0.1", ], ) def test_ketchup_cancel(pl, datadir, start_tomato_daemon, stop_tomato_daemon): @@ -136,25 +139,30 @@ def test_ketchup_cancel(pl, datadir, start_tomato_daemon, stop_tomato_daemon): test_ketchup_submit_one(f"{pl}.yml", None, *args) tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid=pl) tomato.pipeline_ready(**kwargs, pipeline="pip-counter") - wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) + assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) + + assert wait_until_pickle(jobid=1, timeout=2000) status = tomato.status(**kwargs, with_data=True) ret = ketchup.cancel(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") assert ret.success assert ret.data[1].status == "rd" - wait_until_ketchup_status(jobid=1, status="cd", port=PORT, timeout=5000) + + assert wait_until_ketchup_status(jobid=1, status="cd", port=PORT, timeout=5000) status = tomato.status(**kwargs, with_data=True) ret = ketchup.status(**kwargs, status=status, verbosity=0, jobids=[1]) print(f"{ret=}") + print(f"{os.listdir()=}") + print(f"{os.listdir('Jobs')=}") + print(f"{os.listdir(os.path.join('Jobs', '1'))=}") assert ret.data[1].status == "cd" assert os.path.exists("results.1.nc") -@pytest.mark.xfail(strict=False) @pytest.mark.parametrize( "pl", [ - "counter_15_0.1", + "counter_60_0.1", ], ) def test_ketchup_snapshot(pl, datadir, start_tomato_daemon, stop_tomato_daemon): @@ -162,7 +170,9 @@ def test_ketchup_snapshot(pl, datadir, start_tomato_daemon, stop_tomato_daemon): test_ketchup_submit_one(f"{pl}.yml", None, *args) tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid=pl) tomato.pipeline_ready(**kwargs, pipeline="pip-counter") - wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) + assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) + + assert wait_until_pickle(jobid=1, timeout=2000) status = tomato.status(**kwargs, with_data=True) ret = ketchup.snapshot(jobids=[1], status=status) print(f"{ret=}") diff --git a/tests/test_03_state.py b/tests/test_03_state.py index e78314be..7659a0af 100644 --- a/tests/test_03_state.py +++ b/tests/test_03_state.py @@ -9,6 +9,7 @@ wait_until_tomato_running, wait_until_tomato_stopped, wait_until_ketchup_status, + kill_tomato_daemon, ) PORT = 12345 @@ -53,20 +54,13 @@ def test_stop_with_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon def test_recover_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): assert wait_until_tomato_running(port=PORT, timeout=WAIT) os.chdir(datadir) - ketchup.submit(payload="counter_15_0.1.yml", jobname="job-1", **kwargs) - tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid="counter_15_0.1") + ketchup.submit(payload="counter_20_5.yml", jobname="job-1", **kwargs) + tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid="counter_20_5") tomato.pipeline_ready(**kwargs, pipeline="pip-counter") - wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) + assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) ret = tomato.stop(**kwargs) - procs = [] - for p in psutil.process_iter(["name", "cmdline"]): - if "tomato-daemon" in p.info["name"] and f"{PORT}" in p.info["cmdline"]: - p.terminate() - procs.append(p) - gone, alive = psutil.wait_procs(procs, timeout=3) - print(f"{gone=}") - print(f"{alive=}") + kill_tomato_daemon(port=PORT) assert os.path.exists("tomato_state_12345.pkl") ret = tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) @@ -79,7 +73,7 @@ def test_recover_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): assert ret.data.nextjob == 2 assert ret.data.jobs[1].status == "r" - wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=15000) + assert wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=25000) ret = tomato.status(**kwargs, with_data=True) print(f"{ret=}") assert ret.success @@ -94,28 +88,21 @@ def test_recover_waiting_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): ketchup.submit(payload="counter_5_0.2.yml", jobname="job-1", **kwargs) tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid="counter_5_0.2") tomato.pipeline_ready(**kwargs, pipeline="pip-counter") - wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) + assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) ret = tomato.stop(**kwargs) - procs = [] - for p in psutil.process_iter(["name", "cmdline"]): - if "tomato-daemon" in p.info["name"] and f"{PORT}" in p.info["cmdline"]: - p.terminate() - procs.append(p) - gone, alive = psutil.wait_procs(procs, timeout=3) - print(f"{gone=}") - print(f"{alive=}") - - time.sleep(10) + kill_tomato_daemon(port=PORT) + + time.sleep(5) tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) assert wait_until_tomato_running(port=PORT, timeout=WAIT) - wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=5000) + assert wait_until_ketchup_status(jobid=1, status="c", port=PORT, timeout=5000) ret = tomato.status(**kwargs, with_data=True) print(f"{ret=}") assert ret.success assert len(ret.data.jobs) == 1 assert ret.data.nextjob == 2 - assert ret.data.jobs[1].status == "ce" + assert ret.data.jobs[1].status == "c" assert ret.data.pips["pip-counter"].jobid is None assert ret.data.pips["pip-counter"].sampleid == "counter_5_0.2" @@ -123,22 +110,15 @@ def test_recover_waiting_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): def test_recover_crashed_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): assert wait_until_tomato_running(port=PORT, timeout=WAIT) os.chdir(datadir) - ketchup.submit(payload="counter_15_0.1.yml", jobname="job-1", **kwargs) - tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid="counter_15_0.1") + ketchup.submit(payload="counter_20_5.yml", jobname="job-1", **kwargs) + tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid="counter_20_5") tomato.pipeline_ready(**kwargs, pipeline="pip-counter") wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) ret = tomato.status(**kwargs, with_data=True) print(f"{ret=}") ret = tomato.stop(**kwargs) - procs = [] - for p in psutil.process_iter(["name", "cmdline"]): - if "tomato-daemon" in p.info["name"] and f"{PORT}" in p.info["cmdline"]: - p.terminate() - procs.append(p) - gone, alive = psutil.wait_procs(procs, timeout=3) - print(f"{gone=}") - print(f"{alive=}") + kill_tomato_daemon(port=PORT) proc = psutil.Process(pid=ret.data.jobs[1].pid) proc.terminate() @@ -153,4 +133,4 @@ def test_recover_crashed_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): assert ret.data.nextjob == 2 assert ret.data.jobs[1].status == "ce" assert ret.data.pips["pip-counter"].jobid is None - assert ret.data.pips["pip-counter"].sampleid == "counter_15_0.1" + assert ret.data.pips["pip-counter"].sampleid == "counter_20_5" diff --git a/tests/test_99_example_counter.py b/tests/test_99_example_counter.py index b9e71834..c8eed3e9 100644 --- a/tests/test_99_example_counter.py +++ b/tests/test_99_example_counter.py @@ -44,7 +44,7 @@ def test_counter_npoints( @pytest.mark.parametrize( "casename", [ - "counter_15_0.1", + "counter_60_0.1", ], ) def test_counter_cancel(casename, datadir, start_tomato_daemon, stop_tomato_daemon): @@ -67,7 +67,7 @@ def test_counter_cancel(casename, datadir, start_tomato_daemon, stop_tomato_daem @pytest.mark.parametrize( "casename, external", [ - ("counter_15_0.1", True), + ("counter_60_0.1", True), ("counter_snapshot", False), ], ) diff --git a/tests/utils.py b/tests/utils.py index 98dd410f..e225a77e 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,8 @@ import time import yaml import logging +import os +import psutil logger = logging.getLogger(__name__) @@ -40,7 +42,7 @@ def wait_until_tomato_running(port: int, timeout: int): data = yaml.safe_load(ret.stdout) if data["success"]: return True - time.sleep(timeout / 5000) + time.sleep(0.5) return False @@ -55,7 +57,7 @@ def wait_until_tomato_stopped(port: int, timeout: int): data = yaml.safe_load(ret.stdout) if not data["success"]: return True - time.sleep(timeout / 5000) + time.sleep(0.5) return False @@ -70,5 +72,31 @@ def wait_until_ketchup_status(jobid: int, status: str, port: int, timeout: int): data = yaml.safe_load(ret.stdout)["data"] if data[jobid]["status"] == status: return True - time.sleep(timeout / 5000) + time.sleep(0.5) return False + + +def wait_until_pickle(jobid: int, timeout: int): + t0 = time.perf_counter() + while (time.perf_counter() - t0) < (timeout / 1000): + files = os.listdir(os.path.join(os.getcwd(), "Jobs", f"{jobid}")) + for file in files: + if file.endswith(".pkl"): + return True + time.sleep(0.5) + return False + + +def kill_tomato_daemon(port: int = 12345): + procs = [] + for p in psutil.process_iter(["name", "cmdline"]): + if "tomato-daemon" in p.info["name"] and f"{port}" in p.info["cmdline"]: + for pc in p.children(): + if psutil.WINDOWS: + pc.terminate() + procs.append(p) + p.terminate() + procs.append(p) + gone, alive = psutil.wait_procs(procs, timeout=3) + print(f"{gone=}") + print(f"{alive=}")