Skip to content

Commit

Permalink
catch ctrl-c for long running queries in tools.storm (SYN-7932) (#4048)
Browse files Browse the repository at this point in the history
Co-authored-by: epiphyte <epiphyte@vertex.link>
  • Loading branch information
OCBender and vEpiphyte authored Dec 26, 2024
1 parent 2f44645 commit 5841b06
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 13 deletions.
5 changes: 5 additions & 0 deletions changes/5433695fd5f2187f2cd9f7a4ba0ae4d9.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
desc: Fixed SIGINT handling in the ``synapse.tools.storm`` CLI tool.
prs: []
type: bug
...
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies = [
'aiohttp-socks>=0.9.0,<0.10.0',
'aioimaplib>=1.1.0,<1.2.0',
'aiosmtplib>=3.0.0,<3.1.0',
'prompt-toolkit>=3.0.4,<3.1.0',
'prompt_toolkit>=3.0.29,<3.1.0',
'lark==1.2.2',
'Pygments>=2.7.4,<2.18.0',
'packaging>=20.0,<25.0',
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ aiohttp>=3.10.0,<4.0
aiohttp-socks>=0.9.0,<0.10.0
aioimaplib>=1.1.0,<1.2.0
aiosmtplib>=3.0.0,<3.1.0
prompt-toolkit>=3.0.4,<3.1.0
prompt_toolkit>=3.0.29,<3.1.0
lark==1.2.2
Pygments>=2.7.4,<2.18.0
fastjsonschema>=2.18.0,<2.20.0
Expand Down
31 changes: 20 additions & 11 deletions synapse/lib/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,26 @@ async def _onItemFini(self):

await self.fini()

async def addSignalHandlers(self):
async def addSignalHandlers(self): # pragma: no cover
'''
Register SIGINT signal handler with the ioloop to cancel the currently running cmdloop task.
Removes the handler when the cli is fini'd.
'''

def sigint():
self.printf('<ctrl-c>')
if self.cmdtask is not None:
self.cmdtask.cancel()

self.loop.add_signal_handler(signal.SIGINT, sigint)

def onfini():
# N.B. This is reaches into some loop / handle internals but
# prevents us from removing a handler that overwrote our own.
hndl = self.loop._signal_handlers.get(signal.SIGINT, None) # type: asyncio.Handle
if hndl is not None and hndl._callback is sigint:
self.loop.remove_signal_handler(signal.SIGINT)

self.onfini(onfini)

def get(self, name, defval=None):
return self.locs.get(name, defval)

Expand Down Expand Up @@ -324,8 +332,12 @@ async def prompt(self, text=None):
if text is None:
text = self.cmdprompt

with patch_stdout():
retn = await self.sess.prompt_async(text, vi_mode=self.vi_mode, enable_open_in_editor=True)
with patch_stdout(): # pragma: no cover
retn = await self.sess.prompt_async(text,
vi_mode=self.vi_mode,
enable_open_in_editor=True,
handle_sigint=False # We handle sigint in the loop
)
return retn

def printf(self, mesg, addnl=True, color=None):
Expand Down Expand Up @@ -390,7 +402,7 @@ async def runCmdLoop(self):
self.cmdtask = self.schedCoro(coro)
await self.cmdtask

except KeyboardInterrupt:
except (KeyboardInterrupt, asyncio.CancelledError):

if self.isfini:
return
Expand All @@ -408,11 +420,8 @@ async def runCmdLoop(self):
if self.cmdtask is not None:
self.cmdtask.cancel()
try:
self.cmdtask.result()
except asyncio.CancelledError:
# Wait a beat to let any remaining nodes to print out before we print the prompt
await asyncio.sleep(1)
except Exception:
await asyncio.wait_for(self.cmdtask, timeout=0.1)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass

async def runCmdLine(self, line):
Expand Down
95 changes: 95 additions & 0 deletions synapse/tests/test_tools_storm.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,59 @@
import os
import sys
import signal
import asyncio
import multiprocessing

import synapse.tests.utils as s_test

from prompt_toolkit.document import Document
from prompt_toolkit.completion import Completion, CompleteEvent

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.telepath as s_telepath

import synapse.lib.coro as s_coro
import synapse.lib.output as s_output
import synapse.lib.msgpack as s_msgpack
import synapse.tools.storm as s_t_storm

def run_cli_till_print(url, evt1):
'''
Run the stormCLI until we get a print mesg then set the event.
This is a Process target.
'''
async def main():
outp = s_output.OutPutStr() # Capture output instead of sending it to stdout
async with await s_telepath.openurl(url) as proxy:
async with await s_t_storm.StormCli.anit(proxy, outp=outp) as scli:
cmdqueue = asyncio.Queue()
await cmdqueue.put('while (true) { $lib.print(go) $lib.time.sleep(1) }')
await cmdqueue.put('!quit')

async def fake_prompt():
return await cmdqueue.get()

scli.prompt = fake_prompt

d = {'evt1': False}
async def onmesg(event):
if d.get('evt1'):
return
mesg = event[1].get('mesg')
if mesg[0] != 'print':
return
evt1.set()
d['evt1'] = True

with scli.onWith('storm:mesg', onmesg):
await scli.addSignalHandlers()
await scli.runCmdLoop()

asyncio.run(main())
sys.exit(137)

class StormCliTest(s_test.SynTest):

async def test_tools_storm(self):
Expand Down Expand Up @@ -378,3 +422,54 @@ async def get_completions(text):
),
vals
)

async def test_storm_cmdloop_interrupt(self):
'''
Test interrupting a long-running query in the command loop
'''
async with self.getTestCore() as core:

async with core.getLocalProxy() as proxy:

outp = s_test.TstOutPut()
async with await s_t_storm.StormCli.anit(proxy, outp=outp) as scli:

cmdqueue = asyncio.Queue()
await cmdqueue.put('while (true) { $lib.time.sleep(1) }')
await cmdqueue.put('!quit')

async def fake_prompt():
return await cmdqueue.get()
scli.prompt = fake_prompt

cmdloop_task = asyncio.create_task(scli.runCmdLoop())
await asyncio.sleep(0.1)

if scli.cmdtask is not None:
scli.cmdtask.cancel()

await cmdloop_task

outp.expect('<ctrl-c>')
outp.expect('o/')
self.true(scli.isfini)

async def test_storm_cmdloop_sigint(self):
'''
Test interrupting a long-running query in the command loop with a process target and SIGINT.
'''

async with self.getTestCore() as core:
url = core.getLocalUrl()

ctx = multiprocessing.get_context('spawn')

evt1 = ctx.Event()

proc = ctx.Process(target=run_cli_till_print, args=(url, evt1,))
proc.start()

self.true(await s_coro.executor(evt1.wait, timeout=30))
os.kill(proc.pid, signal.SIGINT)
proc.join(timeout=30)
self.eq(proc.exitcode, 137)

0 comments on commit 5841b06

Please sign in to comment.