Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync with 3.29.0 #406

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
16 changes: 16 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
3.29.0
======
December 19, 2023

Features
--------
* Add support for Python 3.9 through 3.12, drop support for 3.7 (PYTHON-1283)
* Removal of dependency on six module (PR 1172)
* Raise explicit exception when deserializing a vector with a subtype that isn’t a constant size (PYTHON-1371)

Others
------
* Remove outdated Python pre-3.7 references (PR 1186)
* Remove backup (.bak) files (PR 1185)
* Fix doc typo in add_callbacks (PR 1177)

3.28.0
======
June 5, 2023
Expand Down
25 changes: 24 additions & 1 deletion cassandra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def emit(self, record):

logging.getLogger('cassandra').addHandler(NullHandler())

__version_info__ = (3, 28, 0)
__version_info__ = (3, 29, 0)
__version__ = '.'.join(map(str, __version_info__))


Expand Down Expand Up @@ -747,3 +747,26 @@ def __init__(self, op_type=None, rejected_by_coordinator=False):
self.rejected_by_coordinator = rejected_by_coordinator
message = f"[request_error_rate_limit_reached OpType={op_type.name} RejectedByCoordinator={rejected_by_coordinator}]"
Exception.__init__(self, message)


class DependencyException(Exception):
    """
    Specific exception class for handling issues with driver dependencies

    :param msg: human-readable description of the failure.
    :param excs: optional iterable of underlying exceptions. When non-empty,
        their string forms are appended to the message, one per line, and a
        copy of the sequence is kept on the instance as :attr:`excs`.
    """

    excs = []
    """
    A sequence of child exceptions
    """

    def __init__(self, msg, excs=None):
        # Materialise once: this gives every instance its own list (no state
        # shared through a mutable default or the class attribute) and lets
        # callers pass any iterable, including a generator.
        self.excs = list(excs) if excs is not None else []
        complete_msg = msg
        if self.excs:
            complete_msg += ("\nThe following exceptions were observed: \n - " + '\n - '.join(str(e) for e in self.excs))
        super().__init__(complete_msg)

class VectorDeserializationFailure(DriverException):
    """
    Signals that the driver could not deserialize a given vector.
    """
