diff --git a/lib/pavilion/cancel_utils.py b/lib/pavilion/cancel_utils.py index 86a52b567..04e65d961 100644 --- a/lib/pavilion/cancel_utils.py +++ b/lib/pavilion/cancel_utils.py @@ -2,16 +2,29 @@ import io from collections import defaultdict -from typing import List, TextIO +from operator import attrgetter +from itertools import filterfalse +from typing import List, TextIO, Iterable, Union, Iterator import time from pavilion import schedulers from pavilion import utils from pavilion.test_run import TestRun, load_tests from pavilion import output +from pavilion.config import PavConfig +from pavilion.micro import do -def cancel_jobs(pav_cfg, tests: List[TestRun], errfile: TextIO = None) -> List[dict]: +def not_completed(tests: Iterator[Union[TestRun, "TestSeries"]]) -> List[TestRun]: + """Return a list of only those tests in the input sequence + that have not completed running.""" + + return list(filterfalse(attrgetter("complete"), tests)) + +def cancel_jobs( + pav_cfg: PavConfig, + tests: Iterable[TestRun], + errfile: TextIO = None) -> List[dict]: """Collect all jobs from the given tests, and cancel them if all the tests attached to those jobs have been cancelled. @@ -65,22 +78,23 @@ def cancel_jobs(pav_cfg, tests: List[TestRun], errfile: TextIO = None) -> List[d SERIES_WARN_EXPIRE = 60*60*24 # 24 hours -def cancel_tests(pav_cfg, tests: List, outfile: TextIO, - max_wait: float = 3.0, no_series_warning=False): +def cancel_tests(pav_cfg: PavConfig, tests: Iterable[TestRun], outfile: TextIO, + max_wait: float = 3.0, no_series_warning: bool = False) -> int: """Cancel all of the given tests, printing useful user messages and error information.""" user = utils.get_login() - tests = [test for test in tests if not test.complete] + tests = not_completed(tests) # Cancel each test. Note that this does not cancel test jobs or builds. cancelled_test_info = [] + for test in tests: # Don't try to cancel complete tests test.cancel("Cancelled via cmdline by user '{}'".format(user)) cancelled_test_info.append(test) - if cancelled_test_info: + if len(cancelled_test_info) > 0: test_count = len(tests) output.draw_table( title="Cancelling {} test{}".format(test_count, 's' if test_count > 1 else ''), @@ -91,17 +105,17 @@ def cancel_tests(pav_cfg, tests: List, outfile: TextIO, for test in cancelled_test_info]) else: output.fprint(outfile, "No tests needed to be cancelled.") + return 0 timeout = time.time() + max_wait wait_tests = list(tests) wait_msg = True - while wait_tests and time.time() > timeout: - for test in wait_tests.copy(): - if test.complete: - wait_tests.remove(test) - if wait_tests: + while len(wait_tests) > 0 and time.time() > timeout: + wait_tests = not_completed(wait_tests) + + if len(wait_tests) > 0: if wait_msg: output.fprint(outfile, "Giving tests a moment to quit.", end='') wait_msg = False @@ -112,11 +126,12 @@ def cancel_tests(pav_cfg, tests: List, outfile: TextIO, if not wait_msg: output.fprint(outfile, 'Done') + output.fprint(outfile, '\n') job_cancel_info = cancel_jobs(pav_cfg, tests, outfile) - if job_cancel_info: + if len(job_cancel_info) > 0: jobs = len(job_cancel_info) output.draw_table( outfile=outfile, @@ -141,3 +156,13 @@ def cancel_tests(pav_cfg, tests: List, outfile: TextIO, break return 0 + + +def cancel_series(sers: Iterable["TestSeries"], errfile: TextIO = None) -> int: + """Cancel all the series in the sequence.""" + + running_series = not_completed(sers) + + do(lambda x: x.cancel(), running_series) + + return 0 diff --git a/lib/pavilion/cmd_utils.py b/lib/pavilion/cmd_utils.py index b90009340..7376446f2 100644 --- a/lib/pavilion/cmd_utils.py +++ b/lib/pavilion/cmd_utils.py @@ -8,7 +8,7 @@ import sys import time from pathlib import Path -from typing import List, TextIO, Union +from typing import List, TextIO, Union, Iterator from collections import defaultdict from pavilion import config @@ -23,10 +23,62 @@ PavilionError, TestGroupError from pavilion.test_run import TestRun, load_tests, TestAttributes from pavilion.types import ID_Pair +from pavilion.micro import flatten LOGGER = logging.getLogger(__name__) +def expand_range(test_range: str) -> List[str]: + """Expand a given test or series range into a list of the individual + tests or series in that range""" + + tests = [] + + if test_range == "all": + return ["all"] + + elif '-' in test_range: + id_start, id_end = test_range.split('-', 1) + + if id_start.startswith('s'): + series_range_start = int(id_start.replace('s','')) + + if id_end.startswith('s'): + series_range_end = int(id_end.replace('s','')) + else: + series_range_end = int(id_end) + + series_ids = range(series_range_start, series_range_end+1) + + for sid in series_ids: + tests.append('s' + str(sid)) + else: + test_range_start = int(id_start) + test_range_end = int(id_end) + test_ids = range(test_range_start, test_range_end+1) + + for tid in test_ids: + tests.append(str(tid)) + else: + tests.append(test_range) + + return tests + + +def expand_ranges(ranges: Iterator[str]) -> Iterator[str]: + """Given a sequence of test and series ranges, expand them + into a sequence of individual tests and series.""" + + return flatten(map(expand_range, ranges)) + + +#pylint: disable=C0103 +def is_series_id(id: str) -> bool: + """Determine whether the given ID is a series ID.""" + + return len(id) > 0 and id[0].lower() == 's' + + def load_last_series(pav_cfg, errfile: TextIO) -> Union[series.TestSeries, None]: """Load the series object for the last series run by this user on this system.""" @@ -51,7 +103,7 @@ def set_arg_defaults(args): args.filter = getattr(args, 'filter', def_filter) -def arg_filtered_tests(pav_cfg, args: argparse.Namespace, +def arg_filtered_tests(pav_cfg: "PavConfig", args: argparse.Namespace, verbose: TextIO = None) -> dir_db.SelectItems: """Search for test runs that match based on the argument values in args, and return a list of matching test id's. @@ -64,6 +116,8 @@ def arg_filtered_tests(pav_cfg, args: argparse.Namespace, 2. All of the used bits are *ALWAYS* used, so any errors will pop up immediately in unit tests. + TODO: Rewrite the interface so that it's cleaner and not coupled to argparse. - HW + :param pav_cfg: The Pavilion config. :param args: An argument namespace with args defined by `filters.add_test_filter_args`, plus one additional `tests` argument @@ -80,26 +134,10 @@ def arg_filtered_tests(pav_cfg, args: argparse.Namespace, sort_by = getattr(args, 'sort_by', 'created') ids = [] + for test_range in args.tests: - if '-' in test_range: - id_start, id_end = test_range.split('-', 1) - if id_start.startswith('s'): - series_range_start = int(id_start.replace('s','')) - if id_end.startswith('s'): - series_range_end = int(id_end.replace('s','')) - else: - series_range_end = int(id_end) - series_ids = range(series_range_start, series_range_end+1) - for sid in series_ids: - ids.append('s' + str(sid)) - else: - test_range_start = int(id_start) - test_range_end = int(id_end) - test_ids = range(test_range_start, test_range_end+1) - for tid in test_ids: - ids.append(str(tid)) - else: - ids.append(test_range) + ids.extend(expand_range(test_range)) + args.tests = ids if 'all' in args.tests: diff --git a/lib/pavilion/commands/cancel.py b/lib/pavilion/commands/cancel.py index e11a1752c..4299b55fc 100644 --- a/lib/pavilion/commands/cancel.py +++ b/lib/pavilion/commands/cancel.py @@ -2,6 +2,7 @@ import errno import time +from argparse import Namespace from pavilion import cancel_utils from pavilion import cmd_utils @@ -10,6 +11,8 @@ from pavilion import series from pavilion.errors import TestSeriesError from pavilion.test_run import TestRun +from pavilion.config import PavConfig +from pavilion.micro import partition from .base_classes import Command from ..errors import TestRunError @@ -39,19 +42,31 @@ def _setup_arguments(self, parser): 'in the most recent series submitted by the user is cancelled.') filters.add_test_filter_args(parser, sort_keys=[], disable_opts=['sys-name']) - def run(self, pav_cfg, args): - """Cancel the given tests.""" + def run(self, pav_cfg: PavConfig, args: Namespace) -> int: + """Cancel the given tests or series.""" - if not args.tests: + if len(args.tests) == 0: # Get the last series ran by this user. series_id = series.load_user_series_id(pav_cfg) + if series_id is not None: args.tests.append(series_id) - cancelled_series = False + # Separate out into tests and series + series_ids, test_ids = partition(cmd_utils.is_series_id, args.tests) - test_paths = cmd_utils.arg_filtered_tests(pav_cfg, args, verbose=self.errfile).paths + args.tests = test_ids + args.series = series_ids + # Get TestRun and TestSeries objects + test_paths = cmd_utils.arg_filtered_tests(pav_cfg, args, verbose=self.errfile).paths tests = cmd_utils.get_tests_by_paths(pav_cfg, test_paths, errfile=self.errfile) - return cancel_utils.cancel_tests(pav_cfg, tests, self.outfile) + sinfos = cmd_utils.arg_filtered_series(pav_cfg, args, verbose=self.errfile) + test_series = map(lambda x: series.TestSeries.load(pav_cfg, x.sid), sinfos) + + # Cancel TestRuns and TestSeries + test_ret = cancel_utils.cancel_tests(pav_cfg, tests, self.outfile) + sers_ret = cancel_utils.cancel_series(test_series, self.outfile) + + return test_ret or sers_ret diff --git a/lib/pavilion/commands/series.py b/lib/pavilion/commands/series.py index 96ad06e20..64e1bba5d 100644 --- a/lib/pavilion/commands/series.py +++ b/lib/pavilion/commands/series.py @@ -3,6 +3,7 @@ import argparse import errno import sys +from argparse import Namespace from typing import List from pavilion import arguments @@ -17,6 +18,7 @@ from pavilion import sys_vars from pavilion import utils from pavilion.errors import TestSeriesError, TestSeriesWarning +from pavilion.config import PavConfig from .base_classes import Command, sub_cmd @@ -426,7 +428,7 @@ def _state_history_cmd(self, pav_cfg: config.PavConfig, args): ) @sub_cmd() - def _cancel_cmd(self, pav_cfg, args): + def _cancel_cmd(self, pav_cfg: PavConfig, args: Namespace) -> int: """Cancel all series found given the arguments.""" series_info = cmd_utils.arg_filtered_series(pav_cfg, args, verbose=self.errfile) diff --git a/lib/pavilion/limiter.py b/lib/pavilion/limiter.py new file mode 100644 index 000000000..76cbdb7e0 --- /dev/null +++ b/lib/pavilion/limiter.py @@ -0,0 +1,28 @@ +import time +import math +from typing import Callable, Tuple, Any + + +class TimeLimiter: + """Wraps a call to a function and only calls it if the specified + cooldown (in seconds) has elapsed since the last call.""" + + def __init__(self, func: Callable[[], Any], cooldown: float): + self.function = func + self.cooldown = cooldown + self.last_called = -math.inf + + def __call__(self) -> Tuple[bool, Any]: + """Calls the function if enough time has passed, and returns + a tuple containing a boolean indicating whether the function was actually called + and the return value of the function (or None if it was not called).""" + + current_time = time.time() + + if current_time - self.last_called > self.cooldown: + res = self.function() + self.last_called = current_time + + return (True, res) + + return (False, None) diff --git a/lib/pavilion/micro.py b/lib/pavilion/micro.py index a4b22d744..7384c329f 100644 --- a/lib/pavilion/micro.py +++ b/lib/pavilion/micro.py @@ -1,10 +1,17 @@ -"""A collection of 'microfunctions' primarily designed to abstract common -tasks and patterns, for the purpose of conciseness and readability.""" +""" +A collection of 'microfunctions' primarily designed to abstract common +tasks and patterns, for the purpose of conciseness and readability. + +Some functions are borrowed from the recipes in the itertools docs: + +https://docs.python.org/3/library/itertools.html#itertools-recipes +""" from pathlib import Path -from itertools import filterfalse, chain, tee -from typing import (List, Union, TypeVar, Iterator, Iterable, Callable, Optional, - Hashable, Dict, Tuple) +from itertools import filterfalse, chain, tee, islice +from collections import deque +from typing import (List, Union, TypeVar, Iterator, Iterable, Callable, Optional, Hashable, Dict, + Tuple, Any) T = TypeVar('T') U = TypeVar('U') @@ -20,7 +27,9 @@ def partition(pred: Callable[[T], bool], lst: Iterable[T]) -> Tuple[Iterator[T], return filter(pred, f_true), filterfalse(pred, f_false) def flatten(lst: Iterable[Iterable[T]]) -> Iterator[T]: - """Convert a singly nested iterable into an unnested iterable.""" + """Convert a singly nested iterable into an unnested iterable. + + Borrowed from the itertools docs.""" return chain.from_iterable(lst) def remove_all(lst: Iterable[T], item: T) -> Iterator[T]: @@ -28,7 +37,8 @@ def remove_all(lst: Iterable[T], item: T) -> Iterator[T]: return filter(lambda x: x != item, lst) def unique(lst: Iterable[T]) -> List[T]: - """Return a list of the unique items in the original list.""" + """Return a list of the unique items in the original list. + Not guaranteed to preserve the order of unique items.""" return list(set(lst)) def replace(lst: Iterable[T], old: T, new: T) -> Iterator[T]: @@ -79,3 +89,21 @@ def set_default(val: Optional[T], default: T) -> T: return default return val + +def consume(lst: Iterator[Any], num_items: int = None) -> None: + """Advance the iterator by num_items. If n is None, consume entirely. + + Useful for forcing side-effects for mapped functions.""" + + if num_items is None: + deque(lst, maxlen=0) + else: + next(islice(lst, num_items, num_items), None) + +# pylint: disable=C0103 +def do(func: Callable[[T], Any], lst: Iterable[T]) -> None: + """Map the function over the sequence of objects, ensuring + that all side effects occur. This is necessary because map + is lazily evaluated.""" + + consume(map(func, lst)) diff --git a/lib/pavilion/series/__init__.py b/lib/pavilion/series/__init__.py index e86713cf6..23e5adcff 100644 --- a/lib/pavilion/series/__init__.py +++ b/lib/pavilion/series/__init__.py @@ -1,7 +1,7 @@ """Module init for series objects and related functions.""" import json -from typing import TextIO +from typing import TextIO, Optional from pavilion import output from pavilion import utils, dir_db @@ -13,7 +13,7 @@ from .common import COMPLETE_FN, STATUS_FN, get_all_started -def load_user_series_id(pav_cfg, errfile=None): +def load_user_series_id(pav_cfg, errfile=None) -> Optional[str]: """Load the last series id used by the current user.""" user = utils.get_login() diff --git a/lib/pavilion/series/series.py b/lib/pavilion/series/series.py index b3b44d02f..dd62e5af6 100644 --- a/lib/pavilion/series/series.py +++ b/lib/pavilion/series/series.py @@ -1,3 +1,4 @@ +# pylint: disable=W0221 """Series are built around a config that specifies a 'series' of tests to run. It also tracks the tests that have run under it.""" import io @@ -11,7 +12,8 @@ import time from collections import defaultdict, OrderedDict from pathlib import Path -from typing import List, Dict, Set, Union, TextIO, Iterator +from operator import attrgetter +from typing import List, Dict, Set, Union, TextIO, Iterator, Optional import pavilion from pavilion import cancel_utils @@ -27,6 +29,8 @@ from pavilion.status_file import SeriesStatusFile, SERIES_STATES from pavilion.test_run import TestRun from pavilion.types import ID_Pair +from pavilion.micro import partition +from pavilion.limiter import TimeLimiter from yaml_config import YAMLError, RequiredError from .info import SeriesInfo from .test_set import TestSet @@ -46,16 +50,20 @@ class TestSeries: DEPENDENCY_FN = 'dependency' OUT_FN = 'series.out' PGID_FN = 'series.pgid' + CANCEL_FN = 'series.CANCELED' NAME_RE = re.compile('[a-z][a-z0-9_-]+$') def __init__(self, pav_cfg: config.PavConfig, series_cfg, _id=None, - verbosity: Verbose = Verbose.HIGH, outfile: TextIO = None): + verbosity: Verbose = Verbose.HIGH, outfile: TextIO = None, + cancel_cooldown: float = 0.5): """Initialize the series. Test sets may be added via 'add_tests()'. :param pav_cfg: The pavilion configuration object. :param series_cfg: Series config, if generated from a series file. :param _id: The test id number. If this is given, it implies that we're regenerating this series from saved files. + :param cancel_cooldown: The minimum time permitted between attempts + to cancel the series, in seconds. """ self.pav_cfg: config.PavConfig = pav_cfg @@ -64,6 +72,7 @@ def __init__(self, pav_cfg: config.PavConfig, series_cfg, _id=None, self.outfile = io.StringIO() if outfile is None else outfile self.verbosity = verbosity + self.cancel_limiter = TimeLimiter(self.has_cancel_file, cooldown=cancel_cooldown) name = self.config.get('name') or 'unnamed' if not self.NAME_RE.match(name): @@ -357,7 +366,7 @@ def reset_test_sets(self): self.test_sets = {} - def cancel(self, message=None, cancel_tests=True): + def _cancel_tests(self, message: str = None, cancel_tests: bool = True) -> None: """Goes through all test objects assigned to series and cancels tests that haven't been completed. @@ -373,12 +382,41 @@ def cancel(self, message=None, cancel_tests=True): if cancel_tests: for test in self.tests.values(): + # Cancel the test test.cancel(message or "Cancelled via series. Reason not given.") - cancel_utils.cancel_jobs(self.pav_cfg, List[self.tests.values()]) + # Cancel the scheduler jobs associated with the tests + cancel_utils.cancel_jobs(self.pav_cfg, self.tests.values()) self.status.set(SERIES_STATES.CANCELED, "Series cancelled: {}".format(message)) + def cancel(self, message: str = None, cancel_tests: bool = True) -> None: + """Create the cancellation file for the series, then (optionally) cancel + all tests assocated with the series.""" + + cancel_file = self.path / self.CANCEL_FN + cancel_file.touch() + + self._cancel_tests(message, cancel_tests) + + def has_cancel_file(self) -> bool: + """Determine whether the series has been cancelled.""" + + return (self.path / self.CANCEL_FN).exists() + + def check_cancelled(self) -> bool: + """Check whether the cancel file has been created, and if it has been, + cancel the series.""" + + checked, cancelled = self.cancel_limiter() + + if checked and cancelled: + self._cancel_tests(message="Series cancelled by another user.""") + + return True + + return False + def run(self, build_only: bool = False, rebuild: bool = False, local_builds_only: bool = False): """Build and kickoff all of the test sets in the series. @@ -401,8 +439,8 @@ def run(self, build_only: bool = False, rebuild: bool = False, "Error creating test sets: {}".format(err.args[0])) raise - # The names of all test sets that have completed. - complete = set() # type: Set[str] + if self.check_cancelled(): + return repeat = self.repeat @@ -415,15 +453,13 @@ def run(self, build_only: bool = False, rebuild: bool = False, repeat_iteration = 0 # run sets in order - while potential_sets: + while len(potential_sets) > 0: - sets_to_run = [] # type: List[TestSet] + if self.check_cancelled(): + return - # kick off any sets that aren't waiting on any sets to complete - for test_set in potential_sets: - parent_names = [parent.name for parent in test_set.parent_sets] - if all(map(lambda par: par in complete, parent_names)): - sets_to_run.append(test_set) + # Separate out sets whose parents have completed running + sets_to_run, waiting_sets = partition(attrgetter("parents_complete"), potential_sets) for test_set in sets_to_run: # Make sure it's ok to run this test set based on parent status. @@ -447,12 +483,14 @@ def run(self, build_only: bool = False, rebuild: bool = False, "Error making tests for series '{}'." .format(self.sid), err) - for test_set in sets_to_run: - potential_sets.remove(test_set) + potential_sets = list(waiting_sets) repeat -= 1 - if not potential_sets and repeat: + if self.check_cancelled(): + return + + if len(potential_sets) == 0 and repeat > 0: # If we're repeating multiple times, reset the test sets for the series # and recreate them to run again. repeat_iteration += 1 @@ -481,6 +519,10 @@ def _run_set(self, test_set: TestSet, build_only: bool, rebuild: bool, local_bui # Add all the tests we created to this test set. self._add_tests(test_batch, test_set.iter_name) + # Cancel tests if a cancel file has been dropped + if self.check_cancelled(): + return + # Build each test try: test_set.build(deprecated_builds, failed_builds) @@ -496,6 +538,9 @@ def _run_set(self, test_set: TestSet, build_only: bool, rebuild: bool, local_bui if not test_set.ready_to_start: continue + if self.check_cancelled(): + return + try: started_tests, new_jobs = test_set.kickoff() tests_running += len(started_tests) @@ -521,6 +566,7 @@ def _run_set(self, test_set: TestSet, build_only: bool, rebuild: bool, local_bui _simultaneous = test_set.simultaneous if test_set.simultaneous else self.simultaneous # Wait for jobs until enough have finished to start a new batch. while tests_running + self.batch_size > _simultaneous: + self.check_cancelled() tests_running -= test_set.wait() @@ -567,7 +613,7 @@ def info(self) -> SeriesInfo: return SeriesInfo(self.pav_cfg, self.path) @property - def pgid(self) -> Union[int, None]: + def pgid(self) -> Optional[int]: """Returns pgid of series if it exists, None otherwise.""" if self._pgid is None: diff --git a/lib/pavilion/series/test_set.py b/lib/pavilion/series/test_set.py index 7d5fe40fc..533318de3 100644 --- a/lib/pavilion/series/test_set.py +++ b/lib/pavilion/series/test_set.py @@ -746,6 +746,15 @@ def wait(self, wait_for_all=False, wait_period: int = TEST_WAIT_PERIOD, return completed_tests + + @property + def parents_complete(self) -> bool: + """Determine whether the test is ready to run, that is, whether all of its parent sets + have completed running.""" + + return all(map(lambda par: par.done, self.parent_sets)) + + @property def should_run(self) -> Union[bool, None]: """Evaluate whether this set should run at all, and mark it as 'done' diff --git a/lib/pavilion/test_run/test_run.py b/lib/pavilion/test_run/test_run.py index 18b6a5025..1f1393815 100644 --- a/lib/pavilion/test_run/test_run.py +++ b/lib/pavilion/test_run/test_run.py @@ -2,6 +2,7 @@ the list of all known test runs.""" # pylint: disable=too-many-lines +# pylint: disable=W0221 import copy import json import logging @@ -855,8 +856,8 @@ def set_run_complete(self): self._complete = True def cancel(self, reason: str): - """Set the cancel file for this test, and denote in its status that it was - cancelled.""" + """Create the cancellation file for the test, and denote in its status that it was + cancelled, but do nothing beyond that.""" if self.cancelled or self.complete: # Already cancelled. diff --git a/test/tests/lock_tests.py b/test/tests/lock_tests.py index cce53407b..83fd1f0e5 100644 --- a/test/tests/lock_tests.py +++ b/test/tests/lock_tests.py @@ -134,7 +134,7 @@ def _acquire_lock(*args, **kwargs): errfile = io.StringIO() with lockfile.LockFile(self.lock_path, errfile=errfile): self.lock_path.unlink() - self.assertIn("mysteriously disappeared", errfile.getvalue()) + self.assertIn("mysteriously disappeared", errfile.getvalue().replace("\n", " ")) dmy_lock = lockfile.LockFile(self.lock_path, expires_after=100) dmy_lock._id = 'abcd' @@ -144,7 +144,7 @@ def _acquire_lock(*args, **kwargs): dmy_lock._create_lockfile() # Remove our bad lockfile self.lock_path.unlink() - self.assertIn("mysteriously replaced", errfile.getvalue()) + self.assertIn("mysteriously replaced", errfile.getvalue().replace("\n", " ")) def test_fuzzylock_mutual_exclusion(self): # Test that FuzzyLock object correctly excludes concurrent access diff --git a/test/tests/series_tests.py b/test/tests/series_tests.py index cc7401a4b..681eb291f 100644 --- a/test/tests/series_tests.py +++ b/test/tests/series_tests.py @@ -1,5 +1,6 @@ """Tests for the Series object.""" from collections import OrderedDict +import time from pavilion import series from pavilion import series_config @@ -12,7 +13,7 @@ class SeriesTests(PavTestCase): def test_init(self): """Check initialization of the series object.""" - ignore_keys = ['outfile'] + ignore_keys = ['outfile', 'cancel_limiter'] # Initialize from scratch series1 = series.TestSeries( @@ -373,3 +374,24 @@ def _setup_conditionals_test(self, only_if=None, not_if=None) -> series.TestSeri series_obj.wait(timeout=10) return series_obj + + def test_series_cancel(self): + """Test that cancellation of series works as expected.""" + # Create and start a series + series_cfg = series_config.generate_series_config('test') + ser = series.TestSeries(self.pav_cfg, series_cfg=series_cfg) + + cancel_file = ser.path / ser.CANCEL_FN + + self.assertFalse(ser.has_cancel_file()) + + cancel_file.touch() + + self.assertTrue(ser.has_cancel_file()) + + ser.run() + + time.sleep(0.5) + + # Check state to verify that it was cancelled + self.assertTrue(ser.status.has_state('CANCELED'))