Skip to content

Commit

Permalink
query: interrupt query script on SIGTERM
Browse files Browse the repository at this point in the history
This now supports gracefully terminating script on SIGTERM.
Graceful shutdown is also done on Ctrl-C.

Also added logs.
  • Loading branch information
skshetry committed Jan 24, 2025
1 parent 13e5c13 commit ff93db2
Showing 1 changed file with 84 additions and 5 deletions.
89 changes: 84 additions & 5 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import os.path
import posixpath
import signal
import subprocess
import sys
import time
Expand Down Expand Up @@ -97,6 +98,46 @@ def noop(_: str):
pass


class TerminationSignal(RuntimeError): # noqa: N818
def __init__(self, signal):
self.signal = signal
super().__init__("Received termination signal", signal)

Check warning on line 104 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L103-L104

Added lines #L103 - L104 were not covered by tests

def __repr__(self):
return f"{self.__class__.__name__}({self.signal})"

Check warning on line 107 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L107

Added line #L107 was not covered by tests


if sys.platform == "win32":
SIGINT = signal.CTRL_C_EVENT
else:
SIGINT = signal.SIGINT


def graceful_shutdown(
proc: subprocess.Popen,
interrupt: bool = True,
interrupt_timeout: Optional[int] = None,
terminate_timeout: Optional[int] = None,
) -> None:
if interrupt:
logger.info("sending interrupt signal to the process %s", proc.pid)
proc.send_signal(SIGINT)

Check warning on line 124 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L123-L124

Added lines #L123 - L124 were not covered by tests

logger.info("waiting for the process %s to finish", proc.pid)
try:
proc.wait(interrupt_timeout)
except subprocess.TimeoutExpired:
logger.info(

Check warning on line 130 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L126-L130

Added lines #L126 - L130 were not covered by tests
"timed out waiting, sending terminate signal to the process %s", proc.pid
)
proc.terminate()
try:
proc.wait(terminate_timeout)
except subprocess.TimeoutExpired:
logger.info("timed out waiting, killing the process %s", proc.pid)
proc.kill()

Check warning on line 138 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L133-L138

Added lines #L133 - L138 were not covered by tests


def _process_stream(stream: "IO[bytes]", callback: Callable[[str], None]) -> None:
buffer = b""
while byt := stream.read(1): # Read one byte at a time
Expand Down Expand Up @@ -1493,6 +1534,9 @@ def query(
output_hook: Callable[[str], None] = noop,
params: Optional[dict[str, str]] = None,
job_id: Optional[str] = None,
interrupt_timeout: Optional[int] = None,
terminate_timeout: Optional[int] = None,
send_interrupt: bool = False,
) -> None:
cmd = [python_executable, "-c", query_script]
env = dict(env or os.environ)
Expand All @@ -1506,13 +1550,48 @@ def query(
if capture_output:
popen_kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.STDOUT}

def signal_handler(sig: int, frame: Any) -> NoReturn:
raise TerminationSignal(sig)

Check warning on line 1554 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1554

Added line #L1554 was not covered by tests

orig_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, signal_handler)

thread = None
with subprocess.Popen(cmd, env=env, **popen_kwargs) as proc: # noqa: S603
if capture_output:
args = (proc.stdout, output_hook)
thread = Thread(target=_process_stream, args=args, daemon=True)
thread.start()
thread.join() # wait for the reader thread
logger.info("Running script with PID %s", proc.pid)
try:
if capture_output:
args = (proc.stdout, output_hook)
thread = Thread(target=_process_stream, args=args, daemon=True)
thread.start()

proc.wait()
except (KeyboardInterrupt, TerminationSignal) as exc:

Check warning on line 1569 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1569

Added line #L1569 was not covered by tests
if orig_handler is not None:
signal.signal(signal.SIGTERM, orig_handler)

Check warning on line 1571 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1571

Added line #L1571 was not covered by tests

logging.info("Terminating process %s, received %r", proc.pid, exc)

Check warning on line 1573 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1573

Added line #L1573 was not covered by tests

# If process is running in the foreground, TTY sends signals to all
# processes in the process group. So, by default, we don't send
# interrupt signal to the child process again.
# If interrupt needs to be sent, this can be enabled by setting
# send_interrupt to True.
graceful_shutdown(

Check warning on line 1580 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1580

Added line #L1580 was not covered by tests
proc,
send_interrupt,
interrupt_timeout=interrupt_timeout,
terminate_timeout=terminate_timeout,
)
if proc.returncode:
raise

Check warning on line 1587 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1587

Added line #L1587 was not covered by tests
finally:
if orig_handler is not None:
signal.signal(signal.SIGTERM, orig_handler)
if thread:
thread.join()

logging.info("Process %s exited with return code %s", proc.pid, proc.returncode)
if proc.returncode == QUERY_SCRIPT_CANCELED_EXIT_CODE:
raise QueryScriptCancelError(
"Query script was canceled by user",
Expand Down

0 comments on commit ff93db2

Please sign in to comment.