From d596b58749713c8c774bc91690a78192616f571b Mon Sep 17 00:00:00 2001 From: Sharu Kulam Date: Thu, 4 Apr 2024 17:55:48 +0200 Subject: [PATCH] add validate_config function for msk module (#176) --- kafka/conn.py | 17 +++-------------- kafka/sasl/msk.py | 14 ++++++++++++-- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index ebf314bd5..745e4bca6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -68,13 +68,6 @@ class SSLWantWriteError(Exception): gssapi = None GSSError = None -# needed for AWS_MSK_IAM authentication: -try: - from botocore.session import Session as BotoSession -except ImportError: - # no botocore available, will disable AWS_MSK_IAM mechanism - BotoSession = None - AFI_NAMES = { socket.AF_UNSPEC: "unspecified", socket.AF_INET: "IPv4", @@ -113,7 +106,7 @@ class BrokerConnection: will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 1000. connection_timeout_ms (int): Connection timeout in milliseconds. - Default: None, which defaults it to the same value as + Default: None, which defaults it to the same value as request_timeout_ms. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. @@ -235,7 +228,7 @@ def __init__(self, host, port, afi, **configs): for key in self.config: if key in configs: self.config[key] = configs[key] - + if self.config['connection_timeout_ms'] is None: self.config['connection_timeout_ms'] = self.config['request_timeout_ms'] @@ -253,19 +246,15 @@ def __init__(self, host, port, afi, **configs): assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, ( 'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS)) - if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): assert ssl_available, "Python wasn't built with SSL support" - if self.config['sasl_mechanism'] == 'AWS_MSK_IAM': - assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' - assert self.config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' - if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): assert self.config['sasl_mechanism'] in sasl.MECHANISMS, ( 'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys())) ) sasl.MECHANISMS[self.config['sasl_mechanism']].validate_config(self) + # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index ebea5dc5a..2ae88d326 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -10,10 +10,20 @@ from kafka.protocol.types import Int32 import kafka.errors as Errors -from botocore.session import Session as BotoSession # importing it in advance is not an option apparently... +# needed for AWS_MSK_IAM authentication: +try: + from botocore.session import Session as BotoSession +except ImportError: + # no botocore available, will disable AWS_MSK_IAM mechanism + BotoSession = None + from typing import Optional +def validate_config(conn): + assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' + assert conn.config.get('security_protocol') == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' + def try_authenticate(self, future): session = BotoSession() @@ -25,7 +35,7 @@ def try_authenticate(self, future): region=session.get_config_variable('region'), token=credentials.token, ) - + msg = client.first_message() size = Int32.encode(len(msg))