Skip to content

Commit

Permalink
add validate_config function for msk module (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sharu95 authored and bradenneal1 committed May 16, 2024
1 parent 06f0450 commit d596b58
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
17 changes: 3 additions & 14 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ class SSLWantWriteError(Exception):
gssapi = None
GSSError = None

# needed for AWS_MSK_IAM authentication:
try:
from botocore.session import Session as BotoSession
except ImportError:
# no botocore available, will disable AWS_MSK_IAM mechanism
BotoSession = None

AFI_NAMES = {
socket.AF_UNSPEC: "unspecified",
socket.AF_INET: "IPv4",
Expand Down Expand Up @@ -113,7 +106,7 @@ class BrokerConnection:
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which defaults it to the same value as
Default: None, which defaults it to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
Expand Down Expand Up @@ -235,7 +228,7 @@ def __init__(self, host, port, afi, **configs):
for key in self.config:
if key in configs:
self.config[key] = configs[key]

if self.config['connection_timeout_ms'] is None:
self.config['connection_timeout_ms'] = self.config['request_timeout_ms']

Expand All @@ -253,19 +246,15 @@ def __init__(self, host, port, afi, **configs):
assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))


if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
assert ssl_available, "Python wasn't built with SSL support"

if self.config['sasl_mechanism'] == 'AWS_MSK_IAM':
assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package'
assert self.config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL'

if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
assert self.config['sasl_mechanism'] in sasl.MECHANISMS, (
'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys()))
)
sasl.MECHANISMS[self.config['sasl_mechanism']].validate_config(self)

# This is not a general lock / this class is not generally thread-safe yet
# However, to avoid pushing responsibility for maintaining
# per-connection locks to the upstream client, we will use this lock to
Expand Down
14 changes: 12 additions & 2 deletions kafka/sasl/msk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,20 @@
from kafka.protocol.types import Int32
import kafka.errors as Errors

from botocore.session import Session as BotoSession # importing it in advance is not an option apparently...
# needed for AWS_MSK_IAM authentication:
try:
from botocore.session import Session as BotoSession
except ImportError:
# no botocore available, will disable AWS_MSK_IAM mechanism
BotoSession = None

from typing import Optional


def validate_config(conn):
assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package'
assert conn.config.get('security_protocol') == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL'

def try_authenticate(self, future):

session = BotoSession()
Expand All @@ -25,7 +35,7 @@ def try_authenticate(self, future):
region=session.get_config_variable('region'),
token=credentials.token,
)

msg = client.first_message()
size = Int32.encode(len(msg))

Expand Down

0 comments on commit d596b58

Please sign in to comment.