128 changes: 103 additions & 25 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
from copy import copy
from functools import partial, wraps
from functools import partial, reduce, wraps
from itertools import groupby, count, chain
import json
import logging
Expand All @@ -45,7 +45,7 @@
from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest,
OperationTimedOut, UnsupportedOperation,
SchemaTargetType, DriverException, ProtocolVersion,
UnresolvableContactPoints)
UnresolvableContactPoints, DependencyException)
from cassandra.auth import _proxy_execute_key, PlainTextAuthProvider
from cassandra.connection import (ConnectionException, ConnectionShutdown,
ConnectionHeartbeat, ProtocolVersionUnsupported,
Expand Down Expand Up @@ -98,12 +98,12 @@

try:
from cassandra.io.twistedreactor import TwistedConnection
except ImportError:
except DependencyException:
TwistedConnection = None

try:
from cassandra.io.eventletreactor import EventletConnection
except (ImportError, AttributeError):
except DependencyException:
# AttributeError was added to handle Python 3.12: https://github.com/eventlet/eventlet/issues/812
# TODO: remove it once the eventlet issue is fixed
EventletConnection = None
Expand All @@ -113,6 +113,22 @@
except ImportError:
from cassandra.util import WeakSet # NOQA


def _is_gevent_monkey_patched():
    """
    Report whether ``socket.socket`` is currently gevent's socket class.

    gevent is only imported when ``gevent.monkey`` is already present in
    ``sys.modules``; otherwise the answer is ``False`` without touching gevent.
    """
    if 'gevent.monkey' in sys.modules:
        import gevent.socket
        return socket.socket is gevent.socket.socket
    return False


def _try_gevent_import():
    """
    Return ``(GeventConnection, None)`` when gevent has monkey patched the
    socket module, and ``(None, None)`` otherwise.

    The gevent reactor is only imported in the patched case.
    """
    if not _is_gevent_monkey_patched():
        return (None, None)
    from cassandra.io.geventreactor import GeventConnection
    return (GeventConnection, None)


def _is_eventlet_monkey_patched():
if 'eventlet.patcher' not in sys.modules:
return False
Expand All @@ -124,37 +140,93 @@ def _is_eventlet_monkey_patched():
# TODO: remove it once the eventlet issue is fixed
return False


def _is_gevent_monkey_patched():
if 'gevent.monkey' not in sys.modules:
return False
import gevent.socket
return socket.socket is gevent.socket.socket
try:
import eventlet.patcher
return eventlet.patcher.is_monkey_patched('socket')
# Another case related to PYTHON-1364
except AttributeError:
return False


# default to gevent when we are monkey patched with gevent, eventlet when
# monkey patched with eventlet, otherwise if libev is available, use that as
# the default because it's fastest. Otherwise, use asyncore.
if _is_gevent_monkey_patched():
from cassandra.io.geventreactor import GeventConnection as DefaultConnection
elif _is_eventlet_monkey_patched():
from cassandra.io.eventletreactor import EventletConnection as DefaultConnection
else:
def _try_eventlet_import():
try:
from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA
except ImportError:
try:
from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA
except ImportError:
from cassandra.io.asyncioreactor import AsyncioConnection as DefaultConnection # NOQA
from cassandra.io.eventletreactor import EventletConnection
except DependencyException as e:
return None, e
if _is_eventlet_monkey_patched():
return EventletConnection, None
return None, DependencyException("eventlet is not patched")

def _try_libev_import():
    """
    Attempt to import the libev reactor.

    Returns ``(LibevConnection, None)`` on success, or ``(None, error)`` when
    the import raises :class:`DependencyException`.
    """
    try:
        from cassandra.io.libevreactor import LibevConnection
    except DependencyException as dep_error:
        return (None, dep_error)
    return (LibevConnection, None)

def _try_asyncore_import():
    """
    Attempt to import the asyncore reactor.

    Returns ``(AsyncoreConnection, None)`` on success, or ``(None, error)``
    when the import raises :class:`DependencyException`.
    """
    try:
        from cassandra.io.asyncorereactor import AsyncoreConnection
    except DependencyException as dep_error:
        return (None, dep_error)
    return (AsyncoreConnection, None)

def _try_twisted_import():
    """
    Attempt to import the Twisted reactor.

    Returns ``(TwistedConnection, None)`` on success, or ``(None, error)``
    when the import raises :class:`DependencyException`.
    """
    try:
        from cassandra.io.twistedreactor import TwistedConnection
    except DependencyException as dep_error:
        return (None, dep_error)
    return (TwistedConnection, None)

def _connection_reduce_fn(val, import_fn):
    """
    Single folding step over a sequence of connection import functions.

    ``val`` is a ``(connection_class, errors)`` pair. Once a connection class
    has been found the pair is passed through untouched and ``import_fn`` is
    not called. Otherwise ``import_fn`` is invoked; any error it reports is
    appended to ``errors`` in place and its result becomes the new first item.
    """
    found, errors = val
    if found:
        # A usable Connection class is already known; skip further imports.
        return val

    candidate, error = import_fn()
    if error:
        errors.append(error)
    # ``found`` is falsy here, so the candidate (possibly None) is the result.
    return (candidate, errors)

log = logging.getLogger(__name__)

def get_all_supported_connections_classes():
    """
    Try every known reactor import and report what is usable.

    Returns a ``(classes, excs)`` pair of tuples: the connection classes that
    imported successfully, in preference order, and the errors reported by the
    ones that did not.

    An import helper may legitimately return ``(None, None)`` (for example the
    gevent helper when gevent has not monkey patched the socket module); such
    a result contributes nothing to either tuple, so ``excs`` never contains
    ``None`` placeholders.
    """
    import_attempts = (
        _try_gevent_import,
        _try_eventlet_import,
        _try_libev_import,
        _try_asyncore_import,
        _try_twisted_import,
    )
    classes = []
    excs = []
    for try_fn in import_attempts:
        conn, exc = try_fn()
        if conn is not None:
            classes.append(conn)
        elif exc is not None:
            excs.append(exc)
    return tuple(classes), tuple(excs)

def get_default_connection_class():
    """
    Pick the first reactor whose import succeeds.

    The import helpers are tried in order: gevent, eventlet, libev, asyncore,
    twisted. Returns ``(connection_class, None)`` for the first one that
    yields a class, or ``(None, errors)`` with a tuple of the collected errors
    when none does.
    """
    import_attempts = (
        _try_gevent_import,
        _try_eventlet_import,
        _try_libev_import,
        _try_asyncore_import,
        _try_twisted_import,
    )
    failures = []
    for attempt in import_attempts:
        candidate, error = attempt()
        if candidate is not None:
            return candidate, None
        if error:
            failures.append(error)
    return None, tuple(failures)


# Resolve the default connection class with get_default_connection_class().
# If no reactor could be loaded, fail with a DependencyException that carries
# the errors collected from the attempted imports.
(conn_class, excs) = get_default_connection_class()
if not conn_class:
raise DependencyException("Unable to load a default connection class", excs)
# Publish the chosen class under the DefaultConnection name.
DefaultConnection = conn_class

# Forces load of utf8 encoding module to avoid deadlock that occurs
# if code that is being imported tries to import the module in a separate
# thread.
# See http://bugs.python.org/issue10923
"".encode('utf8')

log = logging.getLogger(__name__)


DEFAULT_MIN_REQUESTS = 5
DEFAULT_MAX_REQUESTS = 100
Expand Down Expand Up @@ -802,9 +874,9 @@ def default_retry_policy(self, policy):
Using ssl_options without ssl_context is deprecated and will be removed in the
next major release.

An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket`` (or
``ssl.wrap_socket()`` if used without ssl_context) when new sockets are created.
This should be used when client encryption is enabled in Cassandra.
An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket``
when new sockets are created. This should be used when client encryption is enabled
in Cassandra.

The following documentation only applies when ssl_options is used without ssl_context.

Expand All @@ -820,6 +892,12 @@ def default_retry_policy(self, policy):
should almost always require the option ``'cert_reqs': ssl.CERT_REQUIRED``. Note also that this functionality was not built into
Python standard library until (2.7.9, 3.2). To enable this mechanism in earlier versions, patch ``ssl.match_hostname``
with a custom or `back-ported function <https://pypi.org/project/backports.ssl_match_hostname/>`_.

.. versionchanged:: 3.29.0

``ssl.match_hostname`` has been deprecated since Python 3.7 (and removed in Python 3.12). This functionality is now implemented
via ``ssl.SSLContext.check_hostname``. All options specified above (including ``check_hostname``) should continue to behave in a
way that is consistent with prior implementations.
"""

ssl_context = None
Expand Down
File renamed without changes.
85 changes: 64 additions & 21 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,6 @@ class Connection(object):
_socket = None

_socket_impl = socket
_ssl_impl = ssl

_check_hostname = False
_product_type = None
Expand Down Expand Up @@ -780,7 +779,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
self.endpoint = host if isinstance(host, EndPoint) else DefaultEndPoint(host, port)

self.authenticator = authenticator
self.ssl_options = ssl_options.copy() if ssl_options else None
self.ssl_options = ssl_options.copy() if ssl_options else {}
self.ssl_context = ssl_context
self.sockopts = sockopts
self.compression = compression
Expand All @@ -800,15 +799,20 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
self._on_orphaned_stream_released = on_orphaned_stream_released

if ssl_options:
self._check_hostname = bool(self.ssl_options.pop('check_hostname', False))
if self._check_hostname:
if not getattr(ssl, 'match_hostname', None):
raise RuntimeError("ssl_options specify 'check_hostname', but ssl.match_hostname is not provided. "
"Patch or upgrade Python to use this option.")
self.ssl_options.update(self.endpoint.ssl_options or {})
elif self.endpoint.ssl_options:
self.ssl_options = self.endpoint.ssl_options

# PYTHON-1331
#
# We always use SSLContext.wrap_socket() now but legacy configs may have other params that were passed to ssl.wrap_socket()...
# and either could have 'check_hostname'. Remove these params into a separate map and use them to build an SSLContext if
# we need to do so.
#
# Note the use of pop() here; we are very deliberately removing these params from ssl_options if they're present. After this
# operation ssl_options should contain only args needed for the ssl_context.wrap_socket() call.
if not self.ssl_context and self.ssl_options:
self.ssl_context = self._build_ssl_context_from_options()

if protocol_version >= 3:
self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
Expand Down Expand Up @@ -882,15 +886,48 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs):
else:
return conn

def _build_ssl_context_from_options(self):
    """
    Build an ``ssl.SSLContext`` from the legacy entries of ``self.ssl_options``.

    Recognised keys: ``ssl_version``, ``cert_reqs``, ``check_hostname``,
    ``keyfile``, ``certfile``, ``ca_certs`` and ``ciphers``. Other keys are
    ignored here and ``self.ssl_options`` is not modified.

    Defaults: ``ssl.PROTOCOL_TLS_CLIENT`` for the protocol,
    ``ssl.CERT_REQUIRED`` for the verify mode when ``cert_reqs`` is absent or
    ``None``, and hostname checking disabled unless ``check_hostname`` is
    truthy.

    :return: the configured ``ssl.SSLContext``.
    """
    # Extract a subset of names from self.ssl_options which apply to SSLContext creation
    ssl_context_opt_names = ('ssl_version', 'cert_reqs', 'check_hostname', 'keyfile', 'certfile', 'ca_certs', 'ciphers')
    opts = {k: self.ssl_options[k] for k in ssl_context_opt_names if k in self.ssl_options}

    # Python >= 3.10 requires either PROTOCOL_TLS_CLIENT or PROTOCOL_TLS_SERVER so we'll get ahead of things by always
    # being explicit
    ssl_version = opts.get('ssl_version') or ssl.PROTOCOL_TLS_CLIENT

    # ssl.CERT_NONE is 0 and therefore falsy: compare against None so that an
    # explicit CERT_NONE is honoured instead of silently becoming CERT_REQUIRED.
    cert_reqs = opts.get('cert_reqs')
    if cert_reqs is None:
        cert_reqs = ssl.CERT_REQUIRED

    rv = ssl.SSLContext(protocol=int(ssl_version))

    # cert_reqs belongs in verify_mode; SSLContext.options is the OP_* bitmask
    # and must be left alone. Hostname checking has to be off while verify_mode
    # is assigned (CERT_NONE is rejected while it is on), so disable it first
    # and apply the requested value afterwards. Enabling check_hostname on a
    # CERT_NONE context makes Python raise verify_mode to CERT_REQUIRED.
    rv.check_hostname = False
    rv.verify_mode = ssl.VerifyMode(int(cert_reqs))
    rv.check_hostname = bool(opts.get('check_hostname', False))

    certfile = opts.get('certfile')
    keyfile = opts.get('keyfile')
    if certfile:
        rv.load_cert_chain(certfile, keyfile)
    ca_certs = opts.get('ca_certs')
    if ca_certs:
        rv.load_verify_locations(ca_certs)
    ciphers = opts.get('ciphers')
    if ciphers:
        rv.set_ciphers(ciphers)

    return rv

def _wrap_socket_from_context(self):
    """
    Wrap ``self._socket`` with ``self.ssl_context`` and return the result.

    Only the entries of ``self.ssl_options`` that are keyword arguments of
    ``SSLContext.wrap_socket`` (``server_side``, ``do_handshake_on_connect``,
    ``suppress_ragged_eofs``, ``server_hostname``) are forwarded; options that
    describe the context itself are left out so they cannot reach
    ``wrap_socket`` as unexpected keywords.

    The socket is wrapped exactly once and ``self._socket`` is not reassigned
    here; the wrapped socket is handed back to the caller.
    """
    ssl_options = self.ssl_options or {}

    # Extract a subset of names from self.ssl_options which apply to SSLContext.wrap_socket (or at least the parts
    # of it that don't involve building an SSLContext under the covers)
    wrap_socket_opt_names = ('server_side', 'do_handshake_on_connect', 'suppress_ragged_eofs', 'server_hostname')
    opts = {k: ssl_options[k] for k in wrap_socket_opt_names if k in ssl_options}

    # PYTHON-1186: set the server_hostname only if the SSLContext has
    # check_hostname enabled and it is not already provided by the EndPoint ssl options
    if self.ssl_context.check_hostname and 'server_hostname' not in opts:
        opts['server_hostname'] = self.endpoint.address

    return self.ssl_context.wrap_socket(self._socket, **opts)

def _initiate_connection(self, sockaddr):
if self.features.shard_id is not None:
Expand All @@ -904,8 +941,11 @@ def _initiate_connection(self, sockaddr):

self._socket.connect(sockaddr)

def _match_hostname(self):
ssl.match_hostname(self._socket.getpeercert(), self.endpoint.address)
# PYTHON-1331
#
# Allow implementations specific to an event loop to add additional behaviours
def _validate_hostname(self):
    """
    Extension point for event-loop-specific connection implementations that
    need extra hostname validation. The base implementation does nothing.
    """
    return None

def _get_socket_addresses(self):
address, port = self.endpoint.resolve()
Expand All @@ -927,18 +967,21 @@ def _connect_socket(self):
try:
self._socket = self._socket_impl.socket(af, socktype, proto)
if self.ssl_context:
self._wrap_socket_from_context()
elif self.ssl_options:
if not self._ssl_impl:
raise RuntimeError("This version of Python was not compiled with SSL support")
self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
self._socket = self._wrap_socket_from_context()
self._socket.settimeout(self.connect_timeout)
self._initiate_connection(sockaddr)
self._socket.settimeout(None)

local_addr = self._socket.getsockname()
log.debug("Connection %s: '%s' -> '%s'", id(self), local_addr, sockaddr)

# PYTHON-1331
#
# Most checking is done via the check_hostname param on the SSLContext.
# Subclasses can add additional behaviours via _validate_hostname() so
# run that here.
if self._check_hostname:
self._match_hostname()
self._validate_hostname()
sockerr = None
break
except socket.error as err:
Expand Down
Loading
Loading