Remove xfail from tests. (#76)
* unxfail ketchup_cancel

* Adjust sleep in job.py

* Debugging & logging

* More tests. 02 should work.

* Final un-xfail

* Tighten up test_02

* argh black

* wait until pickle in 02

* Buffer in running jobs

* Wait until pickle in ketchup_cancel

* Need some debug there.

* Shunt writing into manager.

* Thread has no exitcode.

* Lint

* snapshot uses job.jobpath

* lazy pirate for job-process

* Ugh platforms
PeterKraus authored Apr 14, 2024
1 parent a0bdf71 commit eedbfd6
Showing 12 changed files with 184 additions and 130 deletions.
2 changes: 1 addition & 1 deletion src/tomato/daemon/__init__.py
@@ -79,7 +79,7 @@ def tomato_daemon():
dmgr.start()
t0 = time.process_time()
while True:
socks = dict(poller.poll(100))
socks = dict(poller.poll(1000))
if rep in socks:
msg = rep.recv_pyobj()
logger.debug(f"received {msg=}")
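
The daemon's main poll interval goes from 100 ms to 1 s, cutting idle wake-ups tenfold while requests are still answered as soon as they arrive. A minimal sketch of the polling pattern, with an illustrative port and reply rather than the actual tomato internals:

    import zmq

    context = zmq.Context()
    rep = context.socket(zmq.REP)
    rep.bind("tcp://127.0.0.1:5555")
    poller = zmq.Poller()
    poller.register(rep, zmq.POLLIN)

    while True:
        # Block for up to 1000 ms; an empty dict means the poll timed out
        # and the loop can run its periodic housekeeping instead.
        socks = dict(poller.poll(1000))
        if rep in socks:
            msg = rep.recv_pyobj()
            rep.send_pyobj(dict(success=True, msg=f"received {msg!r}"))
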
3 changes: 1 addition & 2 deletions src/tomato/daemon/cmd.py
@@ -13,7 +13,6 @@
"""

from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job
from copy import deepcopy
import logging

import tomato.daemon.io as io
@@ -46,7 +45,7 @@ def merge_pipelines(

def status(msg: dict, daemon: Daemon) -> Reply:
if msg.get("with_data", False):
return Reply(success=True, msg=daemon.status, data=deepcopy(daemon))
return Reply(success=True, msg=daemon.status, data=daemon)
else:
return Reply(success=True, msg=daemon.status)

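
Dropping the deepcopy in status() is safe because the Reply is pickled when the daemon answers over ZeroMQ, so the receiving side gets an independent copy regardless. A small sketch of that copy semantics, using pickle directly in place of send_pyobj/recv_pyobj:

    import pickle

    state = {"jobs": [1, 2]}
    wire = pickle.dumps(state)         # what send_pyobj does internally
    received = pickle.loads(wire)      # what recv_pyobj does internally

    state["jobs"].append(3)            # mutate the sender's object afterwards
    assert received["jobs"] == [1, 2]  # the receiver's copy is unaffected
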
8 changes: 7 additions & 1 deletion src/tomato/daemon/driver.py
@@ -240,24 +240,30 @@ def manager(port: int, timeout: int = 1000):
to = timeout
daemon = req.recv_pyobj().data
drivers_needed = {v.driver for v in daemon.devs.values()}
action_counter = 0
for driver in drivers_needed:
if driver not in daemon.drvs:
logger.debug(f"spawning driver {driver!r}")
spawn_tomato_driver(daemon.port, driver, req)
action_counter += 1
else:
drv = daemon.drvs[driver]
if drv.pid is not None and not psutil.pid_exists(drv.pid):
logger.warning(f"respawning crashed driver {driver!r}")
spawn_tomato_driver(daemon.port, driver, req)
action_counter += 1
elif drv.pid is None and drv.spawned_at is None:
logger.debug(f"spawning driver {driver!r}")
spawn_tomato_driver(daemon.port, driver, req)
action_counter += 1
elif drv.pid is None:
tspawn = datetime.fromisoformat(drv.spawned_at)
if (datetime.now(timezone.utc) - tspawn).seconds > 10:
logger.warning(f"respawning late driver {driver!r}")
spawn_tomato_driver(daemon.port, driver, req)
time.sleep(timeout / 1e3)
action_counter += 1
logger.debug("tick")
time.sleep(1 if action_counter > 0 else 0.1)

logger.info("instructed to quit")
req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager"))
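
Rather than sleeping a fixed timeout / 1e3 on every pass, the manager now counts spawn actions: after (re)spawning a driver it waits a full second for the process to come up, and otherwise polls every 100 ms so crashed or late drivers are noticed quickly. The shape of the loop, sketched with a stand-in for the spawn checks:

    import time

    def check_and_spawn_drivers() -> int:
        """Stand-in for the spawn/respawn checks; returns actions taken."""
        return 0  # nothing to do in this sketch

    while True:
        action_counter = check_and_spawn_drivers()
        # Long sleep after taking an action, short sleep when idle.
        time.sleep(1 if action_counter > 0 else 0.1)
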
163 changes: 91 additions & 72 deletions src/tomato/daemon/job.py
@@ -21,9 +21,8 @@
from importlib import metadata
from datetime import datetime, timezone
from pathlib import Path
from threading import currentThread
from multiprocessing import Process

from threading import currentThread, Thread
from typing import Any
import zmq
import psutil

@@ -99,7 +98,6 @@ def manage_running_pips(daemon: Daemon, req):
logger.debug(f"{running=}")
for pip in running:
job = daemon.jobs[pip.jobid]
logger.debug(f"{job=}")
if job.pid is None:
continue
pidexists = psutil.pid_exists(job.pid)
@@ -111,6 +109,7 @@
proc = psutil.Process(pid=job.pid)
kill_tomato_job(proc)
logger.info(f"job {job.id} with pid {job.pid} was terminated successfully")
merge_netcdfs(Path(job.jobpath), Path(job.respath))
reset = True
params = dict(status="cd")
# dead jobs marked as running (status == 'r') should be cleared
@@ -244,6 +243,33 @@ def manager(port: int, timeout: int = 500):
logger.info("instructed to quit")


def lazy_pirate(
pyobj: Any, retries: int, timeout: int, address: str, context: zmq.Context
) -> Any:
logger.debug("Here")
req = context.socket(zmq.REQ)
req.connect(address)
poller = zmq.Poller()
poller.register(req, zmq.POLLIN)
for _ in range(retries):
req.send_pyobj(pyobj)
events = dict(poller.poll(timeout))
if req not in events:
logger.warning(f"could not contact tomato-daemon in {timeout/1000} s")
req.setsockopt(zmq.LINGER, 0)
req.close()
poller.unregister(req)
req = context.socket(zmq.REQ)
req.connect(address)
poller.register(req, zmq.POLLIN)
else:
break
else:
logger.error(f"number of connection retries exceeded: {retries}")
raise RuntimeError(f"Number of connection retries exceeded: {retries}")
return req.recv_pyobj()


def tomato_job() -> None:
"""
The function called when `tomato-job` is executed.
@@ -271,6 +297,12 @@ def tomato_job() -> None:
default=1000,
type=int,
)
parser.add_argument(
"--retries",
help="Number of retries for driver actions.",
default=10,
type=int,
)
parser.add_argument(
"jobfile",
type=Path,
@@ -286,11 +318,11 @@
jobid = jsdata["job"]["id"]
jobpath = Path(jsdata["job"]["path"]).resolve()

logfile = jobpath / f"job-{jobid}.log"
logpath = jobpath / f"job-{jobid}.log"
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)8s - %(name)-30s - %(message)s",
handlers=[logging.FileHandler(logfile, mode="a")],
handlers=[logging.FileHandler(logpath, mode="a")],
)
logger = logging.getLogger(__name__)

@@ -309,98 +341,84 @@
context = zmq.Context()
req = context.socket(zmq.REQ)
req.connect(f"tcp://127.0.0.1:{args.port}")
poller = zmq.Poller()
poller.register(req, zmq.POLLIN)

pkwargs = dict(
address=f"tcp://127.0.0.1:{args.port}",
retries=args.retries,
timeout=args.timeout,
context=context,
)
params = dict(pid=pid, status="r", executed_at=str(datetime.now(timezone.utc)))
req.send_pyobj(dict(cmd="job", id=jobid, params=params))
events = dict(poller.poll(args.timeout))
if req in events:
req.recv_pyobj()
else:
logger.warning(f"could not contact tomato-daemon in {args.timeout/1000} s")
lazy_pirate(pyobj=dict(cmd="job", id=jobid, params=params), **pkwargs)

output = tomato["output"]
prefix = f"results.{jobid}" if output["prefix"] is None else output["prefix"]
outpath = Path(output["path"])
snappath = outpath / f"snapshot.{jobid}.nc"
logger.debug(f"output folder is {outpath}")
if outpath.exists():
assert outpath.is_dir()
else:
logger.debug("path does not exist, creating")
os.makedirs(outpath)
prefix = f"results.{jobid}" if output["prefix"] is None else output["prefix"]
respath = outpath / f"{prefix}.nc"
snappath = outpath / f"snapshot.{jobid}.nc"
params = dict(respath=str(respath), snappath=str(snappath), jobpath=str(jobpath))
lazy_pirate(pyobj=dict(cmd="job", id=jobid, params=params), **pkwargs)

logger.info("handing off to 'job_main_loop'")
logger.info("==============================")
ret = job_main_loop(context, args.port, payload, pip, jobpath, snappath)
job_main_loop(context, args.port, payload, pip, jobpath, snappath, logpath)
logger.info("==============================")

merge_netcdfs(jobpath, outpath / f"{prefix}.nc")
merge_netcdfs(jobpath, respath)

logger.info("job finished successfully, attempting to set status to 'c'")
params = dict(status="c", completed_at=str(datetime.now(timezone.utc)))
ret = lazy_pirate(pyobj=dict(cmd="job", id=jobid, params=params), **pkwargs)
logger.debug(f"{ret=}")
if ret.success is False:
logger.error("could not set job status for unknown reason")
return 1

if ret is None:
logger.info("job finished successfully, attempting to set status to 'c'")
params = dict(status="c", completed_at=str(datetime.now(timezone.utc)))
req.send_pyobj(dict(cmd="job", id=jobid, params=params))
events = dict(poller.poll(args.timeout))
if req not in events:
logger.warning(f"could not contact tomato-daemon in {args.timeout/1000} s")
req.setsockopt(zmq.LINGER, 0)
req.close()
poller.unregister(req)
req = context.socket(zmq.REQ)
req.connect(f"tcp://127.0.0.1:{args.port}")
else:
ret = req.recv_pyobj()
logger.debug(f"{ret=}")
if ret.success is False:
logger.error("could not set job status for unknown reason")
return 1
else:
logger.info("job was terminated, status should be 'cd'")
logger.info("handing off to 'driver_reset'")
logger.info("==============================")
# driver_reset(pip)
logger.info("==============================")
ready = False
logger.info(f"resetting pipeline {pip!r}")
params = dict(jobid=None, ready=ready, name=pip)
req.send_pyobj(dict(cmd="pipeline", params=params))
events = dict(poller.poll(args.timeout))
if req in events:
ret = req.recv_pyobj()
logger.debug(f"{ret=}")
if not ret.success:
logger.error(f"could not reset pipeline {pip!r}")
return 1
else:
logger.error(f"could not contact tomato-daemon in {args.timeout/1000} s")
ret = lazy_pirate(pyobj=dict(cmd="pipeline", params=params), **pkwargs)
logger.debug(f"{ret=}")
if not ret.success:
logger.error(f"could not reset pipeline {pip!r}")
return 1
logger.info("exiting tomato-job")


def job_process(
def job_thread(
tasks: list,
component: Component,
device: Device,
driver: Driver,
jobpath: Path,
logpath: Path,
):
"""
Child process of `tomato-job`, responsible for tasks on one Component of a Pipeline.
A subthread of `tomato-job`, responsible for tasks on one Component of a Pipeline.
For each task in tasks, starts the task, then monitors the Component status and polls
for data, and moves on to the next task as instructed in the payload.
Stores the data for that Component as a `pickle` of a :class:`xr.Dataset`.
"""
sender = f"{__name__}.job_process"
sender = f"{__name__}.job_thread"
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)8s - %(name)-30s - %(message)s",
handlers=[logging.FileHandler(logpath, mode="a")],
)
logger = logging.getLogger(sender)
logger.debug(f"in job process of {component.role!r}")
logger.debug(f"in job thread of {component.role!r}")

context = zmq.Context()
req = context.socket(zmq.REQ)
req.connect(f"tcp://127.0.0.1:{driver.port}")
logger.debug(f"job process of {component.role!r} connected to tomato-daemon")
logger.debug(f"job thread of {component.role!r} connected to tomato-daemon")

kwargs = dict(address=component.address, channel=component.channel)

@@ -433,7 +451,8 @@ def job_process(
logger.debug(f"{ret=}")
if ret.success and ret.msg == "ready":
break
time.sleep(device.pollrate / 10)
time.sleep(device.pollrate - (tN - t0))
logger.debug("tock")
req.send_pyobj(dict(cmd="task_data", params={**kwargs}))
ret = req.recv_pyobj()
if ret.success:
Expand All @@ -447,12 +466,14 @@ def job_main_loop(
pipname: str,
jobpath: Path,
snappath: Path,
logpath: Path,
) -> None:
"""
The main loop function of `tomato-job`, split for better readability.
"""
sender = f"{__name__}.job_worker"
sender = f"{__name__}.job_main_loop"
logger = logging.getLogger(sender)
logger.debug("process started")

req = context.socket(zmq.REQ)
req.connect(f"tcp://127.0.0.1:{port}")
@@ -481,36 +502,34 @@
logger.debug(f"{plan=}")

# distribute plan into threads
processes = {}
threads = {}
for role, tasks in plan.items():
component = pipeline.devs[role]
logger.debug(f"{component=}")
device = daemon.devs[component.name]
logger.debug(f"{device=}")
driver = daemon.drvs[device.driver]
logger.debug(f"{driver=}")
processes[role] = Process(
target=job_process,
args=(tasks, component, device, driver, jobpath),
name="job-process",
threads[role] = Thread(
target=job_thread,
args=(tasks, component, device, driver, jobpath, logpath),
name="job-thread",
)
processes[role].start()
threads[role].start()

# wait until threads join or we're killed
snapshot = payload["tomato"].get("snapshot", None)
t0 = time.perf_counter()
while True:
logger.debug("tick")
tN = time.perf_counter()
if snapshot is not None and tN - t0 > snapshot["frequency"]:
logger.debug("creating snapshot")
merge_netcdfs(jobpath, snappath)
t0 += snapshot["frequency"]
joined = [proc.is_alive() is False for proc in processes.values()]
joined = [proc.is_alive() is False for proc in threads.values()]
if all(joined):
break
else:
time.sleep(1)
logger.debug(f"{[proc.exitcode for proc in processes.values()]}")
for proc in processes.values():
if proc.exitcode != 0:
return proc.exitcode
# We'd like to execute this loop exactly once every second
time.sleep(1.0 - tN % 1)
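
The bulk of the job.py changes funnel every daemon request through the new lazy_pirate() helper, an implementation of the "Lazy Pirate" reliable-request pattern from the ZeroMQ guide: a REQ socket becomes unusable once a request goes unanswered, so each retry closes the stuck socket (with LINGER set to 0 so close() returns immediately), reconnects a fresh one, and resends, raising a RuntimeError once the retries are exhausted. A usage sketch mirroring the calls in tomato_job(), with an illustrative port and job id:

    import zmq

    from tomato.daemon.job import lazy_pirate

    context = zmq.Context()
    pkwargs = dict(
        address="tcp://127.0.0.1:1234",  # illustrative daemon port
        retries=10,                      # default of the new --retries flag
        timeout=1000,                    # ms to wait per attempt
        context=context,
    )
    params = dict(status="c")
    ret = lazy_pirate(pyobj=dict(cmd="job", id=1, params=params), **pkwargs)

Note also the final time.sleep(1.0 - tN % 1): it aligns each pass of the main loop to the next whole second of time.perf_counter(), so the snapshot schedule does not drift as the loop body takes time.
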
3 changes: 1 addition & 2 deletions src/tomato/ketchup/__init__.py
@@ -309,9 +309,8 @@ def snapshot(
if jobs[jobid].status in {"q", "qw"}:
return Reply(success=False, msg=f"job {jobid} is still queued")

jobdir = Path(status.data.settings["jobs"]["storage"])
for jobid in jobids:
merge_netcdfs(jobdir / str(jobid), Path(f"snapshot.{jobid}.nc"))
merge_netcdfs(Path(jobs[jobid].jobpath), Path(f"snapshot.{jobid}.nc"))
return Reply(success=True, msg=f"snapshot for job(s) {jobids} created successfully")


3 changes: 3 additions & 0 deletions src/tomato/models.py
@@ -45,6 +45,9 @@ class Job(BaseModel):
submitted_at: Optional[str] = None
executed_at: Optional[str] = None
completed_at: Optional[str] = None
jobpath: Optional[str] = None
respath: Optional[str] = None
snappath: Optional[str] = None


class Daemon(BaseModel, arbitrary_types_allowed=True):
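
With the three new optional path fields, a Job record carries its own locations, so consumers such as ketchup's snapshot() no longer reconstruct them from daemon settings. A trimmed, illustrative sketch of the model (the real Job has more fields, and the values below are made up):

    from typing import Optional

    from pydantic import BaseModel

    class Job(BaseModel):
        id: Optional[int] = None
        jobpath: Optional[str] = None
        respath: Optional[str] = None
        snappath: Optional[str] = None

    job = Job(id=1)
    assert job.respath is None           # unset until tomato-job reports it
    job.respath = "/data/results.1.nc"   # filled in once outputs are known
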
10 changes: 10 additions & 0 deletions tests/common/counter_20_5.yml
@@ -0,0 +1,10 @@
version: "0.2"
sample:
name: counter_20_5
method:
- device: "counter"
technique: "count"
time: 20.0
delay: 5.0
tomato:
verbosity: "DEBUG"
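
This new test payload runs a single "count" technique on the example counter device, with time 20.0 and delay 5.0, at DEBUG verbosity. Assuming the standard workflow from the tomato documentation, it would be submitted with `ketchup submit counter_20_5.yml` and inspected mid-run with `ketchup snapshot <jobid>`.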
(Diffs for the remaining five changed files were not loaded on this page.)
