From f0b44fa02532cd9d16c312a235074371dd8805ff Mon Sep 17 00:00:00 2001 From: Peter Kraus Date: Tue, 30 Jul 2024 12:29:03 +0200 Subject: [PATCH] Refactor `tomato reload` (#96) * Implement first part of reload. * Fix failing stop test. * more stop job fixes * fix one more test. * More tests. * That's ruff man --- docs/source/usage.rst | 2 +- src/tomato/daemon/cmd.py | 120 ++++++++++++++- src/tomato/daemon/driver.py | 97 ++++++++----- src/tomato/models.py | 2 +- src/tomato/tomato/__init__.py | 142 +++--------------- tests/common/devices_reload_address.json | 19 +++ tests/common/devices_reload_channel.json | 19 +++ tests/common/devices_reload_driver.json | 19 +++ tests/common/devices_reload_pipdel.json | 19 +++ tests/common/devices_reload_pipmod.json | 27 ++++ tests/conftest.py | 5 +- tests/test_01_tomato.py | 40 +---- tests/test_03_state.py | 3 +- tests/test_04_tomato_reload.py | 177 +++++++++++++++++++++++ 14 files changed, 482 insertions(+), 209 deletions(-) create mode 100644 tests/common/devices_reload_address.json create mode 100644 tests/common/devices_reload_channel.json create mode 100644 tests/common/devices_reload_driver.json create mode 100644 tests/common/devices_reload_pipdel.json create mode 100644 tests/common/devices_reload_pipmod.json create mode 100644 tests/test_04_tomato_reload.py diff --git a/docs/source/usage.rst b/docs/source/usage.rst index b58eb64e..40d19804 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -167,7 +167,7 @@ to check the status of and to cancel *jobs* in the queue. *Jobs* submitted to the queue will remain in the queue until a *pipeline* meets all of the following criteria: - - A *pipeline* where all of the ``techniques`` specified in the *payload* are matched + - A *pipeline* where all of the ``tasks`` specified in the *payload* are matched by its ``capabilities`` must exist. Once the :mod:`tomato.daemon` finds such a *pipeline*, the status of the *job* will change to ``qw``. - The matching *pipeline* must contain a *sample* with a ``samplename`` that matches diff --git a/src/tomato/daemon/cmd.py b/src/tomato/daemon/cmd.py index 0b805088..e2c75682 100644 --- a/src/tomato/daemon/cmd.py +++ b/src/tomato/daemon/cmd.py @@ -58,11 +58,11 @@ def stop(msg: dict, daemon: Daemon) -> Reply: io.store(daemon) if any([pip.jobid is not None for pip in daemon.pips.values()]): logger.error("cannot stop tomato-daemon as jobs are running") - return Reply(success=False, msg=daemon.status, data=daemon) + return Reply(success=False, msg="jobs are running", data=daemon.jobs) else: daemon.status = "stop" logger.critical("stopping tomato-daemon") - return Reply(success=True, msg=daemon.status) + return Reply(success=True) def setup(msg: dict, daemon: Daemon) -> Reply: @@ -70,13 +70,123 @@ def setup(msg: dict, daemon: Daemon) -> Reply: logger.debug("%s", msg) if daemon.status == "bootstrap": for key in ["drvs", "devs", "pips", "cmps"]: - if key in msg: - setattr(daemon, key, msg[key]) + setattr(daemon, key, msg[key]) logger.info("setup successful with pipelines: '%s'", daemon.pips.keys()) daemon.status = "running" else: + # First, check that we're not touching anything associated with a running job + check_components = set() + check_devices = set() + check_drivers = set() + for dpip in daemon.pips.values(): + if dpip.jobid is None: + continue + if dpip.name not in msg["pips"]: + return Reply( + success=False, + msg="reload would delete a running pipeline", + data=dpip, + ) + pip = msg["pips"][dpip.name] + if pip.components != dpip.components: + return Reply( + success=False, + msg="reload would modify components of a running pipeline", + data=dpip, + ) + check_components.update(dpip.components) + + for cname in check_components: + dcomp = daemon.cmps[cname] + if cname not in msg["cmps"]: + return Reply( + success=False, + msg="reload would delete a component of a running pipeline", + data=dcomp, + ) + comp = msg["cmps"][cname] + if ( + dcomp.name != comp.name + or dcomp.driver != comp.driver + or dcomp.device != comp.device + or dcomp.address != comp.address + or dcomp.channel != comp.channel + or dcomp.role != comp.role + ): + return Reply( + success=False, + msg="reload would modify a component of a running pipeline", + data=dcomp, + ) + check_devices.add(dcomp.device) + check_drivers.add(dcomp.driver) + + for dname in check_devices: + ddev = daemon.devs[dname] + if dname not in msg["devs"]: + return Reply( + success=False, + msg="reload would delete a device of a component in a running pipeline", + data=ddev, + ) + dev = msg["devs"][dname] + if ( + ddev.name != dev.name + or ddev.driver != dev.driver + or ddev.address != dev.address + or ddev.pollrate != dev.pollrate + or any(ch not in dev.channels for ch in ddev.channels) + ): + return Reply( + success=False, + msg="reload would modify a device of a component in a running pipeline", + data=ddev, + ) + + for dname in check_drivers: + ddrv = daemon.drvs[dname] + if dname not in msg["drvs"]: + return Reply( + success=False, + msg="reload would delete a driver of a device in a running pipeline", + data=ddev, + ) + drv = msg["drvs"][dname] + if ddrv.name != drv.name or ddrv.settings != drv.settings: + return Reply( + success=False, + msg="reload would modify a driver of a device in a running pipeline", + data=ddrv, + ) + + _api_reload(msg["drvs"], daemon.drvs, "driver", ["settings"]) + + _api_reload(msg["pips"], daemon.pips, "pipeline", ["components"]) + + attrlist = ["driver", "device", "address", "channel", "role"] + _api_reload(msg["cmps"], daemon.cmps, "component", attrlist) + + _api_reload(msg["devs"], daemon.devs, "device", ["channels", "pollrate"]) + logger.info("reload successful with pipelines: '%s'", daemon.pips.keys()) - return Reply(success=True, msg=daemon.status, data=daemon) + return Reply(success=True, data=daemon) + + +def _api_reload(mdict: dict, ddict: dict, objname: str, attrlist: list[str]): + for obj in mdict.values(): + if obj.name not in ddict: + logger.debug("adding new %s '%s'", objname, obj.name) + ddict[obj.name] = obj + continue + dobj = ddict[obj.name] + for attr in attrlist: + if getattr(dobj, attr) != getattr(obj, attr): + logger.debug("%s '%s.%s' updated", objname, dobj.name, attr) + setattr(dobj, attr, getattr(obj, attr)) + for dobj in ddict.copy().values(): + if dobj.name not in mdict: + logger.warning("removing unused %s '%s'", objname, dobj.name) + del ddict[dobj.name] def pipeline(msg: dict, daemon: Daemon) -> Reply: diff --git a/src/tomato/daemon/driver.py b/src/tomato/daemon/driver.py index b27b3d95..ea49d33d 100644 --- a/src/tomato/daemon/driver.py +++ b/src/tomato/daemon/driver.py @@ -13,7 +13,7 @@ import argparse from importlib import metadata from datetime import datetime, timezone -from threading import currentThread +from threading import current_thread import zmq import psutil @@ -25,6 +25,28 @@ logger = logging.getLogger(__name__) +def tomato_driver_bootstrap( + req: zmq.Socket, logger: logging.Logger, interface: ModelInterface, driver: str +): + logger.debug("getting daemon status") + req.send_pyobj(dict(cmd="status", with_data=True)) + daemon = req.recv_pyobj().data + drv = daemon.drvs[driver] + interface.settings = drv.settings + + logger.info("registering components for driver '%s'", driver) + for comp in daemon.cmps.values(): + if comp.driver == driver: + logger.info("registering component '%s'", comp.name) + ret = interface.dev_register(address=comp.address, channel=comp.channel) + logger.debug(f"iface {ret=}") + params = dict(name=comp.name, capabilities=ret.data) + req.send_pyobj(dict(cmd="component", params=params)) + ret = req.recv_pyobj() + logger.debug(f"daemon {ret=}") + logger.info("driver '%s' bootstrapped successfully", driver) + + def tomato_driver() -> None: """ The function called when `tomato-driver` is executed. @@ -89,33 +111,14 @@ def tomato_driver() -> None: elif psutil.POSIX: pid = os.getpid() - logger.debug("getting daemon status") - req.send_pyobj( - dict(cmd="status", with_data=True, sender=f"{__name__}.tomato_driver_bootstrap") - ) - daemon = req.recv_pyobj().data - logger.debug(f"{daemon=}") - - logger.info(f"attempting to spawn driver {args.driver!r}") + logger.info("attempting to create Interface for driver '%s'", args.driver) Interface = driver_to_interface(args.driver) if Interface is None: - logger.critical(f"library of driver {args.driver!r} not found") + logger.critical("class DriverInterface driver '%s' not found", args.driver) return - drv = daemon.drvs[args.driver] - interface: ModelInterface = Interface(settings=drv.settings) - - logger.info("registering components for driver '%s'", args.driver) - for comp in daemon.cmps.values(): - if comp.driver == args.driver: - logger.info("registering component '%s'", comp.name) - ret = interface.dev_register(address=comp.address, channel=comp.channel) - logger.debug(f"iface {ret=}") - params = dict(name=comp.name, capabilities=ret.data) - req.send_pyobj(dict(cmd="component", params=params)) - ret = req.recv_pyobj() - logger.debug(f"daemon {ret=}") - logger.info("driver '%s' bootstrapped successfully", args.driver) + interface: ModelInterface = Interface() + tomato_driver_bootstrap(req, logger, interface, args.driver) params = dict( name=args.driver, @@ -152,6 +155,13 @@ def tomato_driver() -> None: msg=f"status of driver {params['name']!r} is {status!r}", data=dict(**params, status=status), ) + elif msg["cmd"] == "register": + tomato_driver_bootstrap(req, logger, interface, args.driver) + ret = Reply( + success=True, + msg="components re-registered successfully", + data=interface.devmap.keys(), + ) elif msg["cmd"] == "stop": status = "stop" ret = Reply( @@ -220,10 +230,10 @@ def manager(port: int, timeout: int = 1000): This manager ensures individual driver processes are (re-)spawned and instructed to quit as necessary. """ - + sender = f"{__name__}.manager" context = zmq.Context() - logger = logging.getLogger(f"{__name__}.manager") - thread = currentThread() + logger = logging.getLogger(sender) + thread = current_thread() logger.info("launched successfully") req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{port}") @@ -232,10 +242,10 @@ def manager(port: int, timeout: int = 1000): to = timeout while getattr(thread, "do_run"): - req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager")) + req.send_pyobj(dict(cmd="status", with_data=True, sender=sender)) events = dict(poller.poll(to)) if req not in events: - logger.warning(f"could not contact tomato-daemon in {to} ms") + logger.warning("could not contact tomato-daemon in %d ms", to) to = to * 2 continue elif to > timeout: @@ -250,24 +260,33 @@ def manager(port: int, timeout: int = 1000): else: drv = daemon.drvs[driver] if drv.pid is not None and not psutil.pid_exists(drv.pid): - logger.warning(f"respawning crashed driver {driver!r}") + logger.warning("respawning crashed driver '%s'", driver) spawn_tomato_driver(daemon.port, driver, req, daemon.verbosity) action_counter += 1 elif drv.pid is None and drv.spawned_at is None: - logger.debug(f"spawning driver {driver!r}") + logger.debug("spawning driver '%s'", driver) spawn_tomato_driver(daemon.port, driver, req, daemon.verbosity) action_counter += 1 - elif drv.pid is None: - tspawn = datetime.fromisoformat(drv.spawned_at) - if (datetime.now(timezone.utc) - tspawn).seconds > 10: - logger.warning(f"respawning late driver {driver!r}") - spawn_tomato_driver(daemon.port, driver, req, daemon.verbosity) - action_counter += 1 - logger.debug("tick") + if action_counter == 0: + contact_drivers = set() + for comp in daemon.cmps.values(): + if comp.capabilities is None: + contact_drivers.add(comp.driver) + for driver in contact_drivers: + drv = daemon.drvs[driver] + if drv.port is None: + continue + logger.debug("contacting driver '%s' to re-register components", driver) + dreq = context.socket(zmq.REQ) + dreq.connect(f"tcp://127.0.0.1:{drv.port}") + dreq.send_pyobj(dict(cmd="register", params=None, sender=sender)) + ret = dreq.recv_pyobj() + logger.debug(f"{ret=}") + dreq.close() time.sleep(1 if action_counter > 0 else 0.1) logger.info("instructed to quit") - req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager")) + req.send_pyobj(dict(cmd="status", with_data=True, sender=sender)) daemon = req.recv_pyobj().data for driver in daemon.drvs.values(): logger.debug("stopping driver '%s' on port %d", driver.name, driver.port) diff --git a/src/tomato/models.py b/src/tomato/models.py index 4166cb07..02585d30 100644 --- a/src/tomato/models.py +++ b/src/tomato/models.py @@ -79,5 +79,5 @@ class Daemon(BaseModel, arbitrary_types_allowed=True): class Reply(BaseModel): success: bool - msg: str + msg: Optional[str] = None data: Optional[Any] = None diff --git a/src/tomato/tomato/__init__.py b/src/tomato/tomato/__init__.py index a38e4cdf..eebb1a22 100644 --- a/src/tomato/tomato/__init__.py +++ b/src/tomato/tomato/__init__.py @@ -27,7 +27,6 @@ from pathlib import Path from datetime import datetime, timezone from importlib import metadata -import time import logging import psutil @@ -250,23 +249,7 @@ def stop( req.connect(f"tcp://127.0.0.1:{port}") req.send_pyobj(dict(cmd="stop")) rep = req.recv_pyobj() - if rep.msg == "stop": - return Reply( - success=True, - msg=f"tomato-daemon on port {port} was instructed to stop", - ) - elif rep.msg == "running": - return Reply( - success=False, - msg=f"tomato-daemon on port {port} cannot stop as jobs are running", - data=rep.data, - ) - else: - return Reply( - success=False, - msg=f"unknown error: {rep.msg}", - data=rep.data, - ) + return rep else: return stat @@ -333,121 +316,42 @@ def reload( devicefile = load_device_file(Path(settings["devices"]["config"])) devs = {dev["name"]: Device(**dev) for dev in devicefile["devices"]} pips, cmps = get_pipelines(devs, devicefile["pipelines"]) + drvs = {dev.driver: Driver(name=dev.driver) for dev in devs.values()} logger.debug(f"{pips=}") logger.debug(f"{cmps=}") - drvs = {dev.driver: Driver(name=dev.driver) for dev in devs.values()} + logger.debug(f"{devs=}") + logger.debug(f"{drvs=}") for drv in drvs.keys(): if drv in settings["drivers"]: drvs[drv].settings.update(settings["drivers"][drv]) - - stat = status(**kwargs, with_data=True) - if not stat.success: - return stat - daemon = stat.data + ret = status(**kwargs) + if not ret.success: + return ret req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{port}") - if daemon.status == "bootstrap": - req.send_pyobj( - dict( - cmd="setup", - settings=settings, - pips=pips, - devs=devs, - drvs=drvs, - cmps=cmps, - sender=f"{__name__}.reload", - ) - ) - rep = req.recv_pyobj() - elif daemon.status == "running": - retries = 0 - while True: - if retries == MAX_RETRIES: - return Reply( - success=False, msg="tomato-drivers are not online", data=daemon - ) - elif any(drv.port is None for drv in daemon.drvs.values()): - retries += 1 - logger.warning("not all tomato-drivers are online yet, waiting") - logger.debug("retry number %d / %d", retries, MAX_RETRIES) - time.sleep(timeout / 1000) - daemon = status(**kwargs, with_data=True).data - else: - break - - # check changes in driver settings - for drv in drvs.values(): - logger.debug(f"{drv=}") - ddrv = daemon.drvs[drv.name] - if drv.settings != ddrv.settings: - ret = _updater(context, ddrv.port, "settings", drv.settings) - if ret.success is False: - return ret - msg = dict(name=drv.name, settings=drv.settings) - ret = _updater(context, port, "driver", msg) - if ret.success is False: - return ret - - # check changes in devices - for dev in devs.values(): - logger.debug(f"{dev=}") - ddev = daemon.devs[dev.name] - if dev.channels != ddev.channels: - for channel in dev.channels: - params = dict( - address=dev.address, - channel=channel, - ) - drv = daemon.drvs[dev.driver] - logger.debug(f"{params=}") - logger.debug(f"{ddev=}") - logger.debug(f"{drv=}") - ret = _updater(context, drv.port, "dev_register", params) - logger.debug(f"{ret=}") - if ret.success is False: - return ret - params = dev.model_dump() - ret = _updater(context, port, "device", params) - if ret.success is False: - return ret - elif dev != ddev.name: - logger.error("updating devices not yet implemented") - for ddev in daemon.devs.values(): - if ddev.name not in devs: - logger.error("removing devices not yet implemented") - # check changes in pipelines - for pip in pips.values(): - logger.debug(f"{pip=}") - if pip.name not in daemon.pips: - logger.debug(f"{daemon.pips=}") - ret = _updater(context, port, "pipeline", pip.model_dump()) - logger.debug(f"{ret=}") - if ret.success is False: - return ret - else: - logger.error("updating pipelines not yet implemented") - for pip in daemon.pips.values(): - if pip.name not in pips: - params = dict(name=pip.name, delete=True) - ret = _updater(context, port, "pipeline", params) - if ret.success is False: - return ret - req.send_pyobj( - dict(cmd="setup", settings=settings, sender=f"{__name__}.reload") + req.send_pyobj( + dict( + cmd="setup", + settings=settings, + pips=pips, + devs=devs, + drvs=drvs, + cmps=cmps, + sender=f"{__name__}.reload", ) - rep = req.recv_pyobj() - - if rep.msg == "running": + ) + ret = req.recv_pyobj() + if ret.success: return Reply( success=True, - msg=f"tomato configured on port {port} with settings from {appdir}", - data=rep.data, + msg=f"tomato on port {port} reloaded with settings from {appdir}", + data=ret.data, ) else: return Reply( success=False, - msg=f"tomato configuration on port {port} failed: {rep.msg}", - data=rep.data, + msg=f"tomato on port {port} could not be reloaded: {ret.msg}", + data=ret.data, ) diff --git a/tests/common/devices_reload_address.json b/tests/common/devices_reload_address.json new file mode 100644 index 00000000..0fbec488 --- /dev/null +++ b/tests/common/devices_reload_address.json @@ -0,0 +1,19 @@ +{ + "devices": [ + { + "name": "dev-counter", + "driver": "example_counter", + "address": "example-addr-new", + "channels": [1], + "pollrate": 1 + } + ], + "pipelines": [ + { + "name": "pip-counter", + "devices": [ + {"role": "counter", "device": "dev-counter", "channel": 1} + ] + } + ] +} \ No newline at end of file diff --git a/tests/common/devices_reload_channel.json b/tests/common/devices_reload_channel.json new file mode 100644 index 00000000..fcf1c1b2 --- /dev/null +++ b/tests/common/devices_reload_channel.json @@ -0,0 +1,19 @@ +{ + "devices": [ + { + "name": "dev-counter", + "driver": "example_counter", + "address": "example-addr", + "channels": [2], + "pollrate": 1 + } + ], + "pipelines": [ + { + "name": "pip-counter", + "devices": [ + {"role": "counter", "device": "dev-counter", "channel": 2} + ] + } + ] +} \ No newline at end of file diff --git a/tests/common/devices_reload_driver.json b/tests/common/devices_reload_driver.json new file mode 100644 index 00000000..627536a2 --- /dev/null +++ b/tests/common/devices_reload_driver.json @@ -0,0 +1,19 @@ +{ + "devices": [ + { + "name": "dev-counter", + "driver": "psutil", + "address": "example-addr", + "channels": [2], + "pollrate": 1 + } + ], + "pipelines": [ + { + "name": "pip-counter", + "devices": [ + {"role": "counter", "device": "dev-counter", "channel": 2} + ] + } + ] +} \ No newline at end of file diff --git a/tests/common/devices_reload_pipdel.json b/tests/common/devices_reload_pipdel.json new file mode 100644 index 00000000..cd96be1b --- /dev/null +++ b/tests/common/devices_reload_pipdel.json @@ -0,0 +1,19 @@ +{ + "devices": [ + { + "name": "dev-counter", + "driver": "example_counter", + "address": "example-addr", + "channels": [1], + "pollrate": 1 + } + ], + "pipelines": [ + { + "name": "pip-counter-new", + "devices": [ + {"role": "counter", "device": "dev-counter", "channel": 1} + ] + } + ] +} \ No newline at end of file diff --git a/tests/common/devices_reload_pipmod.json b/tests/common/devices_reload_pipmod.json new file mode 100644 index 00000000..6abe8fc7 --- /dev/null +++ b/tests/common/devices_reload_pipmod.json @@ -0,0 +1,27 @@ +{ + "devices": [ + { + "name": "dev-counter", + "driver": "example_counter", + "address": "example-addr", + "channels": [1], + "pollrate": 1 + }, + { + "name": "new-counter", + "driver": "example_counter", + "address": "new-addr", + "channels": [1], + "pollrate": 1 + } + ], + "pipelines": [ + { + "name": "pip-counter", + "devices": [ + {"role": "counter", "device": "dev-counter", "channel": 1}, + {"role": "other", "device": "new-counter", "channel": 1} + ] + } + ] +} \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 708d787b..93196c1e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -46,7 +46,6 @@ def stop_tomato_daemon(port: int = 12345): subprocess.run(["taskkill", "/F", "/T", "/IM", "tomato-daemon.exe"]) subprocess.run(["taskkill", "/F", "/T", "/IM", "tomato-job.exe"]) subprocess.run(["taskkill", "/F", "/T", "/IM", "tomato-driver.exe"]) - else: subprocess.run(["killall", "tomato-daemon"]) subprocess.run(["killall", "tomato-job"]) @@ -64,6 +63,4 @@ def stop_tomato_daemon(port: int = 12345): proc.terminate() except psutil.NoSuchProcess: pass - gone, alive = psutil.wait_procs(procs, timeout=5) - print(f"{gone=}") - print(f"{alive=}") + gone, alive = psutil.wait_procs(procs, timeout=1) diff --git a/tests/test_01_tomato.py b/tests/test_01_tomato.py index 0d3341f2..1ade0fb1 100644 --- a/tests/test_01_tomato.py +++ b/tests/test_01_tomato.py @@ -1,7 +1,5 @@ -import json import os from pathlib import Path -import yaml import zmq import subprocess @@ -57,42 +55,6 @@ def test_tomato_start_double(datadir, stop_tomato_daemon): ) -def test_tomato_reload(datadir, stop_tomato_daemon): - test_tomato_start_with_init(datadir, stop_tomato_daemon) - ret = tomato.status(**kwargs, with_data=True) - assert ret.success - assert len(ret.data.drvs) == 1 - assert ret.data.drvs["example_counter"].settings == {"testpar": 1234} - assert len(ret.data.devs) == 1 - assert len(ret.data.devs["dev-counter"].channels) == 1 - assert len(ret.data.pips) == 1 - - with open("devices_counter.json", "r") as inf: - jsdata = json.load(inf) - with open("devices.yml", "w") as ouf: - yaml.dump(jsdata, ouf) - - ret = tomato.reload(**kwargs, appdir=Path()) - print(f"{ret=}") - assert ret.success - assert len(ret.data.drvs) == 1 - assert ret.data.drvs["example_counter"].settings == {"testpar": 1234} - assert len(ret.data.devs) == 1 - assert len(ret.data.devs["dev-counter"].channels) == 4 - assert len(ret.data.pips) == 4 - - with open("settings.toml", "a") as inf: - inf.write("example_counter.testparb = 1") - ret = tomato.reload(**kwargs, appdir=Path()) - print(f"{ret=}") - assert ret.success - assert len(ret.data.drvs) == 1 - assert ret.data.drvs["example_counter"].settings == {"testpar": 1234, "testparb": 1} - assert len(ret.data.devs) == 1 - assert len(ret.data.devs["dev-counter"].channels) == 4 - assert len(ret.data.pips) == 4 - - def test_tomato_pipeline(datadir, stop_tomato_daemon): test_tomato_start_with_init(datadir, stop_tomato_daemon) ret = tomato.pipeline_load(**kwargs, pipeline="pip-counter", sampleid="test") @@ -186,7 +148,7 @@ def test_tomato_stop(start_tomato_daemon, stop_tomato_daemon): assert wait_until_tomato_running(port=PORT, timeout=5000) ret = tomato.stop(**kwargs) assert ret.success - wait_until_tomato_stopped(port=PORT, timeout=5000) + assert wait_until_tomato_stopped(port=PORT, timeout=5000) assert Path("daemon_12345.log").exists() with Path("daemon_12345.log").open() as logf: diff --git a/tests/test_03_state.py b/tests/test_03_state.py index 7659a0af..18f6b171 100644 --- a/tests/test_03_state.py +++ b/tests/test_03_state.py @@ -116,11 +116,12 @@ def test_recover_crashed_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) ret = tomato.status(**kwargs, with_data=True) print(f"{ret=}") + pid = ret.data.jobs[1].pid ret = tomato.stop(**kwargs) kill_tomato_daemon(port=PORT) - proc = psutil.Process(pid=ret.data.jobs[1].pid) + proc = psutil.Process(pid=pid) proc.terminate() psutil.wait_procs([proc], timeout=3) diff --git a/tests/test_04_tomato_reload.py b/tests/test_04_tomato_reload.py new file mode 100644 index 00000000..add1a890 --- /dev/null +++ b/tests/test_04_tomato_reload.py @@ -0,0 +1,177 @@ +import json +from pathlib import Path +import yaml +import zmq + +from tomato import tomato +from .utils import wait_until_tomato_running, wait_until_ketchup_status, run_casenames + +PORT = 12345 +CTXT = zmq.Context() +timeout = 1000 +kwargs = dict(port=PORT, context=CTXT, timeout=timeout) + + +def test_reload_noop(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=timeout) + ret = tomato.reload(**kwargs, appdir=Path()) + assert ret.success + assert len(ret.data.drvs) == 1 + assert len(ret.data.devs) == 1 + assert len(ret.data.pips) == 1 + assert len(ret.data.cmps) == 1 + + +def test_reload_settings(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=timeout) + + with open("settings.toml", "a") as inf: + inf.write("example_counter.testparb = 1") + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success + assert len(ret.data.drvs) == 1 + assert len(ret.data.devs) == 1 + assert len(ret.data.pips) == 1 + assert len(ret.data.cmps) == 1 + assert ret.data.drvs["example_counter"].settings == {"testpar": 1234, "testparb": 1} + + +def test_reload_cmps_pips(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=timeout) + + with open("devices_counter.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success + assert len(ret.data.drvs) == 1 + assert len(ret.data.devs) == 1 + assert len(ret.data.pips) == 4 + assert len(ret.data.cmps) == 4 + + +def test_reload_devs(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=timeout) + + with open("devices_multidev.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success + assert len(ret.data.drvs) == 1 + assert len(ret.data.devs) == 2 + assert len(ret.data.pips) == 2 + assert len(ret.data.cmps) == 3 + + +def test_reload_drvs(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=timeout) + + # Let's add psutil driver / device + with open("devices_psutil.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success + assert len(ret.data.drvs) == 2 + assert len(ret.data.devs) == 2 + assert len(ret.data.pips) == 1 + assert len(ret.data.cmps) == 2 + + # Let's remove psutil driver / device and modify channels + with open("devices_counter.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success + assert len(ret.data.drvs) == 1 + assert len(ret.data.devs) == 1 + assert len(ret.data.pips) == 4 + assert len(ret.data.cmps) == 4 + + +def test_reload_running(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=timeout) + + run_casenames(["counter_20_5"], [None], ["pip-counter"]) + assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=5000) + + # Try modifying settings of a driver in use + with open("settings.toml", "a") as inf: + inf.write("example_counter.testparb = 1") + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success is False + assert "reload would modify a driver of a device in a running pipeline" in ret.msg + + # Revert settings.toml back + with open("settings.toml", "r") as inf: + lines = inf.readlines() + with open("settings.toml", "w") as out: + out.writelines(lines[:-1]) + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success + + # Try modifying device driver + with open("devices_reload_driver.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success is False + assert "reload would modify components of a running pipeline" in ret.msg + + # Try removing channel on device + with open("devices_reload_channel.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success is False + assert "reload would modify components of a running pipeline" in ret.msg + + # Try modifying address on device + with open("devices_reload_address.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success is False + assert "reload would modify components of a running pipeline" in ret.msg + + # Try removing pipeline + with open("devices_reload_pipdel.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success is False + assert "reload would delete a running pipeline" in ret.msg + + # Try modifying pipeline + with open("devices_reload_pipmod.json", "r") as inf: + jsdata = json.load(inf) + with open("devices.yml", "w") as ouf: + yaml.dump(jsdata, ouf) + ret = tomato.reload(**kwargs, appdir=Path()) + print(f"{ret=}") + assert ret.success is False + assert "reload would modify components of a running pipeline" in ret.msg