Skip to content

Commit

Permalink
Content from PR #1189
Browse files Browse the repository at this point in the history
  • Loading branch information
absurdfarce committed May 29, 2024
1 parent a2720ce commit 3bfccf0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
23 changes: 15 additions & 8 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from cassandra.connection import Connection, ConnectionShutdown
import threading

from cassandra.connection import Connection, ConnectionShutdown
import sys
import asyncio
import logging
import os
Expand Down Expand Up @@ -88,9 +90,11 @@ def __init__(self, *args, **kwargs):

self._connect_socket()
self._socket.setblocking(0)

self._write_queue = asyncio.Queue()
self._write_queue_lock = asyncio.Lock()
loop_args = dict()
if sys.version_info[0] == 3 and sys.version_info[1] < 10:
loop_args['loop'] = self._loop
self._write_queue = asyncio.Queue(**loop_args)
self._write_queue_lock = asyncio.Lock(**loop_args)

# see initialize_reactor -- loop is running in a separate thread, so we
# have to use a threadsafe call
Expand All @@ -108,8 +112,11 @@ def initialize_reactor(cls):
if cls._pid != os.getpid():
cls._loop = None
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
try:
cls._loop = asyncio.get_running_loop()
except RuntimeError:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)

if not cls._loop_thread:
# daemonize so the loop will be shut down on interpreter
Expand Down Expand Up @@ -162,7 +169,7 @@ def push(self, data):
else:
chunks = [data]

if self._loop_thread.ident != get_ident():
if self._loop_thread != threading.current_thread():
asyncio.run_coroutine_threadsafe(
self._push_msg(chunks),
loop=self._loop
Expand All @@ -173,7 +180,7 @@ def push(self, data):

async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with await self._write_queue_lock:
async with self._write_queue_lock:
for chunk in chunks:
self._write_queue.put_nowait(chunk)

Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,4 @@ def is_windows():

notwindows = unittest.skipUnless(not is_windows(), "This test is not adequate for windows")
notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy")
notasyncio = unittest.skipUnless(not EVENT_LOOP_MANAGER == 'asyncio', "This tests is not suitable for EVENT_LOOP_MANAGER=asyncio")
5 changes: 2 additions & 3 deletions tests/integration/cqlengine/model/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,9 @@ class SensitiveModel(Model):
rows[-1]
rows[-1:]

# Asyncio complains loudly about old syntax on python 3.7+, so get rid of all of those
relevant_warnings = [warn for warn in w if "with (yield from lock)" not in str(warn.message)]
# ignore DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.')
relevant_warnings = [warn for warn in w if "The loop argument is deprecated" not in str(warn.message)]

self.assertEqual(len(relevant_warnings), 4)
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[0].message))
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[1].message))
self.assertIn("ModelQuerySet indexing with negative indices support will be removed in 4.0.",
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from cassandra import connection
from cassandra.connection import DefaultEndPoint

from tests import notwindows
from tests import notwindows, notasyncio
from tests.integration import use_singledc, get_server_versions, CASSANDRA_VERSION, \
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \
Expand Down Expand Up @@ -1107,6 +1107,7 @@ def test_add_profile_timeout(self):
raise Exception("add_execution_profile didn't timeout after {0} retries".format(max_retry_count))

@notwindows
@notasyncio # asyncio can't do timeouts smaller than 1ms, as this test requires
def test_execute_query_timeout(self):
with TestCluster() as cluster:
session = cluster.connect(wait_for_all_pools=True)
Expand Down

0 comments on commit 3bfccf0

Please sign in to comment.