Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support IOLoop.close() when called after asyncio.run #3292

Open
graingert opened this issue Jul 6, 2023 · 1 comment
Open

support IOLoop.close() when called after asyncio.run #3292

graingert opened this issue Jul 6, 2023 · 1 comment
Labels

Comments

@graingert
Copy link
Contributor

graingert commented Jul 6, 2023

I've been using the following function to run the distributed test suite:

def _run_and_close_tornado(async_fn, /, *args, **kwargs):
    tornado_loop = None

    async def inner_fn():
        nonlocal tornado_loop
        tornado_loop = IOLoop.current()
        return await async_fn(*args, **kwargs)

    try:
        return asyncio.run(inner_fn())
    finally:
        tornado_loop.close(all_fds=True)

Which has worked great with non-overlapping calls to IOLoop.current(), however I recently introduced an overlapping call, which results in a KeyError:

____________________ test_inproc_specific_different_threads ____________________

    @gen_test()
    async def test_inproc_specific_different_threads():
>       await check_inproc_specific(run_coro_in_thread)

distributed/comm/tests/test_comms.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/comm/tests/test_comms.py:431: in check_inproc_specific
    await asyncio.gather(*futures)
distributed/comm/tests/test_comms.py:447: in run_coro_in_thread
    return await asyncio.to_thread(run_and_close_tornado, run_with_timeout)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/threads.py:25: in to_thread
    return await loop.run_in_executor(None, func_call)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:506: in run_and_close_tornado
    tornado_loop.close(all_fds=True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f7a6addb280>
all_fds = True

    def close(self, all_fds: bool = False) -> None:
        self.closing = True
        for fd in list(self.handlers):
            fileobj, handler_func = self.handlers[fd]
            self.remove_handler(fd)
            if all_fds:
                self.close_fd(fileobj)
        # Remove the mapping before closing the asyncio loop. If this
        # happened in the other order, we could race against another
        # initialize() call which would see the closed asyncio loop,
        # assume it was closed from the asyncio side, and do this
        # cleanup for us, leading to a KeyError.
>       del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
E       KeyError: <_UnixSelectorEventLoop running=False closed=True debug=False>

https://github.com/dask/distributed/actions/runs/5475797836/jobs/9972404903#step:19:2420

@bdarnell could you catch the KeyError so that I can run AbstractEventLoop.close() then IOLoop.close()?

@bdarnell
Copy link
Member

bdarnell commented Jul 8, 2023

My assumption has been that you wouldn't mix IOLoop and asyncio event loop control methods in the same app: you could use IOLoop.run_sync instead of asyncio.run, but in that case you'd rely on the asyncio lifecycle instead of Tornado's. Asyncio doesn't have an all_fds=True equivalent, but I'd expect that if you're relying on that you'd use the ioloop methods instead of asyncio.run. Is there some reason you can't either use the IOLoop methods here or avoid relying on all_fds=True? (If you need this functionality we should look into getting it added to asyncio - I want to eventually phase out all usage of IOLoop)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants