diff --git a/doc/runner.rst b/doc/runner.rst index ff705f0b..da724864 100644 --- a/doc/runner.rst +++ b/doc/runner.rst @@ -98,6 +98,7 @@ Option:: --inherit-environ=VARS --copy-env --no-locale + --timeout TIMEOUT --track-memory --tracemalloc @@ -140,6 +141,9 @@ Option:: - ``LC_TELEPHONE`` - ``LC_TIME`` +* ``--timeout``: set a timeout in seconds for an execution of the benchmark. + If the benchmark execution times out, pyperf exits with error code 124. + There is no time out by default. * ``--tracemalloc``: Use the ``tracemalloc`` module to track Python memory allocation and get the peak of memory usage in metadata (``tracemalloc_peak``). The module is only available on Python 3.4 and newer. diff --git a/pyperf/_manager.py b/pyperf/_manager.py index d45ab8d0..a55f7a5a 100644 --- a/pyperf/_manager.py +++ b/pyperf/_manager.py @@ -7,6 +7,8 @@ from pyperf._utils import MS_WINDOWS, create_environ, create_pipe, popen_killer +EXIT_TIMEOUT = 60 + # Limit to 5 calibration processes # (10 if calibration is needed for loops and warmups) MAX_CALIBRATION = 5 @@ -69,6 +71,9 @@ def worker_cmd(self, calibrate_loops, calibrate_warmups, wpipe): if args.profile: cmd.extend(['--profile', args.profile]) + if args.timeout: + cmd.extend(['--timeout', str(args.timeout)]) + if args.hook: for hook in args.hook: cmd.extend(['--hook', hook]) @@ -102,10 +107,12 @@ def spawn_worker(self, calibrate_loops, calibrate_warmups): proc = subprocess.Popen(cmd, env=env, **kw) with popen_killer(proc): - with rpipe.open_text() as rfile: - bench_json = rfile.read() - - exitcode = proc.wait() + try: + bench_json = rpipe.read_text(timeout=self.args.timeout) + exitcode = proc.wait(timeout=EXIT_TIMEOUT) + except TimeoutError as exc: + print(exc) + sys.exit(124) if exitcode: raise RuntimeError("%s failed with exit code %s" diff --git a/pyperf/_runner.py b/pyperf/_runner.py index c43b9e30..0cd79de0 100644 --- a/pyperf/_runner.py +++ b/pyperf/_runner.py @@ -183,6 +183,10 @@ def __init__(self, values=None, processes=None, 'value, used to calibrate the number of ' 'loops (default: %s)' % format_timedelta(min_time)) + parser.add_argument('--timeout', + help='Specify a timeout in seconds for a single ' + 'benchmark execution (default: disabled)', + type=strictly_positive) parser.add_argument('--worker', action='store_true', help='Worker process, run the benchmark.') parser.add_argument('--worker-task', type=positive_or_nul, metavar='TASK_ID', diff --git a/pyperf/_utils.py b/pyperf/_utils.py index e834089c..05e68ff9 100644 --- a/pyperf/_utils.py +++ b/pyperf/_utils.py @@ -1,9 +1,11 @@ import contextlib import math import os +import select import statistics import sys import sysconfig +import time from shlex import quote as shell_quote # noqa from shutil import which @@ -320,6 +322,36 @@ def open_text(self): self._file = file return file + def read_text(self, timeout=None): + if timeout is not None: + return self._read_text_timeout(timeout) + else: + with self.open_text() as rfile: + return rfile.read() + + def _read_text_timeout(self, timeout): + fd = self.fd + os.set_blocking(fd, False) + + start_time = time.monotonic() + output = [] + while True: + if time.monotonic() - start_time > timeout: + raise TimeoutError(f"Timed out after {timeout} seconds") + ready, _, _ = select.select([fd], [], [], timeout) + if not ready: + continue + try: + data = os.read(fd, 1024) + except BlockingIOError: + continue + if not data: + break + output.append(data) + + data = b"".join(output) + return data.decode("utf8") + class WritePipe(_Pipe): def to_subprocess(self): diff --git a/pyperf/tests/test_runner.py b/pyperf/tests/test_runner.py index afc66d8d..154c220b 100644 --- a/pyperf/tests/test_runner.py +++ b/pyperf/tests/test_runner.py @@ -149,6 +149,33 @@ def test_pipe(self): self.assertEqual(bench_json, tests.benchmark_as_json(result.bench)) + def test_pipe_with_timeout(self): + rpipe, wpipe = create_pipe() + with rpipe: + with wpipe: + arg = wpipe.to_subprocess() + # Don't close the file descriptor, it is closed by + # the Runner class + wpipe._fd = None + + result = self.exec_runner('--pipe', str(arg), + '--worker', '-l1', '-w1') + + # Mock the select to make the read pipeline not ready + with mock.patch('pyperf._utils.select.select', + return_value=(False, False, False)): + with self.assertRaises(TimeoutError) as cm: + rpipe.read_text(timeout=0.1) + self.assertEqual(str(cm.exception), + 'Timed out after 0.1 seconds') + + # Mock the select to make the read pipeline ready + with mock.patch('pyperf._utils.select.select', + return_value=(True, False, False)): + bench_json = rpipe.read_text(timeout=0.1) + self.assertEqual(bench_json.rstrip(), + tests.benchmark_as_json(result.bench).rstrip()) + def test_json_exists(self): with tempfile.NamedTemporaryFile('wb+') as tmp: