From 78385ee28210f6dd2f77eb20f4fe47d13c244a2a Mon Sep 17 00:00:00 2001 From: kormang Date: Sun, 18 Apr 2021 20:01:04 +0300 Subject: [PATCH] Fix AsyncIOThreadSafeScheduler deadlock (#567) * Fix AsyncIOThreadSafeScheduler deadlock * Add test for deadlock * Revert "Fix AsyncIOThreadSafeScheduler deadlock" This reverts commit e9c802b98d866ccb1dc3c75c0a82ceb67c760daa. * Reland "Fix AsyncIOThreadSafeScheduler deadlock" * Rename _is_loop_running_on_another_thread * Add test and deduplicate code --- .../eventloop/asynciothreadsafescheduler.py | 36 +++++++++- .../test_asynciothreadsafescheduler.py | 68 +++++++++++++++++++ 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/rx/scheduler/eventloop/asynciothreadsafescheduler.py b/rx/scheduler/eventloop/asynciothreadsafescheduler.py index 52f6e30df..79880d41a 100644 --- a/rx/scheduler/eventloop/asynciothreadsafescheduler.py +++ b/rx/scheduler/eventloop/asynciothreadsafescheduler.py @@ -50,6 +50,10 @@ def interval() -> None: handle = self._loop.call_soon_threadsafe(interval) def dispose() -> None: + if self._on_self_loop_or_not_running(): + handle.cancel() + return + future: Future = Future() def cancel_handle() -> None: @@ -96,14 +100,21 @@ def stage2() -> None: handle.append(self._loop.call_soon_threadsafe(stage2)) def dispose() -> None: - future: Future = Future() - - def cancel_handle() -> None: + def do_cancel_handles(): try: handle.pop().cancel() handle.pop().cancel() except Exception: pass + + if self._on_self_loop_or_not_running(): + do_cancel_handles() + return + + future: Future = Future() + + def cancel_handle() -> None: + do_cancel_handles() future.set_result(0) self._loop.call_soon_threadsafe(cancel_handle) @@ -130,3 +141,22 @@ def schedule_absolute(self, duetime = self.to_datetime(duetime) return self.schedule_relative(duetime - self.now, action, state=state) + + def _on_self_loop_or_not_running(self): + """ + Returns True if either self._loop is not running, or we're currently + executing on self._loop. In both cases, waiting for a future to be + resolved on the loop would result in a deadlock. + """ + if not self._loop.is_running(): + return True + current_loop = None + try: + # In python 3.7 there asyncio.get_running_loop() is prefered. + current_loop = asyncio.get_event_loop() + except RuntimeError: + # If there is no loop in current thread at all, and it is not main + # thread, we get error like: + # RuntimeError: There is no current event loop in thread 'Thread-1' + pass + return self._loop == current_loop diff --git a/tests/test_scheduler/test_eventloop/test_asynciothreadsafescheduler.py b/tests/test_scheduler/test_eventloop/test_asynciothreadsafescheduler.py index 849b0d3e2..e09af2ca1 100644 --- a/tests/test_scheduler/test_eventloop/test_asynciothreadsafescheduler.py +++ b/tests/test_scheduler/test_eventloop/test_asynciothreadsafescheduler.py @@ -92,3 +92,71 @@ def schedule(): assert ran is False loop.run_until_complete(go()) + + def cancel_same_thread_common(self, test_body): + update_state = { + 'ran': False, + 'dispose_completed': False + } + + def action(scheduler, state): + update_state['ran'] = True + + # Make the actual test body run in deamon thread, so that in case of + # failure it doesn't hang indefinitely. + def thread_target(): + loop = asyncio.new_event_loop() + scheduler = AsyncIOThreadSafeScheduler(loop) + + test_body(scheduler, action, update_state) + + @asyncio.coroutine + def go(): + yield from asyncio.sleep(0.2, loop=loop) + + loop.run_until_complete(go()) + + thread = threading.Thread(target=thread_target) + thread.daemon = True + thread.start() + thread.join(0.3) + assert update_state['dispose_completed'] is True + assert update_state['ran'] is False + + + def test_asyncio_threadsafe_cancel_non_relative_same_thread(self): + def test_body(scheduler, action, update_state): + d = scheduler.schedule(action) + + # Test case when dispose is called on thread on which loop is not + # yet running, and non-relative schedele is used. + d.dispose() + update_state['dispose_completed'] = True + + self.cancel_same_thread_common(test_body) + + + def test_asyncio_threadsafe_schedule_action_cancel_same_thread(self): + def test_body(scheduler, action, update_state): + d = scheduler.schedule_relative(0.05, action) + + # Test case when dispose is called on thread on which loop is not + # yet running, and relative schedule is used. + d.dispose() + update_state['dispose_completed'] = True + + self.cancel_same_thread_common(test_body) + + + def test_asyncio_threadsafe_schedule_action_cancel_same_loop(self): + def test_body(scheduler, action, update_state): + d = scheduler.schedule_relative(0.1, action) + + def do_dispose(): + d.dispose() + update_state['dispose_completed'] = True + + # Test case when dispose is called in loop's callback. + scheduler._loop.call_soon(do_dispose) + + self.cancel_same_thread_common(test_body)