diff --git a/aiida/engine/daemon/client.py b/aiida/engine/daemon/client.py index 5f63d7a01b..c5314e7936 100644 --- a/aiida/engine/daemon/client.py +++ b/aiida/engine/daemon/client.py @@ -95,7 +95,7 @@ def __init__(self, profile: Profile): config = get_config() self._profile = profile self._socket_directory: str | None = None - self._daemon_timeout: int = config.get_option('daemon.timeout') + self._daemon_timeout: int = config.get_option('daemon.timeout', scope=profile.name) @property def profile(self) -> Profile: @@ -380,23 +380,25 @@ def get_tcp_endpoint(self, port=None): return endpoint @contextlib.contextmanager - def get_client(self) -> 'CircusClient': + def get_client(self, timeout: int | None = None) -> 'CircusClient': """Return an instance of the CircusClient. The endpoint is defined by the controller endpoint, which used the port that was written to the port file upon starting of the daemon. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. :return: CircusClient instance """ from circus.client import CircusClient try: - client = CircusClient(endpoint=self.get_controller_endpoint(), timeout=self._daemon_timeout) + client = CircusClient(endpoint=self.get_controller_endpoint(), timeout=timeout or self._daemon_timeout) yield client finally: client.stop() - def call_client(self, command: dict[str, t.Any]) -> dict[str, t.Any]: + def call_client(self, command: dict[str, t.Any], timeout: int | None = None) -> dict[str, t.Any]: """Call the client with a specific command. Will check whether the daemon is running first by checking for the pid file. When the pid is found yet the call @@ -404,6 +406,8 @@ def call_client(self, command: dict[str, t.Any]) -> dict[str, t.Any]: causing the pid file to not be cleaned up properly. :param command: Command to call the circus client with. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. :return: The result of the circus client call. :raises DaemonException: If the daemon is not running or cannot be reached. :raises DaemonTimeoutException: If the connection to the daemon timed out. @@ -412,7 +416,7 @@ def call_client(self, command: dict[str, t.Any]) -> dict[str, t.Any]: from circus.exc import CallError try: - with self.get_client() as client: + with self.get_client(timeout=timeout) as client: result = client.call(command) except CallError as exception: if self.get_daemon_pid() is None: @@ -431,117 +435,135 @@ def call_client(self, command: dict[str, t.Any]) -> dict[str, t.Any]: return result - def get_status(self) -> dict[str, t.Any]: + def get_status(self, timeout: int | None = None) -> dict[str, t.Any]: """Return the status of the daemon. - :returns: Instance of ``DaemonStatus``. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. + :returns: The client call response. If successful, will contain 'pid' key. """ command = {'command': 'status', 'properties': {'name': self.daemon_name}} - response = self.call_client(command) + response = self.call_client(command, timeout=timeout) response['pid'] = self.get_daemon_pid() return response - def get_numprocesses(self) -> dict[str, t.Any]: + def get_numprocesses(self, timeout: int | None = None) -> dict[str, t.Any]: """Get the number of running daemon processes. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. :return: The client call response. If successful, will contain 'numprocesses' key. """ command = {'command': 'numprocesses', 'properties': {'name': self.daemon_name}} - return self.call_client(command) + return self.call_client(command, timeout=timeout) - def get_worker_info(self) -> dict[str, t.Any]: + def get_worker_info(self, timeout: int | None = None) -> dict[str, t.Any]: """Get workers statistics for this daemon. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. :return: The client call response. If successful, will contain 'info' key. """ command = {'command': 'stats', 'properties': {'name': self.daemon_name}} - return self.call_client(command) + return self.call_client(command, timeout=timeout) - def get_daemon_info(self) -> dict[str, t.Any]: + def get_daemon_info(self, timeout: int | None = None) -> dict[str, t.Any]: """Get statistics about this daemon itself. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. :return: The client call response. If successful, will contain 'info' key. """ command = {'command': 'dstats', 'properties': {}} - return self.call_client(command) + return self.call_client(command, timeout=timeout) - def increase_workers(self, number: int) -> dict[str, t.Any]: + def increase_workers(self, number: int, timeout: int | None = None) -> dict[str, t.Any]: """Increase the number of workers. :param number: The number of workers to add. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. :return: The client call response. """ command = {'command': 'incr', 'properties': {'name': self.daemon_name, 'nb': number}} - return self.call_client(command) + return self.call_client(command, timeout=timeout) - def decrease_workers(self, number: int) -> dict[str, t.Any]: + def decrease_workers(self, number: int, timeout: int | None = None) -> dict[str, t.Any]: """Decrease the number of workers. :param number: The number of workers to remove. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. :return: The client call response. """ command = {'command': 'decr', 'properties': {'name': self.daemon_name, 'nb': number}} - return self.call_client(command) + return self.call_client(command, timeout=timeout) - def stop_daemon(self, wait: bool = True, timeout: int = 5) -> dict[str, t.Any]: - """Stop the daemon. + def start_daemon( + self, number_workers: int = 1, foreground: bool = False, wait: bool = True, timeout: int | None = None + ) -> None: + """Start the daemon in a sub process running in the background. + :param number_workers: Number of daemon workers to start. + :param foreground: Whether to launch the subprocess in the background or not. :param wait: Boolean to indicate whether to wait for the result of the command. - :param timeout: Wait this number of seconds for the ``is_daemon_running`` to return ``False`` before raising. - :return: The client call response. - :raises DaemonException: If ``is_daemon_running`` returns ``True`` after the ``timeout`` has passed. + :param timeout: Optional timeout to set for trying to reach the circus daemon after the subprocess has started. + Default is set on the client upon instantiation taken from the ``daemon.timeout`` config option. + :raises DaemonException: If the command to start the daemon subprocess excepts. + :raises DaemonTimeoutException: If the daemon starts but then is unresponsive or in an unexpected state. """ - command = {'command': 'quit', 'properties': {'waiting': wait}} - response = self.call_client(command) + self._clean_potentially_stale_pid_file() - if self._ENDPOINT_PROTOCOL == ControllerProtocol.IPC: - self.delete_circus_socket_directory() + env = self.get_env() + command = self.cmd_start_daemon(number_workers, foreground) + timeout = timeout or self._daemon_timeout + + try: + subprocess.check_output(command, env=env, stderr=subprocess.STDOUT) # pylint: disable=unexpected-keyword-arg + except subprocess.CalledProcessError as exception: + raise DaemonException('The daemon failed to start.') from exception if not wait: - return response + return self._await_condition( - lambda: not self.is_daemon_running, - DaemonException(f'The daemon failed to stop within {timeout} seconds.'), + lambda: self.is_daemon_running, + DaemonTimeoutException(f'The daemon failed to start or is unresponsive after {timeout} seconds.'), timeout=timeout, ) - return response - - def restart_daemon(self, wait: bool) -> dict[str, t.Any]: + def restart_daemon(self, wait: bool = True, timeout: int | None = None) -> dict[str, t.Any]: """Restart the daemon. :param wait: Boolean to indicate whether to wait for the result of the command. - :return: The client call response. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. + :returns: The client call response. + :raises DaemonException: If the daemon is not running or cannot be reached. + :raises DaemonTimeoutException: If the connection to the daemon timed out. + :raises DaemonException: If the connection to the daemon failed for any other reason. """ command = {'command': 'restart', 'properties': {'name': self.daemon_name, 'waiting': wait}} - return self.call_client(command) + return self.call_client(command, timeout=timeout) - def start_daemon(self, number_workers: int = 1, foreground: bool = False, timeout: int = 5) -> None: - """Start the daemon in a sub process running in the background. + def stop_daemon(self, wait: bool = True, timeout: int | None = None) -> dict[str, t.Any]: + """Stop the daemon. - :param number_workers: Number of daemon workers to start. - :param foreground: Whether to launch the subprocess in the background or not. - :param timeout: Wait this number of seconds for the ``is_daemon_running`` to return ``True`` before raising. - :raises DaemonException: If the daemon fails to start. - :raises DaemonException: If the daemon starts but then is unresponsive or in an unexpected state. - :raises DaemonException: If ``is_daemon_running`` returns ``False`` after the ``timeout`` has passed. + :param wait: Boolean to indicate whether to wait for the result of the command. + :param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon + instantiation taken from the ``daemon.timeout`` config option. + :returns: The client call response. + :raises DaemonException: If the daemon is not running or cannot be reached. + :raises DaemonTimeoutException: If the connection to the daemon timed out. + :raises DaemonException: If the connection to the daemon failed for any other reason. """ - self._clean_potentially_stale_pid_file() - - env = self.get_env() - command = self.cmd_start_daemon(number_workers, foreground) + command = {'command': 'quit', 'properties': {'waiting': wait}} + response = self.call_client(command, timeout=timeout) - try: - subprocess.check_output(command, env=env, stderr=subprocess.STDOUT) # pylint: disable=unexpected-keyword-arg - except subprocess.CalledProcessError as exception: - raise DaemonException('The daemon failed to start.') from exception + if self._ENDPOINT_PROTOCOL == ControllerProtocol.IPC: + self.delete_circus_socket_directory() - self._await_condition( - lambda: self.is_daemon_running, - DaemonException(f'The daemon failed to start within {timeout} seconds.'), - timeout=timeout, - ) + return response def _clean_potentially_stale_pid_file(self) -> None: """Check the daemon PID file and delete it if it is likely to be stale.""" diff --git a/aiida/manage/configuration/schema/config-v9.schema.json b/aiida/manage/configuration/schema/config-v9.schema.json index 1a253bde63..1084b052b2 100644 --- a/aiida/manage/configuration/schema/config-v9.schema.json +++ b/aiida/manage/configuration/schema/config-v9.schema.json @@ -20,9 +20,9 @@ }, "daemon.timeout": { "type": "integer", - "default": 5, + "default": 2, "minimum": 0, - "description": "Timeout in seconds for calls to the circus client" + "description": "Used to set default timeout in the :class:`aiida.engine.daemon.client.DaemonClient` for calls to the daemon" }, "daemon.worker_process_slots": { "type": "integer", diff --git a/aiida/manage/tests/pytest_fixtures.py b/aiida/manage/tests/pytest_fixtures.py index 23c731b418..3129662653 100644 --- a/aiida/manage/tests/pytest_fixtures.py +++ b/aiida/manage/tests/pytest_fixtures.py @@ -43,7 +43,7 @@ from aiida.common.log import AIIDA_LOGGER from aiida.common.warnings import warn_deprecation from aiida.engine import Process, ProcessBuilder, submit -from aiida.engine.daemon.client import DaemonClient, DaemonNotRunningException +from aiida.engine.daemon.client import DaemonClient, DaemonNotRunningException, DaemonTimeoutException from aiida.manage import Config, Profile, get_manager, get_profile from aiida.manage.manager import Manager from aiida.orm import Computer, ProcessNode, User @@ -680,7 +680,13 @@ def stopped_daemon_client(daemon_client): """Ensure that the daemon is not running for the test profile and return the associated client.""" if daemon_client.is_daemon_running: daemon_client.stop_daemon(wait=True) - assert not daemon_client.is_daemon_running + # Give an additional grace period by manually waiting for the daemon to be stopped. In certain unit test + # scenarios, the built in wait time in ``daemon_client.stop_daemon`` is not sufficient and even though the + # daemon is stopped, ``daemon_client.is_daemon_running`` will return false for a little bit longer. + daemon_client._await_condition( # pylint: disable=protected-access + lambda: not daemon_client.is_daemon_running, + DaemonTimeoutException('The daemon failed to stop.'), + ) yield daemon_client diff --git a/docs/source/conf.py b/docs/source/conf.py index 4aa0110ec5..1263eeb4b3 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -47,7 +47,6 @@ 'internals/global_design.rst', 'internals/orm.rst', 'scheduler/index.rst', - 'topics/daemon.rst', 'working_with_aiida/**', ] diff --git a/docs/source/topics/daemon.rst b/docs/source/topics/daemon.rst index 9af3f97d72..bdf43356e5 100644 --- a/docs/source/topics/daemon.rst +++ b/docs/source/topics/daemon.rst @@ -1,12 +1,46 @@ -.. todo:: +.. _topics:daemon: - .. _topics:daemon: +****** +Daemon +****** - ****** - Daemon - ****** +AiiDA provides a daemon process that runs in the background which handles any new processes (i.e., calculations and workflows, see :ref:`process concepts `) that are submitted. +Unlike when running a process, which blocks the current Python interpreter (see the :ref:`launching ` section for details on the difference between *run* and *submit*), the daemon can handle multiple processes asynchronously. - `#4016`_ +The daemon concept in AiiDA consists of multiple *system processes*. +.. note:: -.. _#4016: https://github.com/aiidateam/aiida-core/issues/4016 + System processes, here, refers to processes that are run by the operating system, not to the AiiDA specific collective term for all calculations and workflows. + +When the daemon is started, a single system process is launched in the background that runs indefinitely until it is stopped. +This daemonized process is responsible for launching and then monitoring one or multiple daemon *workers*. +Each daemon worker is another system process that connects to RabbitMQ to retrieve calculations and workflows that have been submitted and run them to completion. +If a daemon worker dies, the daemon will automatically revive it. +When the daemon is requested to stop, it will send a signal to all workers to shut them down before shutting down itself. + +In summary: AiiDA's daemon consists of a single system process running in the background (the daemon) that manages one or more system processes that handle all submitted calculations and workflows (the daemon workers). + + +.. _topics:daemon:client: + +====== +Client +====== + +The Python API provides the :class:`~aiida.engine.daemon.client.DaemonClient` class to interact with the daemon. +It can either be constructed directly for a given profile, or the :func:`aiida.engine.daemon.client.get_daemon_client` function can be used to construct it for the current default profile. +The main methods of interest for interacting with the daemon are: + +* :meth:`~aiida.engine.daemon.client.DaemonClient.start_daemon` +* :meth:`~aiida.engine.daemon.client.DaemonClient.restart_daemon` +* :meth:`~aiida.engine.daemon.client.DaemonClient.stop_daemon` +* :meth:`~aiida.engine.daemon.client.DaemonClient.get_status` + +These methods will raise a :class:`~aiida.engine.daemon.client.DaemonException` if the daemon fails to start or calls to it fail. +All methods accept a ``timeout`` argument, which is the number of seconds the client should wait for the daemon process to respond, before raising a :class:`~aiida.engine.daemon.client.DaemonTimeoutException`. +The default for the ``timeout`` is taken from the ``daemon.timeout`` configuration option and is set when constructing the :class:`~aiida.engine.daemon.client.DaemonClient`. + +.. note:: + + The ``DaemonClient`` only directly interacts with the main daemon process, not with any of the daemon workers that it manages. diff --git a/docs/source/topics/index.rst b/docs/source/topics/index.rst index f74f2ebb22..46d80b482a 100644 --- a/docs/source/topics/index.rst +++ b/docs/source/topics/index.rst @@ -10,6 +10,7 @@ Topics calculations/index workflows/index provenance/index + daemon data_types database repository @@ -17,7 +18,3 @@ Topics schedulers transport performance - -.. todo:: - - daemon // after provenance