From bfa1df8b921cac59365629239eaf451a804f9515 Mon Sep 17 00:00:00 2001 From: zhenyu-ms <111329301+zhenyu-ms@users.noreply.github.com> Date: Fri, 21 Nov 2025 17:05:14 +0800 Subject: [PATCH 1/3] fix resources concurrent start & stop --- testplan/common/entity/base.py | 111 ++++++++++++++------------------- 1 file changed, 48 insertions(+), 63 deletions(-) diff --git a/testplan/common/entity/base.py b/testplan/common/entity/base.py index 5cd1b355c..f89aab520 100644 --- a/testplan/common/entity/base.py +++ b/testplan/common/entity/base.py @@ -3,6 +3,7 @@ configuration, start/stop/run/abort, create results and have some state. """ +import multiprocessing.pool as mp import os import signal import sys @@ -238,12 +239,11 @@ def start(self): else: resource.logger.info("%s started", resource) - def start_in_pool(self, pool): + def start_in_pool(self, pool: mp.ThreadPool): """ - Start all resources concurrently in thread pool. + Start all resources concurrently in thread pool and log exceptions. :param pool: thread pool - :type pool: ``ThreadPool`` """ for resource in self._resources.values(): @@ -253,24 +253,18 @@ def start_in_pool(self, pool): " its `async_start` attribute is set to False" ) - # Trigger start all resources - resources_to_wait_for = [] - for resource in self._resources.values(): - if not resource.auto_start: - continue + def start_in_thread(resource): + resource.start() + resource.wait(resource.STATUS.STARTED) + resource.logger.info("%s started", resource) - pool.apply_async( - self._log_exception( - resource, resource.start, self.start_exceptions - ) - ) - resources_to_wait_for.append(resource) - - # Wait resources status to be STARTED. - for resource in resources_to_wait_for: - if resource not in self.start_exceptions: - resource.wait(resource.STATUS.STARTED) - resource.logger.info("%s started", resource) + async_r = pool.map_async( + self._apply_resource_exception_logged( + self.start_exceptions, start_in_thread + ), + (r for r in self._resources.values() if r.auto_start), + ) + async_r.wait() def sync_stop_resource(self, resource: "Resource"): """ @@ -352,67 +346,58 @@ def stop(self, is_reversed: bool = False): resource.force_stop() resource.logger.info("%s stopped", resource) - def stop_in_pool(self, pool, is_reversed=False): + def stop_in_pool(self, pool: mp.ThreadPool): """ - Stop all resources in reverse order and log exceptions. + Stop all resources concurrently in thread pool and log exceptions. :param pool: thread pool - :type pool: ``ThreadPool`` - :param is_reversed: flag whether to stop resources in reverse order - :type is_reversed: ``bool`` """ - resources = list(self._resources.values()) - if is_reversed is True: - resources = resources[::-1] - - # Stop all resources - resources_to_wait_for = [] - for resource in resources: - if resource.status in ( - resource.STATUS.STOPPING, - resource.STATUS.STOPPED, - ): - continue - pool.apply_async( - self._log_exception( - resource, resource.stop, self.stop_exceptions + for resource in self._resources.values(): + if not resource.async_start: + raise RuntimeError( + f"Cannot stop resource {resource} in thread pool," + " its `async_start` attribute is set to False" ) - ) - resources_to_wait_for.append(resource) - # Wait resources status to be STOPPED. - for resource in resources_to_wait_for: - if resource not in self.stop_exceptions: - if resource.async_start: + def stop_in_thread(resource: "Resource"): + try: + if resource.status not in ( + resource.STATUS.STOPPING, + resource.STATUS.STOPPED, + ): + resource.stop() + if resource.status == resource.STATUS.STOPPING: resource.wait(resource.STATUS.STOPPED) - else: - # avoid post_stop being called twice - wait( - lambda: resource.status == resource.STATUS.STOPPED, - timeout=resource.cfg.status_wait_timeout, - ) - resource.logger.info("%s stopped", resource) - else: + except: # Resource status should be STOPPED even it failed to stop resource.force_stop() + raise + finally: + resource.logger.info("%s stopped", resource) - def _log_exception(self, resource, func, exception_record): + async_r = pool.map_async( + self._apply_resource_exception_logged( + self.stop_exceptions, stop_in_thread + ), + self._resources.values(), + ) + async_r.wait() + + def _apply_resource_exception_logged( + self, exception_record, func: Callable[["Resource"], None] + ) -> Callable[["Resource"], None]: """ - Decorator for logging an exception at resource and environment level. + Applying ``func`` to resource and log possible exception at environment level. - :param resource: resource to log the exception with - :type resource: :py:class:`~testplan.common.entity.base.Resource` - :param func: function to catch exception for - :type func: ``Callable`` :param exception_record: A dictionary that maps resource name to exception message during start or stop: `self.start_exception` for `start()` and `self.stop_exceptions` for `stop()`. - :type exception_record: ``dict`` + :param func: function to catch exception for """ - def wrapper(*args, **kargs): + def wrapper(resource: "Resource"): try: - func(*args, **kargs) + func(resource) except Exception: msg = "While executing {} of resource [{}]\n{}".format( func.__name__, resource.cfg.name, traceback.format_exc() From 247c7bbdd6fafbf2729fa0ec76c59dbbbc0ca326 Mon Sep 17 00:00:00 2001 From: zhenyu-ms <111329301+zhenyu-ms@users.noreply.github.com> Date: Mon, 24 Nov 2025 16:16:25 +0800 Subject: [PATCH 2/3] update impl; add tests --- testplan/common/entity/base.py | 74 +++--- testplan/common/remote/remote_resource.py | 4 +- testplan/runners/pools/remote.py | 12 +- .../common/entity/test_environment.py | 234 ++++++++++++++++++ 4 files changed, 278 insertions(+), 46 deletions(-) create mode 100644 tests/unit/testplan/common/entity/test_environment.py diff --git a/testplan/common/entity/base.py b/testplan/common/entity/base.py index f89aab520..8f41bfc97 100644 --- a/testplan/common/entity/base.py +++ b/testplan/common/entity/base.py @@ -12,6 +12,7 @@ import traceback from collections import OrderedDict, deque from contextlib import suppress +from multiprocessing import TimeoutError from typing import ( Any, Callable, @@ -34,7 +35,7 @@ from testplan.common.utils.path import default_runpath, makedirs, makeemptydirs from testplan.common.utils.strings import slugify, uuid4 from testplan.common.utils.thread import execute_as_thread, interruptible_join -from testplan.common.utils.timing import wait, Timer +from testplan.common.utils.timing import Timer, wait from testplan.common.utils.validation import is_subclass @@ -239,32 +240,32 @@ def start(self): else: resource.logger.info("%s started", resource) - def start_in_pool(self, pool: mp.ThreadPool): + def start_in_pool( + self, pool: mp.ThreadPool, timeout: Optional[float] = None + ): """ Start all resources concurrently in thread pool and log exceptions. :param pool: thread pool """ - - for resource in self._resources.values(): - if not resource.async_start: - raise RuntimeError( - f"Cannot start resource {resource} in thread pool," - " its `async_start` attribute is set to False" - ) - - def start_in_thread(resource): - resource.start() - resource.wait(resource.STATUS.STARTED) - resource.logger.info("%s started", resource) + # async_start is meaningless here... + for r in self._resources.values(): + if r.auto_start: + r.cfg.set_local("async_start", False) async_r = pool.map_async( self._apply_resource_exception_logged( - self.start_exceptions, start_in_thread + self.start_exceptions, lambda r: r.start() ), (r for r in self._resources.values() if r.auto_start), ) - async_r.wait() + try: + async_r.get(timeout) + except TimeoutError: + raise RuntimeError( + f"timeout after {timeout}s when starting resources " + f"{self._resources.values()} in internal threading pool." + ) def sync_stop_resource(self, resource: "Resource"): """ @@ -346,42 +347,33 @@ def stop(self, is_reversed: bool = False): resource.force_stop() resource.logger.info("%s stopped", resource) - def stop_in_pool(self, pool: mp.ThreadPool): + def stop_in_pool( + self, pool: mp.ThreadPool, timeout: Optional[float] = None + ): """ Stop all resources concurrently in thread pool and log exceptions. :param pool: thread pool """ - for resource in self._resources.values(): - if not resource.async_start: - raise RuntimeError( - f"Cannot stop resource {resource} in thread pool," - " its `async_start` attribute is set to False" - ) - - def stop_in_thread(resource: "Resource"): - try: - if resource.status not in ( - resource.STATUS.STOPPING, - resource.STATUS.STOPPED, - ): - resource.stop() - if resource.status == resource.STATUS.STOPPING: - resource.wait(resource.STATUS.STOPPED) - except: - # Resource status should be STOPPED even it failed to stop - resource.force_stop() - raise - finally: - resource.logger.info("%s stopped", resource) + # async_start is meaningless here... + # in practice, stop_in_pool must be called in pair of start_in_pool + for r in self._resources.values(): + if r.auto_start: + r.cfg.set_local("async_start", False) async_r = pool.map_async( self._apply_resource_exception_logged( - self.stop_exceptions, stop_in_thread + self.stop_exceptions, lambda r: r.stop() ), self._resources.values(), ) - async_r.wait() + try: + async_r.get(timeout) + except TimeoutError: + raise RuntimeError( + f"timeout after {timeout}s when stopping resources " + f"{self._resources.values()} in internal threading pool." + ) def _apply_resource_exception_logged( self, exception_record, func: Callable[["Resource"], None] diff --git a/testplan/common/remote/remote_resource.py b/testplan/common/remote/remote_resource.py index 10fbb2dff..536d55550 100644 --- a/testplan/common/remote/remote_resource.py +++ b/testplan/common/remote/remote_resource.py @@ -143,7 +143,7 @@ class RemoteResource(Entity): :param paramiko_config: Paramiko SSH client extra configuration. :param remote_runtime_builder: RuntimeBuilder instance to prepare remote python env. Default is ``SourceTransferBuilder()``. - :param status_wait_timeout: remote resource start/stop timeout, default is 60. + :param status_wait_timeout: remote resource start/stop timeout, default is 600. """ CONFIG = RemoteResourceConfig @@ -170,7 +170,7 @@ def __init__( setup_script: List[str] = None, paramiko_config: Optional[dict] = None, remote_runtime_builder: Optional[RuntimeBuilder] = None, - status_wait_timeout: int = 60, + status_wait_timeout: int = 600, **options, ) -> None: if not worker_is_remote(remote_host): diff --git a/testplan/runners/pools/remote.py b/testplan/runners/pools/remote.py index e4d991fe2..d730fdfaa 100644 --- a/testplan/runners/pools/remote.py +++ b/testplan/runners/pools/remote.py @@ -364,20 +364,26 @@ def _start_workers(self) -> None: for worker in self._workers: self._conn.register(worker) if self.pool: - self._workers.start_in_pool(self.pool) + self._workers.start_in_pool( + self.pool, + self.cfg.status_wait_timeout, + ) else: self._workers.start() def _stop_workers(self) -> None: if self.pool: - self._workers.stop_in_pool(self.pool) + self._workers.stop_in_pool( + self.pool, + self.cfg.status_wait_timeout, + ) else: self._workers.stop() def _start_thread_pool(self) -> None: size = len(self._instances) try: - if size > 2: + if size >= 2: self.pool = ThreadPool(min(size, cpu_count())) except Exception as exc: if isinstance(exc, AttributeError): diff --git a/tests/unit/testplan/common/entity/test_environment.py b/tests/unit/testplan/common/entity/test_environment.py new file mode 100644 index 000000000..f37e253f2 --- /dev/null +++ b/tests/unit/testplan/common/entity/test_environment.py @@ -0,0 +1,234 @@ +import multiprocessing.pool as mp +import time +from testplan.common.entity.base import Resource, Environment + +import pytest + + +@pytest.fixture +def environment(): + return Environment() + + +@pytest.fixture +def pool(): + return mp.ThreadPool() + + +class DummyResource(Resource): + def __init__( + self, + starting=None, + stopping=None, + wait_started=None, + wait_stopped=None, + **options, + ): + # other ops should exist in cfg + super().__init__(**options) + self._c_starting = starting or (lambda: None) + self._c_stopping = stopping or (lambda: None) + self._c_wait_started = wait_started or (lambda: None) + self._c_wait_stopped = wait_stopped or (lambda: None) + + def starting(self): + self._c_starting() + + def stopping(self): + self._c_stopping() + + def _wait_started(self, timeout=None): + self._c_wait_started() + super()._wait_started(timeout) + + def _wait_stopped(self, timeout=None): + self._c_wait_stopped() + super()._wait_stopped(timeout) + + +class TestConcurrentResourceOps: + def test_basic(self, environment, pool, mocker): + pre, post = mocker.Mock(), mocker.Mock() + environment.add( + DummyResource( + pre_start=pre, + wait_started=lambda: time.sleep(1), + post_start=post, + pre_stop=pre, + wait_stopped=lambda: time.sleep(1), + post_stop=post, + ) + ) + environment.add( + DummyResource( + pre_start=pre, + wait_started=lambda: time.sleep(1), + post_start=post, + pre_stop=pre, + wait_stopped=lambda: time.sleep(1), + post_stop=post, + ) + ) + a = time.time() + environment.start_in_pool(pool) + b = time.time() + + assert b - a < 2, "resources didn't start concurrently" + assert pre.call_count == 2, "pre-hooks not invoked" + assert post.call_count == 2, "post-hooks not invoked" + + c = time.time() + environment.stop_in_pool(pool) + d = time.time() + + assert d - c < 2, "resources didn't stop concurrently" + assert pre.call_count == 4, "pre-hooks not invoked" + assert post.call_count == 4, "post-hooks not invoked" + + def test_long_pre(self, environment, pool, mocker): + wait, post = mocker.Mock(), mocker.Mock() + environment.add( + DummyResource( + pre_start=lambda _: time.sleep(1.5), + wait_started=wait, + post_start=post, + status_wait_timeout=1, + ) + ) + environment.add( + DummyResource( + pre_start=lambda _: time.sleep(1.5), + wait_started=wait, + post_start=post, + status_wait_timeout=1, + ) + ) + a = time.time() + environment.start_in_pool(pool) + b = time.time() + environment.stop_in_pool(pool) + + assert b - a < 3, "resources didn't start concurrently" + assert wait.call_count == 2, "wait-hooks not invoked" + assert post.call_count == 2, "post-hooks not invoked" + + def test_long_post(self, environment, pool, mocker): + wait, pre = mocker.Mock(), mocker.Mock() + environment.add( + DummyResource( + post_stop=lambda _: time.sleep(1.5), + wait_stopped=wait, + pre_stop=pre, + status_wait_timeout=1, + ) + ) + environment.add( + DummyResource( + post_stop=lambda _: time.sleep(1.5), + wait_stopped=wait, + pre_stop=pre, + status_wait_timeout=1, + ) + ) + environment.start_in_pool(pool) + a = time.time() + environment.stop_in_pool(pool) + b = time.time() + + assert b - a < 3, "resources didn't stop concurrently" + assert wait.call_count == 2, "wait-hooks not invoked" + assert pre.call_count == 2, "pre-hooks not invoked" + + def test_manual_resource(self, environment, pool, mocker): + pre, wait, post = mocker.Mock(), mocker.Mock(), mocker.Mock() + m = DummyResource( + pre_start=pre, + wait_started=wait, + post_start=post, + pre_stop=pre, + wait_stopped=wait, + post_stop=post, + auto_start=False, + ) + environment.add(m) + environment.add( + DummyResource( + pre_start=lambda _: time.sleep(1), + ) + ) + environment.add( + DummyResource( + post_start=lambda _: time.sleep(1), + ) + ) + a = time.time() + environment.start_in_pool(pool) + b = time.time() + assert b - a < 2, "resources didn't start concurrently" + assert not len(environment.start_exceptions) + + m.start() + m.wait(m.STATUS.STARTED) + m.stop() + m.wait(m.STATUS.STOPPED) + + assert pre.call_count == 2, "pre-hooks not invoked" + assert wait.call_count == 2, "wait-hooks not invoked" + assert post.call_count == 2, "post-hooks not invoked" + + environment.stop_in_pool(pool) + assert not len(environment.stop_exceptions) + + def test_op_timeout(self, environment, pool, mocker): + environment.add( + DummyResource( + pre_stop=lambda _: time.sleep(2), + ) + ) + environment.add( + DummyResource( + pre_start=lambda _: time.sleep(2), + ) + ) + with pytest.raises( + RuntimeError, match="timeout after 0.2s when starting" + ): + environment.start_in_pool(pool, timeout=0.2) + with pytest.raises( + RuntimeError, match="timeout after 0.2s when stopping" + ): + environment.stop_in_pool(pool, timeout=0.2) + + def test_nested_pool(self, environment, pool, mocker): + wait, post = mocker.Mock(), mocker.Mock() + with mp.ThreadPool() as pool: + + def _nested(_): + environment.add( + DummyResource( + pre_stop=lambda _: time.sleep(1), + wait_stopped=wait, + post_stop=post, + ) + ) + environment.add( + DummyResource( + pre_stop=lambda _: time.sleep(1), + wait_stopped=wait, + post_stop=post, + ) + ) + environment.start_in_pool(pool) + a = time.time() + environment.stop_in_pool(pool) + b = time.time() + + assert b - a < 2, "resources didn't start concurrently" + + r = pool.map_async( + _nested, + [None, None], + ) + assert r.get() == [None, None], "nested pool execution failed" + assert wait.call_count == 4, "wait-hooks not invoked" + assert post.call_count == 4, "post-hooks not invoked" From 58e8e86075f06001bda1a012d59614819a3a0931 Mon Sep 17 00:00:00 2001 From: zhenyu-ms <111329301+zhenyu-ms@users.noreply.github.com> Date: Mon, 24 Nov 2025 16:47:40 +0800 Subject: [PATCH 3/3] update test --- .../common/entity/test_environment.py | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/tests/unit/testplan/common/entity/test_environment.py b/tests/unit/testplan/common/entity/test_environment.py index f37e253f2..64dc3c024 100644 --- a/tests/unit/testplan/common/entity/test_environment.py +++ b/tests/unit/testplan/common/entity/test_environment.py @@ -1,9 +1,18 @@ import multiprocessing.pool as mp +import os +import sys import time -from testplan.common.entity.base import Resource, Environment import pytest +from testplan.common.entity.base import Environment, Resource + +# Python 3.13+ uses process_cpu_count() for ThreadPool sizing +if sys.version_info >= (3, 13): + CPU_COUNT = os.process_cpu_count() or 1 +else: + CPU_COUNT = os.cpu_count() or 1 + @pytest.fixture def environment(): @@ -12,7 +21,7 @@ def environment(): @pytest.fixture def pool(): - return mp.ThreadPool() + return mp.ThreadPool(CPU_COUNT) class DummyResource(Resource): @@ -73,7 +82,8 @@ def test_basic(self, environment, pool, mocker): environment.start_in_pool(pool) b = time.time() - assert b - a < 2, "resources didn't start concurrently" + if CPU_COUNT >= 2: + assert b - a < 2, "resources didn't start concurrently" assert pre.call_count == 2, "pre-hooks not invoked" assert post.call_count == 2, "post-hooks not invoked" @@ -81,7 +91,8 @@ def test_basic(self, environment, pool, mocker): environment.stop_in_pool(pool) d = time.time() - assert d - c < 2, "resources didn't stop concurrently" + if CPU_COUNT >= 2: + assert d - c < 2, "resources didn't stop concurrently" assert pre.call_count == 4, "pre-hooks not invoked" assert post.call_count == 4, "post-hooks not invoked" @@ -108,7 +119,8 @@ def test_long_pre(self, environment, pool, mocker): b = time.time() environment.stop_in_pool(pool) - assert b - a < 3, "resources didn't start concurrently" + if CPU_COUNT >= 2: + assert b - a < 3, "resources didn't start concurrently" assert wait.call_count == 2, "wait-hooks not invoked" assert post.call_count == 2, "post-hooks not invoked" @@ -135,7 +147,8 @@ def test_long_post(self, environment, pool, mocker): environment.stop_in_pool(pool) b = time.time() - assert b - a < 3, "resources didn't stop concurrently" + if CPU_COUNT >= 2: + assert b - a < 3, "resources didn't stop concurrently" assert wait.call_count == 2, "wait-hooks not invoked" assert pre.call_count == 2, "pre-hooks not invoked" @@ -164,7 +177,8 @@ def test_manual_resource(self, environment, pool, mocker): a = time.time() environment.start_in_pool(pool) b = time.time() - assert b - a < 2, "resources didn't start concurrently" + if CPU_COUNT >= 2: + assert b - a < 2, "resources didn't start concurrently" assert not len(environment.start_exceptions) m.start() @@ -199,11 +213,13 @@ def test_op_timeout(self, environment, pool, mocker): ): environment.stop_in_pool(pool, timeout=0.2) - def test_nested_pool(self, environment, pool, mocker): + def test_nested_pool(self, mocker): wait, post = mocker.Mock(), mocker.Mock() - with mp.ThreadPool() as pool: + with mp.ThreadPool(CPU_COUNT) as another_pool: def _nested(_): + environment = Environment() + pool = mp.ThreadPool(CPU_COUNT) environment.add( DummyResource( pre_stop=lambda _: time.sleep(1), @@ -223,9 +239,10 @@ def _nested(_): environment.stop_in_pool(pool) b = time.time() - assert b - a < 2, "resources didn't start concurrently" + if CPU_COUNT >= 4: + assert b - a < 4, "resources didn't start concurrently" - r = pool.map_async( + r = another_pool.map_async( _nested, [None, None], )