Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DriverInterface class #86

Merged
merged 17 commits into from
Jul 17, 2024
Merged
1 change: 0 additions & 1 deletion docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ sustainable batteries of the future.
:maxdepth: 1
:caption: tomato driver library

apidoc/tomato.drivers.example_counter

.. toctree::
:maxdepth: 1
Expand Down
2 changes: 1 addition & 1 deletion docs/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,4 @@ The *payload* file contains all information required to enter a *job* into the q
allow its assignment onto a *pipeline*. The overall schema of the *payload* is defined
in the :mod:`dgbowl_schemas.tomato` module, and is parsed using :func:`dgbowl_schemas.tomato.to_payload`:

.. autopydantic_model:: dgbowl_schemas.tomato.payload_0_2.Payload
.. autopydantic_model:: dgbowl_schemas.tomato.payload_1_0.Payload
19 changes: 14 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@ dependencies = [
"toml >= 0.10",
"pyyaml >= 6.0",
"psutil >= 5.9",
"dgbowl_schemas >= 108",
"dgbowl_schemas @ git+https://github.com/dgbowl/dgbowl-schemas.git@Payload_1.0",
"pyzmq >= 25.1",
"h5netcdf >= 1.3",
"xarray >= 2024.2",
"pydantic ~= 1.0",
"pydantic >= 2.0",

]

[project.optional-dependencies]
testing = ["pytest"]
testing = [
"pytest",
"tomato-example-counter @ git+https://github.com/dgbowl/tomato-example-counter.git",
"tomato-psutil @ git+https://github.com/dgbowl/tomato-psutil.git",
]
docs = [
"sphinx ~= 7.2",
"sphinx-rtd-theme ~= 1.3.0",
"autodoc-pydantic ~= 1.9.0",
"autodoc-pydantic ~= 2.1",
"sphinxcontrib-mermaid ~= 0.9.2",
]

Expand All @@ -62,4 +67,8 @@ enabled = true
dev_template = "{tag}.dev{ccount}"
dirty_template = "{tag}.dev{ccount}"

[tool.ruff]
[tool.ruff]

[tool.pytest.ini_options]
log_cli = false
log_cli_level = "DEBUG"
53 changes: 22 additions & 31 deletions src/tomato/daemon/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import zmq
import psutil

import tomato.drivers
from tomato.driverinterface_1_0 import ModelInterface
from tomato.drivers import driver_to_interface
from tomato.models import Reply

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,7 +57,7 @@ def tomato_driver() -> None:
parser.add_argument(
"--verbosity",
help="Verbosity of the tomato-driver.",
default=logging.INFO,
default=logging.DEBUG,
type=int,
)
parser.add_argument(
Expand Down Expand Up @@ -96,19 +97,18 @@ def tomato_driver() -> None:
logger.debug(f"{daemon=}")

logger.info(f"attempting to spawn driver {args.driver!r}")
if not hasattr(tomato.drivers, args.driver):
Interface = driver_to_interface(args.driver)
if Interface is None:
logger.critical(f"library of driver {args.driver!r} not found")
return

kwargs = dict(settings=daemon.drvs[args.driver].settings)
driver = getattr(tomato.drivers, args.driver).Driver(**kwargs)
interface: ModelInterface = Interface(settings=daemon.drvs[args.driver].settings)

logger.info(f"registering devices in driver {args.driver!r}")
for dev in daemon.devs.values():
if dev.driver == args.driver:
for channel in dev.channels:
driver.dev_register(address=dev.address, channel=channel)
logger.debug(f"{driver.devmap=}")
interface.dev_register(address=dev.address, channel=channel)
logger.debug(f"{interface.devmap=}")

logger.info(f"driver {args.driver!r} bootstrapped successfully")

Expand All @@ -117,7 +117,7 @@ def tomato_driver() -> None:
port=port,
pid=pid,
connected_at=str(datetime.now(timezone.utc)),
settings=driver.settings,
settings=interface.settings,
)
req.send_pyobj(
dict(cmd="driver", params=params, sender=f"{__name__}.tomato_driver")
Expand All @@ -128,7 +128,7 @@ def tomato_driver() -> None:
logger.debug(f"{ret=}")
return

logger.info(f"driver {args.driver!r} is entering main loop")
logger.info("driver '%s' is entering main loop", args.driver)

poller = zmq.Poller()
poller.register(rep, zmq.POLLIN)
Expand All @@ -137,7 +137,7 @@ def tomato_driver() -> None:
socks = dict(poller.poll(100))
if rep in socks:
msg = rep.recv_pyobj()
logger.debug(f"received {msg=}")
logger.debug("received msg=%s", msg)
if "cmd" not in msg:
logger.error(f"received msg without cmd: {msg=}")
ret = Reply(success=False, msg="received msg without cmd", data=msg)
Expand All @@ -155,40 +155,32 @@ def tomato_driver() -> None:
data=dict(status=status, driver=args.driver),
)
elif msg["cmd"] == "settings":
driver.settings = msg["params"]
params["settings"] = driver.settings
interface.settings = msg["params"]
params["settings"] = interface.settings
ret = Reply(
success=True,
msg="settings received",
data=msg.get("params"),
)
elif msg["cmd"] == "dev_register":
driver.dev_register(**msg["params"])
ret = Reply(
success=True,
msg="device registered",
data=msg.get("params"),
)
elif msg["cmd"] == "task_status":
ret = driver.task_status(**msg["params"])
elif msg["cmd"] == "task_start":
ret = driver.task_start(**msg["params"])
elif msg["cmd"] == "task_data":
ret = driver.task_data(**msg["params"])
logger.debug(f"{ret=}")
elif hasattr(interface, msg["cmd"]):
ret = getattr(interface, msg["cmd"])(**msg["params"])
else:
logger.critical("unknown command: '%s'", msg["cmd"])
logger.debug("replying %s", ret)
rep.send_pyobj(ret)
if status == "stop":
break

logger.info(f"driver {args.driver!r} is beginning teardown")
logger.info(f"driver {args.driver!r} is beginning reset")

driver.teardown()
interface.reset()

logger.critical(f"driver {args.driver!r} is quitting")


def spawn_tomato_driver(port: int, driver: str, req: zmq.Socket, verbosity: int):
cmd = ["tomato-driver", "--port", str(port), "--verbosity", str(verbosity), driver]
# cmd = ["tomato-driver", "--port", str(port), "--verbosity", str(verbosity), driver]
cmd = ["tomato-driver", "--port", str(port), driver]
if psutil.WINDOWS:
cfs = subprocess.CREATE_NO_WINDOW
cfs |= subprocess.CREATE_NEW_PROCESS_GROUP
Expand Down Expand Up @@ -277,7 +269,6 @@ def manager(port: int, timeout: int = 1000):
for driver in daemon.drvs.values():
logger.debug(f"stopping driver {driver.name!r} on port {driver.port}")
ret = stop_tomato_driver(driver.port, context)
logger.debug(f"{ret=}")
if ret.success:
logger.info(f"stopped driver {driver.name!r}")
else:
Expand Down
Loading