Skip to content

Commit 36fb854

Browse files
committed
query: interrupt query script on SIGTERM
This now supports gracefully terminating the query script on SIGTERM. Graceful shutdown is also performed on Ctrl-C. Logging has also been added.
1 parent 13e5c13 commit 36fb854

File tree

1 file changed

+81
-5
lines changed

1 file changed

+81
-5
lines changed

src/datachain/catalog/catalog.py

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import os.path
66
import posixpath
7+
import signal
78
import subprocess
89
import sys
910
import time
@@ -97,6 +98,46 @@ def noop(_: str):
9798
pass
9899

99100

101+
class TerminationSignal(RuntimeError):  # noqa: N818
    """Exception raised when a termination signal is received.

    The signal number is stored on ``signal`` and is also passed to
    ``RuntimeError`` as the second element of ``args``.
    """

    def __init__(self, signal):
        super().__init__("Received termination signal", signal)
        self.signal = signal

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f"{cls_name}({self.signal})"
108+
109+
110+
# Signal sent to a child process to request an interrupt. On Windows the
# CTRL_C_EVENT constant is used in place of SIGINT.
if sys.platform != "win32":
    SIGINT = signal.SIGINT
else:
    SIGINT = signal.CTRL_C_EVENT
114+
115+
116+
def graceful_shutdown(
    proc: subprocess.Popen,
    interrupt: bool = True,
    interrupt_timeout: Optional[int] = None,
    terminate_timeout: Optional[int] = None,
) -> None:
    """Stop ``proc``, escalating from interrupt to terminate to kill.

    Args:
        proc: The child process to shut down.
        interrupt: When true, send ``SIGINT`` to the process before waiting.
            When false, only wait for the process to exit.
        interrupt_timeout: Seconds to wait for the process to exit before
            calling ``terminate()``. ``None`` waits indefinitely.
        terminate_timeout: Seconds to wait after ``terminate()`` before
            calling ``kill()``. ``None`` waits indefinitely.

    When this function returns the process has been waited on, so
    ``proc.returncode`` is populated.
    """
    if interrupt:
        logger.info("sending interrupt signal to the process %s", proc.pid)
        proc.send_signal(SIGINT)

    logger.info("waiting for the process %s to finish", proc.pid)
    try:
        proc.wait(interrupt_timeout)
    except subprocess.TimeoutExpired:
        logger.info(
            "timed out waiting, sending terminate signal to the process %s", proc.pid
        )
        proc.terminate()
        try:
            proc.wait(terminate_timeout)
        except subprocess.TimeoutExpired:
            logger.info("timed out waiting, killing the process %s", proc.pid)
            proc.kill()
            # Reap the killed child so that returncode is set; without this
            # a caller checking proc.returncode right after would see None.
            proc.wait()
139+
140+
100141
def _process_stream(stream: "IO[bytes]", callback: Callable[[str], None]) -> None:
101142
buffer = b""
102143
while byt := stream.read(1): # Read one byte at a time
@@ -1493,6 +1534,9 @@ def query(
14931534
output_hook: Callable[[str], None] = noop,
14941535
params: Optional[dict[str, str]] = None,
14951536
job_id: Optional[str] = None,
1537+
interrupt_timeout: Optional[int] = None,
1538+
terminate_timeout: Optional[int] = None,
1539+
send_interrupt: bool = False,
14961540
) -> None:
14971541
cmd = [python_executable, "-c", query_script]
14981542
env = dict(env or os.environ)
@@ -1506,13 +1550,45 @@ def query(
15061550
if capture_output:
15071551
popen_kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.STDOUT}
15081552

1553+
def signal_handler(sig: int, frame: Any) -> NoReturn:
1554+
raise TerminationSignal(sig)
1555+
1556+
orig_handler = signal.getsignal(signal.SIGTERM)
1557+
signal.signal(signal.SIGTERM, signal_handler)
1558+
15091559
with subprocess.Popen(cmd, env=env, **popen_kwargs) as proc: # noqa: S603
1510-
if capture_output:
1511-
args = (proc.stdout, output_hook)
1512-
thread = Thread(target=_process_stream, args=args, daemon=True)
1513-
thread.start()
1514-
thread.join() # wait for the reader thread
1560+
logger.info("Running script with PID %s", proc.pid)
1561+
try:
1562+
if capture_output:
1563+
args = (proc.stdout, output_hook)
1564+
thread = Thread(target=_process_stream, args=args, daemon=True)
1565+
thread.start()
1566+
thread.join() # wait for the reader thread
1567+
proc.wait()
1568+
except (KeyboardInterrupt, TerminationSignal) as exc:
1569+
if orig_handler is not None:
1570+
signal.signal(signal.SIGTERM, orig_handler)
1571+
1572+
logging.info("Terminating process %s, received %r", proc.pid, exc)
1573+
1574+
# If process is running in the foreground, TTY sends signals to all
1575+
# processes in the process group. So, by default, we don't send
1576+
# interrupt signal to the child process again.
1577+
# If interrupt needs to be sent, this can be enabled by setting
1578+
# send_interrupt to True.
1579+
graceful_shutdown(
1580+
proc,
1581+
send_interrupt,
1582+
interrupt_timeout=interrupt_timeout,
1583+
terminate_timeout=terminate_timeout,
1584+
)
1585+
if proc.returncode:
1586+
raise
1587+
finally:
1588+
if orig_handler is not None:
1589+
signal.signal(signal.SIGTERM, orig_handler)
15151590

1591+
logging.info("Process %s exited with return code %s", proc.pid, proc.returncode)
15161592
if proc.returncode == QUERY_SCRIPT_CANCELED_EXIT_CODE:
15171593
raise QueryScriptCancelError(
15181594
"Query script was canceled by user",

0 commit comments

Comments
 (0)