Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions components/clp-package-utils/clp_package_utils/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
generate_docker_compose_container_config,
get_clp_home,
is_retention_period_configured,
run_subprocess_with_signal_forward,
validate_db_config,
validate_mcp_server_config,
validate_queue_config,
Expand Down Expand Up @@ -1046,11 +1047,12 @@ def start(self) -> None:
cmd = ["docker", "compose", "--project-name", self._project_name]
cmd += ["--file", self._get_docker_file_name()]
cmd += ["up", "--detach", "--wait"]
subprocess.run(
proc = run_subprocess_with_signal_forward(
cmd,
cwd=self._clp_home,
check=True,
)
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, proc.args)
logger.info("Started CLP.")

def stop(self) -> None:
Expand Down Expand Up @@ -1078,11 +1080,12 @@ def stop(self) -> None:
else:
logger.info("Stopping all CLP containers using Docker Compose...")

subprocess.run(
proc = run_subprocess_with_signal_forward(
["docker", "compose", "--project-name", self._project_name, "down"],
cwd=self._clp_home,
check=True,
)
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, proc.args)
logger.info("Stopped CLP.")

@staticmethod
Expand Down Expand Up @@ -1147,7 +1150,9 @@ def _chown_recursively(
:param group_id:
"""
chown_cmd = ["chown", "--recursive", f"{user_id}:{group_id}", str(path)]
subprocess.run(chown_cmd, stdout=subprocess.DEVNULL, check=True)
result = run_subprocess_with_signal_forward(chown_cmd, stdout=subprocess.DEVNULL)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, chown_cmd)


def _get_ip_from_hostname(hostname: str) -> str:
Expand Down
49 changes: 49 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import pathlib
import re
import secrets
import signal
import socket
import subprocess
import uuid
from enum import auto
from typing import Any

import yaml
from clp_py_utils.clp_config import (
Expand Down Expand Up @@ -50,6 +52,8 @@
EXTRACT_JSON_CMD = "j"

DOCKER_MOUNT_TYPE_STRINGS = ["bind"]
DOCKER_RUN_DEFAULT_TIMEOUT = 10
DOCKER_RUN_TERMINATE_GRACE_FACTOR = 1.5

S3_KEY_PREFIX_COMPRESSION = "s3-key-prefix"
S3_OBJECT_COMPRESSION = "s3-object"
Expand Down Expand Up @@ -761,6 +765,51 @@ def get_celery_connection_env_vars_list(container_clp_config: ClpConfig) -> list
return env_vars


def run_subprocess_with_signal_forward(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess:
"""
Runs a subprocess in its own process group and forwards `SIGINT` and `SIGTERM` received on the
calling process to the entire child process group.

:return: The completed subprocess.
"""
kwargs["start_new_session"] = True
proc = subprocess.Popen(*args, **kwargs)

# Save original handlers
original_handlers = {
signal.SIGINT: signal.signal(signal.SIGINT, signal.SIG_DFL),
signal.SIGTERM: signal.signal(signal.SIGTERM, signal.SIG_DFL),
}

def _signal_handler(signum: int, frame: Any) -> None:
"""Run cleanup, then invoke the original handler."""
try:
# Terminate the entire process group to cascade termination to all children.
pgid = os.getpgid(proc.pid)
os.killpg(pgid, signum)
except ProcessLookupError:
# Child already terminated.
pass

# Restore and invoke original handler
original = original_handlers[signal.Signals(signum)]
signal.signal(signum, original)
if original not in (signal.SIG_DFL, signal.SIG_IGN):
original(signum, frame)

signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)

try:
proc.wait()
finally:
# Restore original handlers
for sig, handler in original_handlers.items():
signal.signal(sig, handler)

return subprocess.CompletedProcess(proc.args, proc.returncode)
Comment on lines +803 to +810
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

fd "general.py" --path "*/clp-package-utils/*" -x cat -n {}

Repository: y-scope/clp

Length of output: 270


🏁 Script executed:

fd "general.py" -p "*/clp-package-utils/*" -x cat -n {}

Repository: y-scope/clp

Length of output: 163


🏁 Script executed:

find . -name "general.py" -type f | head -20

Repository: y-scope/clp

Length of output: 114


🏁 Script executed:

cat -n ./components/clp-package-utils/clp_package_utils/general.py | sed -n '790,820p'

Repository: y-scope/clp

Length of output: 1317


🏁 Script executed:

cat -n ./components/clp-package-utils/clp_package_utils/general.py | sed -n '750,815p'

Repository: y-scope/clp

Length of output: 2939


🏁 Script executed:

python - <<'PY'
import ast
from pathlib import Path

risky_kwargs = {"stdout", "stderr", "capture_output", "timeout", "input", "check", "text", "encoding", "errors"}

for path in Path(".").rglob("*.py"):
    try:
        src = path.read_text(encoding="utf-8")
        tree = ast.parse(src)
    except Exception:
        continue

    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            fn = node.func
            name = fn.id if isinstance(fn, ast.Name) else fn.attr if isinstance(fn, ast.Attribute) else None
            if name == "run_subprocess_with_signal_forward":
                used = {kw.arg for kw in node.keywords if kw.arg is not None}
                overlap = sorted(used & risky_kwargs)
                if overlap:
                    print(f"{path}:{node.lineno}: kwargs={overlap}")
PY

Repository: y-scope/clp

Length of output: 139


🏁 Script executed:

cat -n ./components/clp-package-utils/clp_package_utils/controller.py | sed -n '1145,1165p'

Repository: y-scope/clp

Length of output: 847


🏁 Script executed:

rg "run_subprocess_with_signal_forward" --type py -B 2 -A 2

Repository: y-scope/clp

Length of output: 9250


Replace proc.wait() with proc.communicate() to prevent deadlocks and preserve output.

The current implementation at line 804 uses proc.wait(), which can deadlock indefinitely if callers pass stdout=PIPE or stderr=PIPE (or similar output redirection). Additionally, the returned CompletedProcess at line 810 omits stdout and stderr, making the function incompatible with callers expecting subprocess-run style output capture.

Recommended fix
     try:
-        proc.wait()
+        stdout, stderr = proc.communicate()
     finally:
         # Restore original handlers
         for sig, handler in original_handlers.items():
             signal.signal(sig, handler)
 
-    return subprocess.CompletedProcess(proc.args, proc.returncode)
+    return subprocess.CompletedProcess(proc.args, proc.returncode, stdout, stderr)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/clp-package-utils/clp_package_utils/general.py` around lines 803 -
810, Replace the blocking proc.wait() call inside the try/finally block in the
function with proc.communicate() to avoid deadlocks when callers pass
stdout=PIPE or stderr=PIPE, capture the returned (stdout, stderr) and then
construct and return subprocess.CompletedProcess(proc.args, proc.returncode,
stdout=stdout, stderr=stderr); keep restoring original signal handlers in the
finally block as before and ensure you call proc.communicate() before entering
the finally so handlers are always restored even if communicate raises.


Comment on lines +768 to +811
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Signal‑forwarding helper is solid but tightly coupled to main thread and global handlers

The overall behaviour of run_subprocess_with_signal_forward looks good: it creates a new process group, forwards SIGINT/SIGTERM to that group, tolerates already‑exited children via ProcessLookupError, and returns a CompletedProcess with proc.args and returncode, which matches subprocess.run semantics for these fields.

There are a couple of important constraints worth making explicit or guarding against:

  1. Main‑thread requirement for signal.signal.
    Python only allows signal.signal(...) from the main thread; calling this helper from a background thread will raise ValueError. Given this lives in a general utilities module, it would be safer either to:

    • document in the docstring that it must be called from the main thread of a single‑threaded CLI, or
    • add a runtime guard (e.g., check threading.current_thread() is threading.main_thread() and fail fast with a clear error message).
  2. Global SIGINT/SIGTERM handler overwrite while the child runs.
    Because the helper installs its own handler for SIGINT/SIGTERM, any other code that sets handlers during the subprocess lifetime can interfere (and vice versa). This is probably acceptable for simple top‑level scripts but makes the function hazardous in more complex or multi‑threaded programs. At minimum, the docstring should call this out; if you expect broader reuse, consider refactoring into a context‑manager style API so callers can more explicitly scope and reason about handler changes.

If the intent is “CLI‑only, main‑thread‑only” usage, tightening the documentation (and optionally adding a defensive main‑thread assertion) should be enough.

🤖 Prompt for AI Agents
components/clp-package-utils/clp_package_utils/general.py lines 756-799: The
function sets global SIGINT/SIGTERM handlers which must only be done from the
main thread and will temporarily overwrite global handlers; update the docstring
to state that this helper must be called from the main thread and that it
replaces global SIGINT/SIGTERM handlers for the duration of the call, and add a
runtime guard at the start that checks threading.current_thread() is
threading.main_thread() and raises a clear ValueError if not, so callers
fail-fast; optionally consider exposing a context-manager variant for broader
use, but for now implement the docstring change and the defensive main-thread
assertion.


def _is_docker_compose_project_running(project_name: str) -> bool:
"""
Checks if a Docker Compose project is running.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import argparse
import logging
import shlex
import subprocess
import sys
from pathlib import Path
from typing import Final
Expand All @@ -27,6 +26,7 @@
get_clp_home,
get_container_config_filename,
load_config_file,
run_subprocess_with_signal_forward,
validate_and_load_db_credentials_file,
validate_dataset_name,
)
Expand Down Expand Up @@ -274,7 +274,7 @@ def main(argv: list[str]) -> int:

