diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 95f92e26e0..fc02392511 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -1,5 +1,7 @@ -from cassandra.connection import Connection, ConnectionShutdown +import threading +from cassandra.connection import Connection, ConnectionShutdown +import sys import asyncio import logging import os @@ -88,9 +90,11 @@ def __init__(self, *args, **kwargs): self._connect_socket() self._socket.setblocking(0) - - self._write_queue = asyncio.Queue() - self._write_queue_lock = asyncio.Lock() + loop_args = dict() + if sys.version_info[0] == 3 and sys.version_info[1] < 10: + loop_args['loop'] = self._loop + self._write_queue = asyncio.Queue(**loop_args) + self._write_queue_lock = asyncio.Lock(**loop_args) # see initialize_reactor -- loop is running in a separate thread, so we # have to use a threadsafe call @@ -108,8 +112,11 @@ def initialize_reactor(cls): if cls._pid != os.getpid(): cls._loop = None if cls._loop is None: - cls._loop = asyncio.new_event_loop() - asyncio.set_event_loop(cls._loop) + try: + cls._loop = asyncio.get_running_loop() + except RuntimeError: + cls._loop = asyncio.new_event_loop() + asyncio.set_event_loop(cls._loop) if not cls._loop_thread: # daemonize so the loop will be shut down on interpreter @@ -162,7 +169,7 @@ def push(self, data): else: chunks = [data] - if self._loop_thread.ident != get_ident(): + if self._loop_thread != threading.current_thread(): asyncio.run_coroutine_threadsafe( self._push_msg(chunks), loop=self._loop @@ -173,7 +180,7 @@ def push(self, data): async def _push_msg(self, chunks): # This lock ensures all chunks of a message are sequential in the Queue - with await self._write_queue_lock: + async with self._write_queue_lock: for chunk in chunks: self._write_queue.put_nowait(chunk) diff --git a/tests/__init__.py b/tests/__init__.py index 4735bbd383..8b56d7657f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -112,3 +112,4 @@ def is_windows(): notwindows = unittest.skipUnless(not is_windows(), "This test is not adequate for windows") notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy") +notasyncio = unittest.skipUnless(not EVENT_LOOP_MANAGER == 'asyncio', "This tests is not suitable for EVENT_LOOP_MANAGER=asyncio") diff --git a/tests/integration/cqlengine/model/test_model.py b/tests/integration/cqlengine/model/test_model.py index 859facf0e1..73096e1b5d 100644 --- a/tests/integration/cqlengine/model/test_model.py +++ b/tests/integration/cqlengine/model/test_model.py @@ -256,10 +256,9 @@ class SensitiveModel(Model): rows[-1] rows[-1:] - # Asyncio complains loudly about old syntax on python 3.7+, so get rid of all of those - relevant_warnings = [warn for warn in w if "with (yield from lock)" not in str(warn.message)] + # ignore DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.') + relevant_warnings = [warn for warn in w if "The loop argument is deprecated" not in str(warn.message)] - self.assertEqual(len(relevant_warnings), 4) self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[0].message)) self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[1].message)) self.assertIn("ModelQuerySet indexing with negative indices support will be removed in 4.0.", diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 11a9fba0ab..0029641cb6 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -36,7 +36,7 @@ from cassandra import connection from cassandra.connection import DefaultEndPoint -from tests import notwindows +from tests import notwindows, notasyncio from tests.integration import use_singledc, get_server_versions, CASSANDRA_VERSION, \ execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \ get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \ @@ -1107,6 +1107,7 @@ def test_add_profile_timeout(self): raise Exception("add_execution_profile didn't timeout after {0} retries".format(max_retry_count)) @notwindows + @notasyncio # asyncio can't do timeouts smaller than 1ms, as this test requires def test_execute_query_timeout(self): with TestCluster() as cluster: session = cluster.connect(wait_for_all_pools=True)