From 0bdf25d1d193d29fa115177901def0c555bdf363 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Tue, 2 Dec 2025 14:59:45 -0500 Subject: [PATCH 1/4] fix(clp-package): Forward signals for cleanup of native containers in sbin scripts (fixes #1701). --- .../clp_package_utils/controller.py | 15 ++++--- .../clp_package_utils/general.py | 43 +++++++++++++++++++ .../scripts/archive_manager.py | 4 +- .../clp_package_utils/scripts/compress.py | 4 +- .../scripts/compress_from_s3.py | 4 +- .../scripts/dataset_manager.py | 4 +- .../clp_package_utils/scripts/decompress.py | 6 +-- .../clp_package_utils/scripts/search.py | 4 +- 8 files changed, 66 insertions(+), 18 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/controller.py b/components/clp-package-utils/clp_package_utils/controller.py index 746d118f3c..4c762cb852 100644 --- a/components/clp-package-utils/clp_package_utils/controller.py +++ b/components/clp-package-utils/clp_package_utils/controller.py @@ -56,6 +56,7 @@ generate_docker_compose_container_config, get_clp_home, is_retention_period_configured, + run_subprocess_with_signal_forward, validate_db_config, validate_mcp_server_config, validate_queue_config, @@ -908,11 +909,12 @@ def start(self) -> None: cmd = ["docker", "compose", "--project-name", self._project_name] cmd += ["--file", self._get_docker_file_name()] cmd += ["up", "--detach", "--wait"] - subprocess.run( + proc = run_subprocess_with_signal_forward( cmd, cwd=self._clp_home, - check=True, ) + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, proc.args) logger.info("Started CLP.") def stop(self) -> None: @@ -940,11 +942,12 @@ def stop(self) -> None: else: logger.info("Stopping all CLP containers using Docker Compose...") - subprocess.run( + proc = run_subprocess_with_signal_forward( ["docker", "compose", "--project-name", self._project_name, "down"], cwd=self._clp_home, - check=True, ) + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, proc.args) logger.info("Stopped CLP.") @staticmethod @@ -1012,7 +1015,9 @@ def _chown_recursively( :param group_id: """ chown_cmd = ["chown", "--recursive", f"{user_id}:{group_id}", str(path)] - subprocess.run(chown_cmd, stdout=subprocess.DEVNULL, check=True) + result = run_subprocess_with_signal_forward(chown_cmd, stdout=subprocess.DEVNULL) + if result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, chown_cmd) def _get_ip_from_hostname(hostname: str) -> str: diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 9e325ea593..ec746e2d84 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -5,10 +5,12 @@ import pathlib import re import secrets +import signal import socket import subprocess import uuid from enum import auto +from typing import Any import yaml from clp_py_utils.clp_config import ( @@ -49,6 +51,8 @@ EXTRACT_JSON_CMD = "j" DOCKER_MOUNT_TYPE_STRINGS = ["bind"] +DOCKER_RUN_DEFAULT_TIMEOUT = 10 +DOCKER_RUN_TERMINATE_GRACE_FACTOR = 1.5 S3_KEY_PREFIX_COMPRESSION = "s3-key-prefix" S3_OBJECT_COMPRESSION = "s3-object" @@ -749,6 +753,45 @@ def get_celery_connection_env_vars_list(container_clp_config: ClpConfig) -> list return env_vars +def run_subprocess_with_signal_forward(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess: + """ + Runs a subprocess in its own process group and forwards `SIGINT` and `SIGTERM` received on the + calling process to the entire child process group. + """ + kwargs["start_new_session"] = True + proc = subprocess.Popen(*args, **kwargs) + + # Save original handlers + original_handlers = { + signal.SIGINT: signal.signal(signal.SIGINT, signal.SIG_DFL), + signal.SIGTERM: signal.signal(signal.SIGTERM, signal.SIG_DFL), + } + + def _signal_handler(signum: int, frame: Any) -> None: + """Run cleanup, then invoke the original handler.""" + # Terminate the entire process group to cascade termination to all children. + pgid = os.getpgid(proc.pid) + os.killpg(pgid, signum) + + # Restore and invoke original handler + original = original_handlers[signal.Signals(signum)] + signal.signal(signum, original) + if original not in (signal.SIG_DFL, signal.SIG_IGN): + original(signum, frame) + + signal.signal(signal.SIGINT, _signal_handler) + signal.signal(signal.SIGTERM, _signal_handler) + + try: + proc.wait() + finally: + # Restore original handlers + for sig, handler in original_handlers.items(): + signal.signal(sig, handler) + + return subprocess.CompletedProcess(args[0], proc.returncode) + + def _is_docker_compose_project_running(project_name: str) -> bool: """ Checks if a Docker Compose project is running. diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 6f7a6a4251..e78939a159 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -1,7 +1,6 @@ import argparse import logging import shlex -import subprocess import sys from pathlib import Path from typing import Final @@ -27,6 +26,7 @@ get_clp_home, get_container_config_filename, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, ) @@ -278,7 +278,7 @@ def main(argv: list[str]) -> int: cmd: list[str] = container_start_cmd + archive_manager_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("Archive manager failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress.py b/components/clp-package-utils/clp_package_utils/scripts/compress.py index e93d086eb4..dd9fde3808 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/compress.py @@ -2,7 +2,6 @@ import logging import pathlib import shlex -import subprocess import sys import uuid @@ -27,6 +26,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, ) @@ -267,7 +267,7 @@ def main(argv): cmd = container_start_cmd + compress_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if ret_code != 0: logger.error("Compression failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py b/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py index bc64716374..82f093c1d8 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py +++ b/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py @@ -2,7 +2,6 @@ import logging import pathlib import shlex -import subprocess import sys import uuid @@ -26,6 +25,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, S3_KEY_PREFIX_COMPRESSION, S3_OBJECT_COMPRESSION, validate_and_load_db_credentials_file, @@ -321,7 +321,7 @@ def main(argv): cmd = container_start_cmd + compress_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if ret_code != 0: logger.error("Compression failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py b/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py index 2737d937d7..1a37300c74 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py @@ -1,7 +1,6 @@ import argparse import logging import shlex -import subprocess import sys from pathlib import Path from typing import Final @@ -28,6 +27,7 @@ get_clp_home, get_container_config_filename, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, ) @@ -193,7 +193,7 @@ def main(argv: list[str]) -> int: cmd = container_start_cmd + dataset_manager_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("Dataset manager failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index f0aa6e4657..432a4dc4a3 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -2,7 +2,6 @@ import logging import pathlib import shlex -import subprocess import sys from clp_py_utils.clp_config import ( @@ -31,6 +30,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, validate_path_could_be_dir, @@ -161,7 +161,7 @@ def handle_extract_file_cmd( cmd = container_start_cmd + extract_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("file extraction failed.") @@ -270,7 +270,7 @@ def handle_extract_stream_cmd( cmd = container_start_cmd + extract_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("stream extraction failed.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/search.py b/components/clp-package-utils/clp_package_utils/scripts/search.py index 5d117f0b3b..29ebbdb565 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/search.py @@ -2,7 +2,6 @@ import logging import pathlib import shlex -import subprocess import sys from clp_py_utils.clp_config import ( @@ -25,6 +24,7 @@ get_container_config_filename, JobType, load_config_file, + run_subprocess_with_signal_forward, validate_and_load_db_credentials_file, validate_dataset_name, ) @@ -180,7 +180,7 @@ def main(argv): search_cmd.append("--raw") cmd = container_start_cmd + search_cmd - proc = subprocess.run(cmd, check=False) + proc = run_subprocess_with_signal_forward(cmd) ret_code = proc.returncode if 0 != ret_code: logger.error("Search failed.") From e2ce3eadc2427fa886a93afb05624c896b2f5ba9 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Tue, 2 Dec 2025 15:26:00 -0500 Subject: [PATCH 2/4] Add missing return statement description --- components/clp-package-utils/clp_package_utils/general.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index ec746e2d84..cf06c3de49 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -757,6 +757,8 @@ def run_subprocess_with_signal_forward(*args: Any, **kwargs: Any) -> subprocess. """ Runs a subprocess in its own process group and forwards `SIGINT` and `SIGTERM` received on the calling process to the entire child process group. + + :return: The completed subprocess. """ kwargs["start_new_session"] = True proc = subprocess.Popen(*args, **kwargs) From faa192a76ada8009f9cf2dfe3d6960d4ea5b3922 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Wed, 3 Dec 2025 02:13:49 -0500 Subject: [PATCH 3/4] Handle ProcessLookupError if child has already exited --- .../clp-package-utils/clp_package_utils/general.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index cf06c3de49..1e9406551c 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -771,9 +771,13 @@ def run_subprocess_with_signal_forward(*args: Any, **kwargs: Any) -> subprocess. def _signal_handler(signum: int, frame: Any) -> None: """Run cleanup, then invoke the original handler.""" - # Terminate the entire process group to cascade termination to all children. - pgid = os.getpgid(proc.pid) - os.killpg(pgid, signum) + try: + # Terminate the entire process group to cascade termination to all children. + pgid = os.getpgid(proc.pid) + os.killpg(pgid, signum) + except ProcessLookupError: + # Child already terminated. + pass # Restore and invoke original handler original = original_handlers[signal.Signals(signum)] From 3bc97f225ab90b4e14cd476dfe827d4d11c94dd3 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Wed, 3 Dec 2025 02:14:53 -0500 Subject: [PATCH 4/4] use proc.args in CompletedProcess. --- components/clp-package-utils/clp_package_utils/general.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 1e9406551c..dbd2a50e32 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -795,7 +795,7 @@ def _signal_handler(signum: int, frame: Any) -> None: for sig, handler in original_handlers.items(): signal.signal(sig, handler) - return subprocess.CompletedProcess(args[0], proc.returncode) + return subprocess.CompletedProcess(proc.args, proc.returncode) def _is_docker_compose_project_running(project_name: str) -> bool: