DaemonClient: Fix and homogenize use of timeout in client calls (#5960)

The `DaemonClient` provides a number of methods that attempt to
communicate with the daemon process, e.g., `stop_daemon` and `get_status`.
This is done through the `CircusClient`, since the main daemon process is
the circus daemonizer. The `CircusClient` takes a timeout as argument,
indicating the number of seconds after which the call should raise if the
daemon did not respond in time.

The default timeout is set in the constructor of the `DaemonClient`
based on the `daemon.timeout` config option. It was incorrectly getting
the global value instead of the profile-specific one, which is corrected
by using `config.get_option('daemon.timeout', scope=profile.name)` to
fetch the config option.
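
To illustrate the difference between the unscoped and the scoped lookup, here is a minimal toy model. `ToyConfig` and the option values are hypothetical stand-ins and not AiiDA's `Config` implementation; how AiiDA resolves an option that is not set on the profile is deliberately not modeled.

```python
# Toy model only: ToyConfig is a hypothetical stand-in, not AiiDA's Config class.
SCHEMA_DEFAULTS = {'daemon.timeout': 2}


class ToyConfig:
    """Hold one set of global options and one set of options per profile."""

    def __init__(self, global_options, profile_options):
        self._global = dict(global_options)
        self._profiles = {name: dict(options) for name, options in profile_options.items()}

    def get_option(self, name, scope=None):
        """Look up ``name`` globally, or on the profile named by ``scope``."""
        options = self._global if scope is None else self._profiles[scope]
        return options.get(name, SCHEMA_DEFAULTS[name])


config = ToyConfig(
    global_options={'daemon.timeout': 20},
    profile_options={'production': {'daemon.timeout': 60}},
)

# The unscoped call ignores the value configured on the profile.
unscoped = config.get_option('daemon.timeout')
# The scoped call returns the value configured on the profile.
scoped = config.get_option('daemon.timeout', scope='production')
```

In this toy setup the unscoped lookup yields the global value while the scoped lookup yields the profile's own value, which is the behavior the fix relies on.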

A `timeout` argument is added to all `DaemonClient` methods that call
through to the `CircusClient`. It can be used to override the default,
which is based on the `daemon.timeout` config option.
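
The diff resolves the effective timeout with the expression `timeout or self._daemon_timeout`. The sketch below isolates that expression in a hypothetical `ToyClient` (not part of AiiDA) to show which value wins in each case.

```python
# Hypothetical stand-in that only mirrors the ``timeout or default`` expression from the diff.
class ToyClient:
    def __init__(self, default_timeout):
        # In the real client this value comes from the ``daemon.timeout`` option.
        self._daemon_timeout = default_timeout

    def resolve_timeout(self, timeout=None):
        """Return the per-call timeout if it is truthy, otherwise the default."""
        return timeout or self._daemon_timeout


client = ToyClient(default_timeout=2)

default_used = client.resolve_timeout()        # no override: default applies
override_used = client.resolve_timeout(10)     # explicit override wins
zero_falls_back = client.resolve_timeout(0)    # 0 is falsy, so the default applies
```

A consequence of using `or` is that an explicit `timeout=0` is treated the same as `None` and falls back to the default.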

A default is added for `wait` in the `restart_daemon` method, to
homogenize its interface with `start_daemon` and `stop_daemon`. The
manual timeout cycle in `stop_daemon`, implemented by calling
`_await_condition`, has been removed: the circus client already performs
this wait itself, which makes it superfluous. The only place it is still
used is `start_daemon`, because there the circus client is not used: the
circus process is not actually running yet, so a "manual" health check
needs to be performed after the daemon process is launched. The manual
check is also added to the `stopped_daemon_client` fixture, since in
certain unit test scenarios the check could give a false positive without
an additional manual grace period for `is_daemon_running` to start
returning `False`.
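
The body of `_await_condition` is not part of this diff. As a rough sketch of what such a manual timeout cycle can look like, the standalone function below polls a condition until it holds or a deadline passes. The name `await_condition`, its `interval` parameter, the default values, and `FakeDaemon` are assumptions for illustration only.

```python
import time


def await_condition(condition, exception, timeout=1.0, interval=0.05):
    """Poll ``condition`` until it returns True; raise ``exception`` once ``timeout`` seconds have passed.

    Hypothetical sketch, not the actual ``DaemonClient._await_condition``.
    """
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            raise exception
        time.sleep(interval)


class FakeDaemon:
    """Report "running" for a fixed number of polls, then "stopped"."""

    def __init__(self, polls_until_stopped):
        self._remaining = polls_until_stopped

    def is_running(self):
        self._remaining -= 1
        return self._remaining >= 0


daemon = FakeDaemon(polls_until_stopped=3)
# Grace period: keep polling until the daemon no longer reports itself as running.
await_condition(lambda: not daemon.is_running(), TimeoutError('The daemon failed to stop.'))
```

This mirrors the fixture's use case: the stop call has returned, but the running check needs a few more polls before it reports the daemon as stopped.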

The default for the `daemon.timeout` configuration option is decreased
from 5 to 2 seconds, as this should be sufficient in most conditions for
the circus daemon process to respond. Note that this daemon process is
only charged with monitoring the daemon workers, so it is unlikely to be
under a load heavy enough to prevent it from responding in time.
sphuber authored Apr 14, 2023
1 parent b26c2d4 commit 30c7f44
Showing 6 changed files with 130 additions and 72 deletions.
134 changes: 78 additions & 56 deletions aiida/engine/daemon/client.py
@@ -95,7 +95,7 @@ def __init__(self, profile: Profile):
config = get_config()
self._profile = profile
self._socket_directory: str | None = None
self._daemon_timeout: int = config.get_option('daemon.timeout')
self._daemon_timeout: int = config.get_option('daemon.timeout', scope=profile.name)

@property
def profile(self) -> Profile:
@@ -380,30 +380,34 @@ def get_tcp_endpoint(self, port=None):
return endpoint

@contextlib.contextmanager
def get_client(self) -> 'CircusClient':
def get_client(self, timeout: int | None = None) -> 'CircusClient':
"""Return an instance of the CircusClient.
The endpoint is defined by the controller endpoint, which used the port that was written to the port file upon
starting of the daemon.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:return: CircusClient instance
"""
from circus.client import CircusClient

try:
client = CircusClient(endpoint=self.get_controller_endpoint(), timeout=self._daemon_timeout)
client = CircusClient(endpoint=self.get_controller_endpoint(), timeout=timeout or self._daemon_timeout)
yield client
finally:
client.stop()

def call_client(self, command: dict[str, t.Any]) -> dict[str, t.Any]:
def call_client(self, command: dict[str, t.Any], timeout: int | None = None) -> dict[str, t.Any]:
"""Call the client with a specific command.
Will check whether the daemon is running first by checking for the pid file. When the pid is found yet the call
still fails with a timeout, this means the daemon was actually not running and it was terminated unexpectedly
causing the pid file to not be cleaned up properly.
:param command: Command to call the circus client with.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:return: The result of the circus client call.
:raises DaemonException: If the daemon is not running or cannot be reached.
:raises DaemonTimeoutException: If the connection to the daemon timed out.
@@ -412,7 +416,7 @@ def call_client(self, command: dict[str, t.Any]) -> dict[str, t.Any]:
from circus.exc import CallError

try:
with self.get_client() as client:
with self.get_client(timeout=timeout) as client:
result = client.call(command)
except CallError as exception:
if self.get_daemon_pid() is None:
@@ -431,117 +435,135 @@ def call_client(self, command: dict[str, t.Any]) -> dict[str, t.Any]:

return result

def get_status(self) -> dict[str, t.Any]:
def get_status(self, timeout: int | None = None) -> dict[str, t.Any]:
"""Return the status of the daemon.
:returns: Instance of ``DaemonStatus``.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:returns: The client call response. If successful, will contain 'pid' key.
"""
command = {'command': 'status', 'properties': {'name': self.daemon_name}}
response = self.call_client(command)
response = self.call_client(command, timeout=timeout)
response['pid'] = self.get_daemon_pid()
return response

def get_numprocesses(self) -> dict[str, t.Any]:
def get_numprocesses(self, timeout: int | None = None) -> dict[str, t.Any]:
"""Get the number of running daemon processes.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:return: The client call response. If successful, will contain 'numprocesses' key.
"""
command = {'command': 'numprocesses', 'properties': {'name': self.daemon_name}}
return self.call_client(command)
return self.call_client(command, timeout=timeout)

def get_worker_info(self) -> dict[str, t.Any]:
def get_worker_info(self, timeout: int | None = None) -> dict[str, t.Any]:
"""Get workers statistics for this daemon.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:return: The client call response. If successful, will contain 'info' key.
"""
command = {'command': 'stats', 'properties': {'name': self.daemon_name}}
return self.call_client(command)
return self.call_client(command, timeout=timeout)

def get_daemon_info(self) -> dict[str, t.Any]:
def get_daemon_info(self, timeout: int | None = None) -> dict[str, t.Any]:
"""Get statistics about this daemon itself.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:return: The client call response. If successful, will contain 'info' key.
"""
command = {'command': 'dstats', 'properties': {}}
return self.call_client(command)
return self.call_client(command, timeout=timeout)

def increase_workers(self, number: int) -> dict[str, t.Any]:
def increase_workers(self, number: int, timeout: int | None = None) -> dict[str, t.Any]:
"""Increase the number of workers.
:param number: The number of workers to add.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:return: The client call response.
"""
command = {'command': 'incr', 'properties': {'name': self.daemon_name, 'nb': number}}
return self.call_client(command)
return self.call_client(command, timeout=timeout)

def decrease_workers(self, number: int) -> dict[str, t.Any]:
def decrease_workers(self, number: int, timeout: int | None = None) -> dict[str, t.Any]:
"""Decrease the number of workers.
:param number: The number of workers to remove.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:return: The client call response.
"""
command = {'command': 'decr', 'properties': {'name': self.daemon_name, 'nb': number}}
return self.call_client(command)
return self.call_client(command, timeout=timeout)

def stop_daemon(self, wait: bool = True, timeout: int = 5) -> dict[str, t.Any]:
"""Stop the daemon.
def start_daemon(
self, number_workers: int = 1, foreground: bool = False, wait: bool = True, timeout: int | None = None
) -> None:
"""Start the daemon in a sub process running in the background.
:param number_workers: Number of daemon workers to start.
:param foreground: Whether to launch the subprocess in the background or not.
:param wait: Boolean to indicate whether to wait for the result of the command.
:param timeout: Wait this number of seconds for the ``is_daemon_running`` to return ``False`` before raising.
:return: The client call response.
:raises DaemonException: If ``is_daemon_running`` returns ``True`` after the ``timeout`` has passed.
:param timeout: Optional timeout to set for trying to reach the circus daemon after the subprocess has started.
Default is set on the client upon instantiation taken from the ``daemon.timeout`` config option.
:raises DaemonException: If the command to start the daemon subprocess excepts.
:raises DaemonTimeoutException: If the daemon starts but then is unresponsive or in an unexpected state.
"""
command = {'command': 'quit', 'properties': {'waiting': wait}}
response = self.call_client(command)
self._clean_potentially_stale_pid_file()

if self._ENDPOINT_PROTOCOL == ControllerProtocol.IPC:
self.delete_circus_socket_directory()
env = self.get_env()
command = self.cmd_start_daemon(number_workers, foreground)
timeout = timeout or self._daemon_timeout

try:
subprocess.check_output(command, env=env, stderr=subprocess.STDOUT) # pylint: disable=unexpected-keyword-arg
except subprocess.CalledProcessError as exception:
raise DaemonException('The daemon failed to start.') from exception

if not wait:
return response
return

self._await_condition(
lambda: not self.is_daemon_running,
DaemonException(f'The daemon failed to stop within {timeout} seconds.'),
lambda: self.is_daemon_running,
DaemonTimeoutException(f'The daemon failed to start or is unresponsive after {timeout} seconds.'),
timeout=timeout,
)

return response

def restart_daemon(self, wait: bool) -> dict[str, t.Any]:
def restart_daemon(self, wait: bool = True, timeout: int | None = None) -> dict[str, t.Any]:
"""Restart the daemon.
:param wait: Boolean to indicate whether to wait for the result of the command.
:return: The client call response.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:returns: The client call response.
:raises DaemonException: If the daemon is not running or cannot be reached.
:raises DaemonTimeoutException: If the connection to the daemon timed out.
:raises DaemonException: If the connection to the daemon failed for any other reason.
"""
command = {'command': 'restart', 'properties': {'name': self.daemon_name, 'waiting': wait}}
return self.call_client(command)
return self.call_client(command, timeout=timeout)

def start_daemon(self, number_workers: int = 1, foreground: bool = False, timeout: int = 5) -> None:
"""Start the daemon in a sub process running in the background.
def stop_daemon(self, wait: bool = True, timeout: int | None = None) -> dict[str, t.Any]:
"""Stop the daemon.
:param number_workers: Number of daemon workers to start.
:param foreground: Whether to launch the subprocess in the background or not.
:param timeout: Wait this number of seconds for the ``is_daemon_running`` to return ``True`` before raising.
:raises DaemonException: If the daemon fails to start.
:raises DaemonException: If the daemon starts but then is unresponsive or in an unexpected state.
:raises DaemonException: If ``is_daemon_running`` returns ``False`` after the ``timeout`` has passed.
:param wait: Boolean to indicate whether to wait for the result of the command.
:param timeout: Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon
instantiation taken from the ``daemon.timeout`` config option.
:returns: The client call response.
:raises DaemonException: If the daemon is not running or cannot be reached.
:raises DaemonTimeoutException: If the connection to the daemon timed out.
:raises DaemonException: If the connection to the daemon failed for any other reason.
"""
self._clean_potentially_stale_pid_file()

env = self.get_env()
command = self.cmd_start_daemon(number_workers, foreground)
command = {'command': 'quit', 'properties': {'waiting': wait}}
response = self.call_client(command, timeout=timeout)

try:
subprocess.check_output(command, env=env, stderr=subprocess.STDOUT) # pylint: disable=unexpected-keyword-arg
except subprocess.CalledProcessError as exception:
raise DaemonException('The daemon failed to start.') from exception
if self._ENDPOINT_PROTOCOL == ControllerProtocol.IPC:
self.delete_circus_socket_directory()

self._await_condition(
lambda: self.is_daemon_running,
DaemonException(f'The daemon failed to start within {timeout} seconds.'),
timeout=timeout,
)
return response

def _clean_potentially_stale_pid_file(self) -> None:
"""Check the daemon PID file and delete it if it is likely to be stale."""
4 changes: 2 additions & 2 deletions aiida/manage/configuration/schema/config-v9.schema.json
@@ -20,9 +20,9 @@
},
"daemon.timeout": {
"type": "integer",
"default": 5,
"default": 2,
"minimum": 0,
"description": "Timeout in seconds for calls to the circus client"
"description": "Used to set default timeout in the :class:`aiida.engine.daemon.client.DaemonClient` for calls to the daemon"
},
"daemon.worker_process_slots": {
"type": "integer",
10 changes: 8 additions & 2 deletions aiida/manage/tests/pytest_fixtures.py
@@ -43,7 +43,7 @@
from aiida.common.log import AIIDA_LOGGER
from aiida.common.warnings import warn_deprecation
from aiida.engine import Process, ProcessBuilder, submit
from aiida.engine.daemon.client import DaemonClient, DaemonNotRunningException
from aiida.engine.daemon.client import DaemonClient, DaemonNotRunningException, DaemonTimeoutException
from aiida.manage import Config, Profile, get_manager, get_profile
from aiida.manage.manager import Manager
from aiida.orm import Computer, ProcessNode, User
@@ -680,7 +680,13 @@ def stopped_daemon_client(daemon_client):
"""Ensure that the daemon is not running for the test profile and return the associated client."""
if daemon_client.is_daemon_running:
daemon_client.stop_daemon(wait=True)
assert not daemon_client.is_daemon_running
# Give an additional grace period by manually waiting for the daemon to be stopped. In certain unit test
# scenarios, the built-in wait time in ``daemon_client.stop_daemon`` is not sufficient and even though the
# daemon is stopped, ``daemon_client.is_daemon_running`` will still return ``True`` for a little bit longer.
daemon_client._await_condition( # pylint: disable=protected-access
lambda: not daemon_client.is_daemon_running,
DaemonTimeoutException('The daemon failed to stop.'),
)

yield daemon_client

1 change: 0 additions & 1 deletion docs/source/conf.py
@@ -47,7 +47,6 @@
'internals/global_design.rst',
'internals/orm.rst',
'scheduler/index.rst',
'topics/daemon.rst',
'working_with_aiida/**',
]

48 changes: 41 additions & 7 deletions docs/source/topics/daemon.rst
@@ -1,12 +1,46 @@
.. todo::
.. _topics:daemon:

.. _topics:daemon:
******
Daemon
******

******
Daemon
******
AiiDA provides a daemon process that runs in the background and handles any new processes (i.e., calculations and workflows, see :ref:`process concepts <topics:processes:concepts>`) that are submitted.
Unlike when running a process, which blocks the current Python interpreter (see the :ref:`launching <topics:processes:usage:launching>` section for details on the difference between *run* and *submit*), the daemon can handle multiple processes asynchronously.

`#4016`_
The daemon concept in AiiDA consists of multiple *system processes*.

.. note::

.. _#4016: https://github.com/aiidateam/aiida-core/issues/4016
System processes, here, refers to processes that are run by the operating system, not to the AiiDA-specific collective term for all calculations and workflows.

When the daemon is started, a single system process is launched in the background that runs indefinitely until it is stopped.
This daemonized process is responsible for launching and then monitoring one or multiple daemon *workers*.
Each daemon worker is another system process that connects to RabbitMQ to retrieve calculations and workflows that have been submitted and run them to completion.
If a daemon worker dies, the daemon will automatically revive it.
When the daemon is requested to stop, it will send a signal to all workers to shut them down before shutting down itself.

In summary: AiiDA's daemon consists of a single system process running in the background (the daemon) that manages one or more system processes that handle all submitted calculations and workflows (the daemon workers).


.. _topics:daemon:client:

======
Client
======

The Python API provides the :class:`~aiida.engine.daemon.client.DaemonClient` class to interact with the daemon.
It can either be constructed directly for a given profile, or the :func:`aiida.engine.daemon.client.get_daemon_client` function can be used to construct it for the current default profile.
The main methods of interest for interacting with the daemon are:

* :meth:`~aiida.engine.daemon.client.DaemonClient.start_daemon`
* :meth:`~aiida.engine.daemon.client.DaemonClient.restart_daemon`
* :meth:`~aiida.engine.daemon.client.DaemonClient.stop_daemon`
* :meth:`~aiida.engine.daemon.client.DaemonClient.get_status`

These methods will raise a :class:`~aiida.engine.daemon.client.DaemonException` if the daemon fails to start or calls to it fail.
All methods accept a ``timeout`` argument, which is the number of seconds the client should wait for the daemon process to respond, before raising a :class:`~aiida.engine.daemon.client.DaemonTimeoutException`.
The default for the ``timeout`` is taken from the ``daemon.timeout`` configuration option and is set when constructing the :class:`~aiida.engine.daemon.client.DaemonClient`.

.. note::

The ``DaemonClient`` only directly interacts with the main daemon process, not with any of the daemon workers that it manages.
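
As an illustration of the ``timeout`` argument described above, the following sketch retries a status call with a longer timeout when the first attempt times out. To keep it runnable without an AiiDA installation it uses stand-ins: ``StubClient``, the helper ``status_with_retry`` and the local exception classes are hypothetical, and the subclass relation between the two exceptions is an assumption. With AiiDA installed, the client and exceptions would instead be imported from ``aiida.engine.daemon.client``.

```python
class DaemonException(Exception):
    """Local stand-in for ``aiida.engine.daemon.client.DaemonException``."""


class DaemonTimeoutException(DaemonException):
    """Local stand-in; that it subclasses ``DaemonException`` is an assumption."""


class StubClient:
    """Hypothetical client whose daemon needs ``responds_within`` seconds to answer."""

    def __init__(self, responds_within, default_timeout=2):
        self._responds_within = responds_within
        self._daemon_timeout = default_timeout

    def get_status(self, timeout=None):
        effective = timeout or self._daemon_timeout
        if effective < self._responds_within:
            raise DaemonTimeoutException(f'No response within {effective} seconds.')
        return {'pid': 1234}


def status_with_retry(client, timeouts=(2, 10)):
    """Try ``get_status`` with each timeout in turn; return None if all attempts time out."""
    for timeout in timeouts:
        try:
            return client.get_status(timeout=timeout)
        except DaemonTimeoutException:
            continue
    return None


slow_status = status_with_retry(StubClient(responds_within=5))
unreachable_status = status_with_retry(StubClient(responds_within=30))
```

Passing the timeout per call keeps the configured default untouched while still allowing a single slow operation to wait longer.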
5 changes: 1 addition & 4 deletions docs/source/topics/index.rst
@@ -10,14 +10,11 @@ Topics
calculations/index
workflows/index
provenance/index
daemon
data_types
database
repository
plugins
schedulers
transport
performance

.. todo::

daemon // after provenance