cmd: list[str] = container_start_cmd + archive_manager_cmd

proc = subprocess.run(cmd, check=False)
proc = run_subprocess_with_signal_forward(cmd)
ret_code = proc.returncode
if 0 != ret_code:
logger.error("Archive manager failed.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import pathlib
import shlex
import subprocess
import sys
import uuid

Expand All @@ -27,6 +26,7 @@
get_container_config_filename,
JobType,
load_config_file,
run_subprocess_with_signal_forward,
validate_and_load_db_credentials_file,
validate_dataset_name,
)
Expand Down Expand Up @@ -257,7 +257,7 @@ def main(argv):

cmd = container_start_cmd + compress_cmd

proc = subprocess.run(cmd, check=False)
proc = run_subprocess_with_signal_forward(cmd)
ret_code = proc.returncode
if ret_code != 0:
logger.error("Compression failed.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import pathlib
import shlex
import subprocess
import sys
import uuid

Expand All @@ -26,6 +25,7 @@
get_container_config_filename,
JobType,
load_config_file,
run_subprocess_with_signal_forward,
S3_KEY_PREFIX_COMPRESSION,
S3_OBJECT_COMPRESSION,
validate_and_load_db_credentials_file,
Expand Down Expand Up @@ -311,7 +311,7 @@ def main(argv):

cmd = container_start_cmd + compress_cmd

proc = subprocess.run(cmd, check=False)
proc = run_subprocess_with_signal_forward(cmd)
ret_code = proc.returncode
if ret_code != 0:
logger.error("Compression failed.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import argparse
import logging
import shlex
import subprocess
import sys
from pathlib import Path
from typing import Final
Expand All @@ -28,6 +27,7 @@
get_clp_home,
get_container_config_filename,
load_config_file,
run_subprocess_with_signal_forward,
validate_and_load_db_credentials_file,
validate_dataset_name,
)
Expand Down Expand Up @@ -189,7 +189,7 @@ def main(argv: list[str]) -> int:

cmd = container_start_cmd + dataset_manager_cmd

proc = subprocess.run(cmd, check=False)
proc = run_subprocess_with_signal_forward(cmd)
ret_code = proc.returncode
if 0 != ret_code:
logger.error("Dataset manager failed.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import pathlib
import shlex
import subprocess
import sys

from clp_py_utils.clp_config import (
Expand Down Expand Up @@ -31,6 +30,7 @@
get_container_config_filename,
JobType,
load_config_file,
run_subprocess_with_signal_forward,
validate_and_load_db_credentials_file,
validate_dataset_name,
validate_path_could_be_dir,
Expand Down Expand Up @@ -157,7 +157,7 @@ def handle_extract_file_cmd(

cmd = container_start_cmd + extract_cmd

proc = subprocess.run(cmd, check=False)
proc = run_subprocess_with_signal_forward(cmd)
ret_code = proc.returncode
if 0 != ret_code:
logger.error("file extraction failed.")
Expand Down Expand Up @@ -266,7 +266,7 @@ def handle_extract_stream_cmd(

cmd = container_start_cmd + extract_cmd

proc = subprocess.run(cmd, check=False)
proc = run_subprocess_with_signal_forward(cmd)
ret_code = proc.returncode
if 0 != ret_code:
logger.error("stream extraction failed.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import pathlib
import shlex
import subprocess
import sys

from clp_py_utils.clp_config import (
Expand All @@ -25,6 +24,7 @@
get_container_config_filename,
JobType,
load_config_file,
run_subprocess_with_signal_forward,
validate_and_load_db_credentials_file,
validate_dataset_name,
)
Expand Down Expand Up @@ -175,7 +175,7 @@ def main(argv):
search_cmd.append("--raw")
cmd = container_start_cmd + search_cmd

proc = subprocess.run(cmd, check=False)
proc = run_subprocess_with_signal_forward(cmd)
ret_code = proc.returncode
if 0 != ret_code:
logger.error("Search failed.")
Expand Down
Loading