diff --git a/src/tomato/daemon/job.py b/src/tomato/daemon/job.py index e622b80c..91fdd806 100644 --- a/src/tomato/daemon/job.py +++ b/src/tomato/daemon/job.py @@ -339,36 +339,28 @@ def tomato_job() -> None: logger.info("handing off to 'job_main_loop'") logger.info("==============================") - ret = job_main_loop(context, args.port, payload, pip, jobpath, snappath, logpath) + job_main_loop(context, args.port, payload, pip, jobpath, snappath, logpath) logger.info("==============================") merge_netcdfs(jobpath, respath) - if ret is None: - logger.info("job finished successfully, attempting to set status to 'c'") - params = dict(status="c", completed_at=str(datetime.now(timezone.utc))) - req.send_pyobj(dict(cmd="job", id=jobid, params=params)) - events = dict(poller.poll(args.timeout)) - if req not in events: - logger.warning(f"could not contact tomato-daemon in {args.timeout/1000} s") - req.setsockopt(zmq.LINGER, 0) - req.close() - poller.unregister(req) - req = context.socket(zmq.REQ) - req.connect(f"tcp://127.0.0.1:{args.port}") - else: - ret = req.recv_pyobj() - logger.debug(f"{ret=}") - if ret.success is False: - logger.error("could not set job status for unknown reason") - return 1 + logger.info("job finished successfully, attempting to set status to 'c'") + params = dict(status="c", completed_at=str(datetime.now(timezone.utc))) + req.send_pyobj(dict(cmd="job", id=jobid, params=params)) + events = dict(poller.poll(args.timeout)) + if req not in events: + logger.warning(f"could not contact tomato-daemon in {args.timeout/1000} s") + req.setsockopt(zmq.LINGER, 0) + req.close() + poller.unregister(req) + req = context.socket(zmq.REQ) + req.connect(f"tcp://127.0.0.1:{args.port}") else: - logger.info("job was terminated, status should be 'cd'") - logger.info("handing off to 'driver_reset'") - logger.info("==============================") - # driver_reset(pip) - logger.info("==============================") - ready = False + ret = req.recv_pyobj() + logger.debug(f"{ret=}") + if ret.success is False: + logger.error("could not set job status for unknown reason") + return 1 logger.info(f"resetting pipeline {pip!r}") params = dict(jobid=None, ready=ready, name=pip) req.send_pyobj(dict(cmd="pipeline", params=params))