Supervisor for monitoring and keeping alive services #7133
**Motivation**:

For the AiiDA daemon we currently use circus as the supervisor process that monitors the AiiDA workers and keeps them alive. This package consists of ~15k lines of code and is nowadays mainly maintained by us. It supports a client-server model for supervising processes, which is useful for supervising over a network but not necessary for us. It is also useful if one wants to go beyond standard UNIX signals to communicate with processes. Neither of these features is required for the use case in AiiDA, so I suggest we move to a custom implementation that is easier for us to maintain.

My second motivation was to implement support for supervising multiple services with different start commands. Some of them, like the AiiDA worker service, scale incrementally to a tunable number of processes; some do not. While this could also have been implemented using circus, the maintenance burden motivated me to implement the feature in a new, custom implementation. This feature is required to keep using the `verdi daemon` endpoint during the migration to Airflow: the idea is that `verdi daemon` now starts the Airflow scheduler, dag-processor, api-server and triggerer. For blocking tasks, Airflow provides workers that are managed by the scheduler; for non-blocking tasks, however, Airflow does not provide a managing supervisor, so we have to manage those workers ourselves. For that use case I want to use this implementation, which supports both single-process services and worker services consisting of a tunable number of processes.

**Implementation**:

*Supervisor controller and process*:

We need a supervisor process that checks whether all services are still running and restarts them if not. The current AiiDA API allows modifying the number of workers while the supervisor process is already running, so we also need a way to communicate changes in the number of services (i.e. changes in the number of workers) to the running supervisor process.
The idea is that the logic the user can trigger through the CLI lives in the `ServiceSupervisorController`. This class is stateless, as the CLI commands are stateless. The logic of the stateful supervisor process, which checks whether each process is alive and restarts it if not, lives in the `ServiceSupervisorProcess`.

Furthermore, for better log separation I organized the supervisor into sessions. These are folders named after the timestamp at which the daemon started, so each daemon restart creates a new session. The logs of the different processes are also put into separate folders to allow for clearer error messages. Utilities to merge the logs into a single file can be added to the daemon later, if needed.

```
daemon/
├── profile-<aiida_profile_name>/<session_timestamp>
│   ├── <worker_service_name>/
│   │   ├── worker_service_config.json  # config how to start the worker service
│   │   ├── 1/
│   │   │   ├── info.json    # PID, state, timestamps, failure count
│   │   │   ├── stdout.log   # Service stdout
│   │   │   └── stderr.log   # Service stderr
│   │   └── 2/
│   │       ├── info.json    # PID, state, timestamps, failure count
│   │       ├── stdout.log   # Service stdout
│   │       └── stderr.log   # Service stderr
│   └── <service_name>/
│       ├── service_config.json  # config how to start the service
│       ├── info.json    # PID, state, timestamps, failure count
│       ├── stdout.log   # Service stdout
│       └── stderr.log   # Service stderr
├── supervisor_info.json    # Single daemon PID file
├── supervisor_config.json  # Supervisor configuration
└── supervisor.log          # Daemon output (background mode only)
```

*ServiceConfig*:

To add a service, a developer has to inherit from `ServiceConfig` and specify the class attributes `service_name` and `command`. The command is used to start the service process. Each `ServiceConfig` also needs to implement `create_unique_env(self) -> dict[str, str]`, which returns the environment variables that should be passed to the process started with the command. For the AiiDA worker case this only contains `AIIDA_PATH`.
Because we need to instantiate the `ServiceConfig` on the `ServiceSupervisorController` side to be able to adapt to changes in `supervisor_config.json`, we need a pattern to reconstruct the correct subclass from the service identifier (the `service_name`). For this, subclasses that set `service_name` are registered automatically.

**API**:

I only want to expose `AiidaDaemon` as part of the public API, while the supervisor implementation should be internal, but I have not put much thought into this yet.

**TODO**:
- [ ] Clean up the various TODOs in daemon.py
- [ ] Split the code in daemon.py into multiple files and a subdirectory
- [ ] Remove the circus library
- [ ] Integrate fully with the CLI (implement missing endpoints)
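The subclass-registration pattern can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: only `ServiceConfig`, `service_name`, `command`, `create_unique_env` and the `SERVICE_CONFIG_REGISTRY` name appear in the PR; the registry internals and the example `AIIDA_PATH` value are assumptions.

```python
from abc import ABC, abstractmethod
from typing import ClassVar, Dict, Type

# Maps service_name -> ServiceConfig subclass, filled automatically.
SERVICE_CONFIG_REGISTRY: Dict[str, Type["ServiceConfig"]] = {}


class ServiceConfig(ABC):
    """Base class for service configurations.

    Subclasses that set ``service_name`` register themselves, so the
    controller can reconstruct the right subclass from the identifier
    stored in supervisor_config.json.
    """

    service_name: ClassVar[str]
    command: ClassVar[str]

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Only register concrete configs that define their own service name.
        if "service_name" in cls.__dict__:
            SERVICE_CONFIG_REGISTRY[cls.service_name] = cls

    @abstractmethod
    def create_unique_env(self) -> dict:
        """Environment variables passed to the process started by ``command``."""


class AiidaWorkerConfig(ServiceConfig):
    service_name = "aiida_worker"
    command = "verdi daemon worker"

    def create_unique_env(self) -> dict:
        # For the AiiDA worker only AIIDA_PATH is needed (path is illustrative).
        return {"AIIDA_PATH": "/home/user/.aiida"}


# The controller can now rebuild the subclass from the stored identifier:
config_cls = SERVICE_CONFIG_REGISTRY["aiida_worker"]
print(config_cls is AiidaWorkerConfig)  # → True
```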
**Codecov Report**

❌ Patch coverage report:

```
@@            Coverage Diff             @@
##             main    #7133       +/-   ##
===========================================
- Coverage   79.58%   28.82%   -50.75%
===========================================
  Files         566      567        +1
  Lines       43517    44082      +565
===========================================
- Hits        34629    12703    -21926
- Misses       8888    31379    +22491
```
After starting the `ServiceSupervisorProcess` through `ServiceSupervisorController.start`, status and configuration changes are communicated through the files in the session directory.

Here is a snippet of the status (this contains a lot of debug information for me at the moment):
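The file-based communication between the stateless controller and the stateful supervisor process can be sketched roughly as follows. This is a hypothetical illustration: the helper names and the `num_workers` key layout are assumptions, not the PR's actual code.

```python
import json
import os
import tempfile
from pathlib import Path


def write_config(path: Path, config: dict) -> None:
    """Atomically rewrite the supervisor config so the supervisor process
    never observes a half-written file (write to temp file, then rename)."""
    fd, tmp = tempfile.mkstemp(dir=path.parent)
    with os.fdopen(fd, "w") as fh:
        json.dump(config, fh)
    os.replace(tmp, path)


def read_config(path: Path) -> dict:
    with path.open() as fh:
        return json.load(fh)


# Controller side: request one more worker by rewriting the config file.
cfg_path = Path(tempfile.mkdtemp()) / "supervisor_config.json"
write_config(cfg_path, {"aiida_worker": {"num_workers": 2}})
write_config(cfg_path, {"aiida_worker": {"num_workers": 3}})

# Supervisor side: the health-monitor loop would re-read the file on each
# iteration and spawn or kill workers to match the requested count.
print(read_config(cfg_path)["aiida_worker"]["num_workers"])  # → 3
```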
**Class diagram**

```mermaid
classDiagram
    %% Enums
    class ServiceState {
        <<enumeration>>
        ALIVE
        DEAD
    }

    %% Abstract Base Classes
    class FolderIdentifier {
        <<abstract>>
        -_identifier: str
        +identifier: str
        +__init__(identifier: str)
        +__str__() str
        +__repr__() str
        +__hash__() int
        +__eq__(other) bool
        +is_valid_identifier(name: str)$ bool
    }
    class ServiceConfig {
        <<abstract>>
        +service_name: ClassVar~str~
        +command: ClassVar~str~
        +__init_subclass__()
        +create_unique_env()* dict[str, str]
        +to_dict() dict
    }

    %% Identifier Classes
    class ServiceIdentifier {
    }
    class WorkerIdentifier {
        -_service_identifier: str
        -_worker_num: int
        +service_identifier: str
        +worker_num: int
        +__init__(service_identifier: str, worker_num: int)
        +from_full_identifier(full_identifier: str)$ WorkerIdentifier
        +__repr__() str
    }

    %% Dataclass Mixins
    class JsonSerialization {
        <<dataclass>>
        +from_file(path: Path)$ Self
        +to_file(path: Path)
    }
    class ProcessInfo {
        <<dataclass>>
        +pid: int
        +create_time: float
    }

    %% Info Classes
    class ServiceInfo {
        <<dataclass>>
        +service_name: str
        +command: str
        +state: str
        +last_check: float
        +failures: int
    }
    class SupervisorInfo {
        <<dataclass>>
    }

    %% Config Classes
    class NonWorkerServiceConfig {
        <<dataclass>>
    }
    class WorkerServiceConfig {
        <<dataclass>>
        +num_workers: int
    }
    class AiidaWorkerConfig {
        <<dataclass>>
        +service_name: ClassVar~str~ = "aiida_worker"
        +command: ClassVar~str~ = "verdi daemon worker"
        +create_unique_env() dict[str, str]
    }
    class SleepServiceConfig {
        <<dataclass>>
        +service_name: ClassVar~str~ = "sleep10"
        +command: ClassVar~str~ = "sleep 10"
        +create_unique_env() dict[str, str]
    }

    %% Factory and Collection Classes
    class ServiceConfigFactory {
        +from_file(path: Path)$ ServiceConfig
        +from_dict(values: dict)$ ServiceConfig
    }
    class ServiceConfigMap {
        -_configs: dict[ServiceIdentifier, ServiceConfig]
        +__init__(configs: List[ServiceConfig])
        +__getitem__(identifier: str | ServiceIdentifier) ServiceConfig
        +__contains__(identifier: str | ServiceIdentifier) bool
        +__len__() int
        +__iter__()
        +keys()
        +values()
        +items()
        +to_file(path: Path)
        +from_file(path: Path)$ Self
    }

    %% Core Daemon Classes
    class ServiceSupervisorCommon {
        +SUPERVISOR_INFO_FILE: str = "supervisor_info.json"
        +SUPERVISOR_CONFIG_FILE: str = "supervisor_config.json"
        +SUPERVISOR_LOG_FILE: str = "supervisor.log"
        +PROCESS_INFO_FILE: str = "info.json"
        +KILL_TIMEOUT: float = 10.0
        +_start_service_process(service_dir: Path, config: ServiceConfig, info: ServiceInfo)$
        +_start_worker_service_process(service_dir: Path, config: ServiceConfig, worker_num: int, info: ServiceInfo)$
        +_start_process(process_dir: Path, config: ServiceConfig, info: ServiceInfo)$
        +_kill_service(pid: int)$ bool
        +_is_alive(pid: int, create_time: float)$ bool
        +stop(session_dir: Path)$
    }
    class ServiceSupervisorProcess {
        -_session_dir: Path
        -_log_fd: file
        +monitor_thread: Thread
        +running: bool
        +__init__(session_dir: Path, foreground: bool)
        -_daemonize()
        -_save_supervisor_info()
        -_setup_logging()
        -_setup_child_reaper()
        -_check_service_process_health(config: ServiceConfig)
        -_check_worker_process_health(config: ServiceConfig, worker_num: int)
        -_check_process_health(process_dir, config: ServiceConfig)$
        -_health_monitor()
        -_shutdown()
        -_signal_handler(signum, frame)
    }
    class ServiceSupervisorController {
        +start(supervisor_dir: Path, service_configs: ServiceConfigMap, foreground: bool)$
        +stop(supervisor_dir: Path)$
        +status(supervisor_dir: Path)$
        -_start_service(session_dir: Path, config: ServiceConfig)$
        -_is_running(session_dir: Path)$ bool
        -_create_new_session_dir(supervisor_dir: Path)$ Path
        -_get_latest_session_dir(supervisor_dir: Path)$ Path | None
        -_validate_supervisor_dir(supervisor_dir: Path)$
    }
    class SessionDirUtils {
        +SESSION_DIR_TIMESTAMP_FORMAT: str
        +SESSION_DIR_PATTERN: str
        +generate_dirname()$ str
        +match_dirname(dirname: str)$ bool
    }

    %% AiiDA Integration
    class AiidaDaemon {
        -_daemon_dir: Path
        +__init__(profile_identifier: str | None)
        +start(num_workers: int, foreground: bool)
        +stop()
        +status()
    }

    %% Relationships - Inheritance
    FolderIdentifier <|-- ServiceIdentifier
    FolderIdentifier <|-- WorkerIdentifier
    ServiceConfig <|-- NonWorkerServiceConfig
    ServiceConfig <|-- WorkerServiceConfig
    NonWorkerServiceConfig <|-- SleepServiceConfig
    WorkerServiceConfig <|-- AiidaWorkerConfig
    ProcessInfo <|-- ServiceInfo
    ProcessInfo <|-- SupervisorInfo
    JsonSerialization <|-- ServiceInfo
    JsonSerialization <|-- SupervisorInfo

    %% Relationships - Composition
    ServiceConfigMap *-- ServiceConfig : contains
    ServiceConfigMap *-- ServiceIdentifier : uses as keys
    ServiceSupervisorController *-- SessionDirUtils : nested class
    ServiceSupervisorProcess --> ServiceSupervisorCommon : uses
    ServiceSupervisorController --> ServiceSupervisorCommon : uses
    ServiceSupervisorController --> ServiceSupervisorProcess : creates

    %% Relationships - Dependencies
    ServiceConfigFactory ..> ServiceConfig : creates
    ServiceConfigMap ..> ServiceConfigFactory : uses
    ServiceSupervisorCommon ..> ServiceInfo : manages
    ServiceSupervisorCommon ..> SupervisorInfo : manages
    ServiceSupervisorCommon ..> ServiceConfig : uses
    ServiceSupervisorProcess ..> ServiceConfigMap : reads
    ServiceSupervisorController ..> ServiceConfigMap : uses
    AiidaDaemon --> ServiceSupervisorController : delegates to
    AiidaDaemon ..> ServiceConfigMap : creates
    AiidaDaemon ..> AiidaWorkerConfig : uses
    AiidaDaemon ..> SleepServiceConfig : uses

    %% Registry relationship
    ServiceConfig ..> SERVICE_CONFIG_REGISTRY : auto-registers in
```
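In the diagram, `ProcessInfo` stores both `pid` and `create_time`, presumably because a PID alone is not a reliable liveness token: the OS recycles PIDs, so a dead worker's PID may later belong to an unrelated process. A stdlib-only sketch of such a check follows; the PR's actual `_is_alive` body may differ, and everything here beyond the pid/create_time idea is an assumption.

```python
import os
from typing import Optional


def pid_exists(pid: int) -> bool:
    """Check whether a process with the given PID exists (POSIX only).

    Sending signal 0 performs error checking without delivering a signal."""
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True   # process exists but is owned by another user
    return True


def is_alive(pid: int, create_time: float, current_create_time: Optional[float]) -> bool:
    """Guard against PID reuse by also comparing the stored creation time
    with the current process's creation time (obtainable e.g. via
    psutil.Process(pid).create_time()); a small tolerance absorbs rounding."""
    if not pid_exists(pid):
        return False
    return current_create_time is not None and abs(current_create_time - create_time) < 1.0


print(pid_exists(os.getpid()))  # → True
print(pid_exists(0))            # → False
```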