diff --git a/components/clp-package-utils/clp_package_utils/controller.py b/components/clp-package-utils/clp_package_utils/controller.py index 950388c6af..84cab5cce4 100644 --- a/components/clp-package-utils/clp_package_utils/controller.py +++ b/components/clp-package-utils/clp_package_utils/controller.py @@ -67,6 +67,7 @@ generate_docker_compose_container_config, get_clp_home, is_retention_period_configured, + run_subprocess_with_signal_forward, validate_db_config, validate_mcp_server_config, validate_queue_config, @@ -1046,11 +1047,12 @@ def start(self) -> None: cmd = ["docker", "compose", "--project-name", self._project_name] cmd += ["--file", self._get_docker_file_name()] cmd += ["up", "--detach", "--wait"] - subprocess.run( + proc = run_subprocess_with_signal_forward( cmd, cwd=self._clp_home, - check=True, ) + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, proc.args) logger.info("Started CLP.") def stop(self) -> None: @@ -1078,11 +1080,12 @@ def stop(self) -> None: else: logger.info("Stopping all CLP containers using Docker Compose...") - subprocess.run( + proc = run_subprocess_with_signal_forward( ["docker", "compose", "--project-name", self._project_name, "down"], cwd=self._clp_home, - check=True, ) + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, proc.args) logger.info("Stopped CLP.") @staticmethod @@ -1147,7 +1150,9 @@ def _chown_recursively( :param group_id: """ chown_cmd = ["chown", "--recursive", f"{user_id}:{group_id}", str(path)] - subprocess.run(chown_cmd, stdout=subprocess.DEVNULL, check=True) + result = run_subprocess_with_signal_forward(chown_cmd, stdout=subprocess.DEVNULL) + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, chown_cmd) def _get_ip_from_hostname(hostname: str) -> str: diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index e20cd70725..3046b7af53 100644 
def run_subprocess_with_signal_forward(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess:
    """
    Runs a subprocess in its own session (and therefore its own process group) and forwards
    `SIGINT` and `SIGTERM` received by the calling process to the entire child process group.

    After a signal has been forwarded, the handler that was installed before this call is restored
    for that signal and, if it is a Python callable, invoked. Consequently only the first
    occurrence of each signal is forwarded, and anything the original handler raises (e.g.,
    `KeyboardInterrupt` from Python's default `SIGINT` handler) propagates to the caller.

    NOTE: Signal handlers can only be installed from the main thread; calling this function from
    any other thread raises `ValueError` before the subprocess is spawned.

    :param args: Positional arguments passed through to `subprocess.Popen`.
    :param kwargs: Keyword arguments passed through to `subprocess.Popen`. `start_new_session` is
        always overridden to `True`.
    :return: The completed subprocess (`stdout` and `stderr` are never captured).
    :raise ValueError: If called from a thread other than the main thread.
    :raise OSError: Propagated from `subprocess.Popen` if the subprocess can't be started.
    """
    # A new session makes the child the leader of a fresh process group, so one `killpg` reaches
    # the child and all of its descendants.
    kwargs["start_new_session"] = True

    # Read the current handlers without modifying them. Handlers that weren't installed from Python
    # are reported as `None` and can't be reinstalled through `signal.signal`, so those signals are
    # left untouched rather than overridden with a handler we could never undo.
    original_handlers = {}
    for sig in (signal.SIGINT, signal.SIGTERM):
        current_handler = signal.getsignal(sig)
        if current_handler is not None:
            original_handlers[sig] = current_handler

    proc = None
    # Signals that arrive after our handlers are installed but before `proc` is assigned.
    pending_signals: list[int] = []

    def _forward_to_child_group(signum: int) -> None:
        """Sends `signum` to the child's process group, if the child hasn't been reaped yet."""
        if proc.returncode is not None:
            # The child has already been reaped; its PID may have been recycled.
            return
        try:
            os.killpg(os.getpgid(proc.pid), signum)
        except ProcessLookupError:
            # Child already terminated.
            pass

    def _signal_handler(signum: int, frame: Any) -> None:
        """Forwards the signal to the child, then restores and invokes the original handler."""
        if proc is None:
            pending_signals.append(signum)
        else:
            _forward_to_child_group(signum)

        original = original_handlers[signal.Signals(signum)]
        signal.signal(signum, original)
        if original not in (signal.SIG_DFL, signal.SIG_IGN):
            original(signum, frame)

    try:
        # Install the handlers *before* spawning so that no signal can reach the parent while the
        # detached child exists but forwarding isn't in place yet, and so that a failure to install
        # them can't leave an unwaited child behind.
        for sig in original_handlers:
            signal.signal(sig, _signal_handler)

        proc = subprocess.Popen(*args, **kwargs)
        for signum in pending_signals:
            _forward_to_child_group(signum)
        proc.wait()
    finally:
        # Restore the original handlers regardless of how we leave.
        for sig, handler in original_handlers.items():
            signal.signal(sig, handler)

    return subprocess.CompletedProcess(proc.args, proc.returncode)
import shlex -import subprocess import sys import uuid @@ -27,6 +26,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, ) @@ -257,7 +257,7 @@ def main(argv): cmd = container_start_cmd + compress_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if ret_code != 0: logger.error("Compression failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py b/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py index 532cdded56..4f83bbd445 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py +++ b/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py @@ -2,7 +2,6 @@ import logging import pathlib import shlex -import subprocess import sys import uuid @@ -26,6 +25,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, S3_KEY_PREFIX_COMPRESSION, S3_OBJECT_COMPRESSION, validate_and_load_db_credentials_file, @@ -311,7 +311,7 @@ def main(argv): cmd = container_start_cmd + compress_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if ret_code != 0: logger.error("Compression failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py b/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py index 72fb62c9d2..ec1dfaa7a9 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py @@ -1,7 +1,6 @@ import argparse import logging import shlex -import subprocess import sys from pathlib import Path from typing import Final @@ -28,6 +27,7 @@ get_clp_home, get_container_config_filename, load_config_file, + 
run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, ) @@ -189,7 +189,7 @@ def main(argv: list[str]) -> int: cmd = container_start_cmd + dataset_manager_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("Dataset manager failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index 2adfc6588e..4bf72b3542 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -2,7 +2,6 @@ import logging import pathlib import shlex -import subprocess import sys from clp_py_utils.clp_config import ( @@ -31,6 +30,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, validate_path_could_be_dir, @@ -157,7 +157,7 @@ def handle_extract_file_cmd( cmd = container_start_cmd + extract_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("file extraction failed.") @@ -266,7 +266,7 @@ def handle_extract_stream_cmd( cmd = container_start_cmd + extract_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("stream extraction failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/search.py b/components/clp-package-utils/clp_package_utils/scripts/search.py index 8b80c7e0ec..e0a54b1dae 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/search.py @@ -2,7 +2,6 @@ import logging import pathlib import shlex -import subprocess import sys from 
clp_py_utils.clp_config import ( @@ -25,6 +24,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, ) @@ -175,7 +175,7 @@ def main(argv): search_cmd.append("--raw") cmd = container_start_cmd + search_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("Search failed.")