Skip to content

Commit

Permalink
Fix AsyncIOThreadSafeScheduler deadlock (#567)
Browse files Browse the repository at this point in the history
* Fix AsyncIOThreadSafeScheduler deadlock

* Add test for deadlock

* Revert "Fix AsyncIOThreadSafeScheduler deadlock"

This reverts commit e9c802b.

* Reland "Fix AsyncIOThreadSafeScheduler deadlock"

* Rename _is_loop_running_on_another_thread

* Add test and deduplicate code
  • Loading branch information
kormang authored Apr 18, 2021
1 parent 1bd44fe commit 78385ee
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 3 deletions.
36 changes: 33 additions & 3 deletions rx/scheduler/eventloop/asynciothreadsafescheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def interval() -> None:
handle = self._loop.call_soon_threadsafe(interval)

def dispose() -> None:
if self._on_self_loop_or_not_running():
handle.cancel()
return

future: Future = Future()

def cancel_handle() -> None:
Expand Down Expand Up @@ -96,14 +100,21 @@ def stage2() -> None:
handle.append(self._loop.call_soon_threadsafe(stage2))

def dispose() -> None:
future: Future = Future()

def cancel_handle() -> None:
def do_cancel_handles():
try:
handle.pop().cancel()
handle.pop().cancel()
except Exception:
pass

if self._on_self_loop_or_not_running():
do_cancel_handles()
return

future: Future = Future()

def cancel_handle() -> None:
do_cancel_handles()
future.set_result(0)

self._loop.call_soon_threadsafe(cancel_handle)
Expand All @@ -130,3 +141,22 @@ def schedule_absolute(self,

duetime = self.to_datetime(duetime)
return self.schedule_relative(duetime - self.now, action, state=state)

def _on_self_loop_or_not_running(self):
"""
Returns True if either self._loop is not running, or we're currently
executing on self._loop. In both cases, waiting for a future to be
resolved on the loop would result in a deadlock.
"""
if not self._loop.is_running():
return True
current_loop = None
try:
# In python 3.7 there asyncio.get_running_loop() is prefered.
current_loop = asyncio.get_event_loop()
except RuntimeError:
# If there is no loop in current thread at all, and it is not main
# thread, we get error like:
# RuntimeError: There is no current event loop in thread 'Thread-1'
pass
return self._loop == current_loop
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,71 @@ def schedule():
assert ran is False

loop.run_until_complete(go())

def cancel_same_thread_common(self, test_body):
update_state = {
'ran': False,
'dispose_completed': False
}

def action(scheduler, state):
update_state['ran'] = True

# Make the actual test body run in deamon thread, so that in case of
# failure it doesn't hang indefinitely.
def thread_target():
loop = asyncio.new_event_loop()
scheduler = AsyncIOThreadSafeScheduler(loop)

test_body(scheduler, action, update_state)

@asyncio.coroutine
def go():
yield from asyncio.sleep(0.2, loop=loop)

loop.run_until_complete(go())

thread = threading.Thread(target=thread_target)
thread.daemon = True
thread.start()
thread.join(0.3)
assert update_state['dispose_completed'] is True
assert update_state['ran'] is False


def test_asyncio_threadsafe_cancel_non_relative_same_thread(self):
def test_body(scheduler, action, update_state):
d = scheduler.schedule(action)

# Test case when dispose is called on thread on which loop is not
# yet running, and non-relative schedele is used.
d.dispose()
update_state['dispose_completed'] = True

self.cancel_same_thread_common(test_body)


def test_asyncio_threadsafe_schedule_action_cancel_same_thread(self):
def test_body(scheduler, action, update_state):
d = scheduler.schedule_relative(0.05, action)

# Test case when dispose is called on thread on which loop is not
# yet running, and relative schedule is used.
d.dispose()
update_state['dispose_completed'] = True

self.cancel_same_thread_common(test_body)


def test_asyncio_threadsafe_schedule_action_cancel_same_loop(self):
def test_body(scheduler, action, update_state):
d = scheduler.schedule_relative(0.1, action)

def do_dispose():
d.dispose()
update_state['dispose_completed'] = True

# Test case when dispose is called in loop's callback.
scheduler._loop.call_soon(do_dispose)

self.cancel_same_thread_common(test_body)

0 comments on commit 78385ee

Please sign in to comment.