diff --git a/.github/workflows/workflow-test.yml b/.github/workflows/workflow-test.yml index 93a7ac0..be92fa1 100644 --- a/.github/workflows/workflow-test.yml +++ b/.github/workflows/workflow-test.yml @@ -33,4 +33,4 @@ jobs: run: tomato --version - name: Run pytest shell: bash - run: pytest -vv \ No newline at end of file + run: pytest -vvv --durations=0 \ No newline at end of file diff --git a/src/tomato/daemon/job.py b/src/tomato/daemon/job.py index bde8738..576bc4b 100644 --- a/src/tomato/daemon/job.py +++ b/src/tomato/daemon/job.py @@ -27,7 +27,7 @@ import psutil from tomato.daemon.io import merge_netcdfs, data_to_pickle -from tomato.models import Pipeline, Daemon, Component, Device, Driver, Job +from tomato.models import Pipeline, Daemon, Component, Device, Driver, Job, CompletedJob from dgbowl_schemas.tomato import to_payload from dgbowl_schemas.tomato.payload import Task @@ -100,7 +100,9 @@ def manage_running_pips(daemon: Daemon, req): logger.debug(f"{running=}") for pip in running: job = daemon.jobs[pip.jobid] - if job.pid is None: + if isinstance(job, CompletedJob): + continue + elif job.pid is None: continue pidexists = psutil.pid_exists(job.pid) logger.debug(f"{pidexists=}") diff --git a/src/tomato/ketchup/__init__.py b/src/tomato/ketchup/__init__.py index bc61196..ede07df 100644 --- a/src/tomato/ketchup/__init__.py +++ b/src/tomato/ketchup/__init__.py @@ -296,7 +296,7 @@ def snapshot( return Reply(success=False, msg=f"job {jobid} is still queued") for jobid in jobids: - jobs[jobid].snappath = Path(f"snapshot.{jobid}.nc") + jobs[jobid].snappath = f"snapshot.{jobid}.nc" merge_netcdfs(jobs[jobid], snapshot=True) if len(jobids) > 1: msg = f"snapshot for jobs {jobids} created successfully" diff --git a/tests/common/counter_20_1.yml b/tests/common/counter_20_1.yml new file mode 100644 index 0000000..8366c22 --- /dev/null +++ b/tests/common/counter_20_1.yml @@ -0,0 +1,10 @@ +version: "0.2" +sample: + name: counter_20_1 +method: + - device: "counter" + technique: "count" + time: 20.0 + delay: 1.0 +tomato: + verbosity: "DEBUG" \ No newline at end of file diff --git a/tests/common/counter_stresstest.yml b/tests/common/counter_stresstest.yml new file mode 100644 index 0000000..34a03d8 --- /dev/null +++ b/tests/common/counter_stresstest.yml @@ -0,0 +1,11 @@ +version: "1.0" +sample: + name: counter_stresstest +method: + - component_tag: "counter" + technique_name: "count" + max_duration: 0.2 + sampling_interval: 0.1 +settings: + unlock_when_done: true + verbosity: DEBUG \ No newline at end of file diff --git a/tests/test_99_example_counter.py b/tests/test_99_example_counter.py index 074423f..8ab14da 100644 --- a/tests/test_99_example_counter.py +++ b/tests/test_99_example_counter.py @@ -70,7 +70,7 @@ def test_counter_cancel(casename, datadir, start_tomato_daemon, stop_tomato_daem @pytest.mark.parametrize( "casename, external", [ - ("counter_60_0.1", True), + ("counter_20_1", True), ("counter_snapshot", False), ], ) diff --git a/tests/test_99_stresstest.py b/tests/test_99_stresstest.py new file mode 100644 index 0000000..c0ebbff --- /dev/null +++ b/tests/test_99_stresstest.py @@ -0,0 +1,42 @@ +import pytest +import os +import subprocess +import xarray as xr +from datetime import datetime + +from tomato.models import Job +from . import utils + +PORT = 12345 + + +@pytest.mark.parametrize( + "case, nreps", + [ + ("counter_stresstest", 5), + ], +) +def test_stresstest(case, nreps, datadir, stop_tomato_daemon): + os.chdir(datadir) + subprocess.run(["tomato", "init", "-p", f"{PORT}", "-A", ".", "-D", "."]) + subprocess.run(["tomato", "start", "-p", f"{PORT}", "-A", ".", "-L", "."]) + utils.wait_until_tomato_running(port=PORT, timeout=3000) + + subprocess.run(["tomato", "pipeline", "load", "-p", f"{PORT}", "pip-counter", case]) + for i in range(nreps): + subprocess.run(["ketchup", "submit", "-p", f"{PORT}", f"{case}.yml"]) + + subprocess.run(["tomato", "pipeline", "ready", "-p", f"{PORT}", "pip-counter"]) + + utils.wait_until_ketchup_status(jobid=nreps, status="c", port=PORT, timeout=40000) + + prev = None + for i in range(nreps): + i += 1 + assert os.path.exists(f"results.{i}.nc") + dt = xr.open_datatree(f"results.{i}.nc") + completed_at = Job.model_validate_json(dt.attrs["tomato_Job"]).completed_at + ti = datetime.fromisoformat(completed_at) + if prev is not None: + assert ti > prev + prev = ti