-
Notifications
You must be signed in to change notification settings - Fork 14
Support custom SASL mechanisms including AWS MSK #170
Conversation
There is some interest in supporting various SASL mechanisms not currently included in the library: * dpkp#2110 (DMS) * dpkp#2204 (SSPI) * dpkp#2232 (AWS_MSK_IAM) Adding these mechanisms in the core library may be undesirable due to: * Increased maintenance burden. * Unavailable testing environments. * Vendor specificity. This commit provides a quick prototype for a pluggable SASL system. --- **Example** To define a custom SASL mechanism a module must implement two methods: ```py def validate_config(conn): # Check configuration values, available libraries, etc. assert conn.config['vendor_specific_setting'] is not None, ( 'vendor_specific_setting required when sasl_mechanism=MY_SASL' ) def try_authenticate(conn, future): # Do authentication routine and return resolved Future with failed # or succeeded state. ``` And then the custom mechanism should be registered before initializing a KafkaAdminClient, KafkaConsumer, or KafkaProducer: ```py import kafka.sasl from kafka import KafkaProducer import my_sasl kafka.sasl.register_mechanism('MY_SASL', my_sasl) producer = KafkaProducer(sasl_mechanism='MY_SASL') ``` --- **Notes** **ABCs** This prototype does not implement an ABC for custom SASL mechanisms. Using an ABC would reduce a few of the explicit assertions involved with registering a mechanism and is a viable option. Due to differing feature sets between py2/py3 this option was not explored, but shouldn't be difficult. **Private Methods** This prototype relies on some methods that are currently marked as **private** in `BrokerConnection`. * `._can_send_recv` * `._lock` * `._recv_bytes_blocking` * `._send_bytes_blocking` A pluggable system would require stable interfaces for these actions. **Alternative Approach** If the module-scoped dict modification in `register_mechanism` feels too clunky maybe the addtional mechanisms can be specified via an argument when initializing one of the `Kafka*` classes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some things to consider. View full project report here.
assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' | ||
assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' | ||
assert self.config['sasl_mechanism'] in sasl.MECHANISMS, ( | ||
'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f-string is easier to read, write, and less computationally expensive than legacy string formatting. Read more.
self.version = '2020_10_22' | ||
|
||
self.service = 'kafka-cluster' | ||
self.action = '{}:Connect'.format(self.service) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f-string is easier to read, write, and less computationally expensive than legacy string formatting. More.
|
||
@property | ||
def _credential(self): | ||
return '{0.access_key}/{0._scope}'.format(self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likewise, Consider using f-string instead.
|
||
@property | ||
def _scope(self): | ||
return '{0.datestamp}/{0.region}/{0.service}/aws4_request'.format(self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, Consider using f-string instead.
|
||
def _build_oauth_client_request(conn): | ||
token_provider = conn.config['sasl_oauth_token_provider'] | ||
return "n,,\x01auth=Bearer {}{}\x01\x01".format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, Consider using f-string instead.
# Builds up a string separated by \x01 via a dict of key value pairs | ||
if (callable(getattr(token_provider, "extensions", None)) | ||
and len(token_provider.extensions()) > 0): | ||
msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above: Consider using f-string instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth considering. View full project report here.
This reverts commit 1c29667.
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Consolidating #146 and #147.