-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add connection_timeout_ms and reset the timeout counter more often #2388
base: master
Are you sure you want to change the base?
Conversation
Let's get some tests going to characterize this better, and touch things up so it passes existing tests. |
@wbarnha I can write some tests on Monday if that is good for you |
This semantically reflects the new usage of the variable better
The test mocks parts of BrokerConnection in order to assert that the connection state machine allows long-lasting connections as long as the state progresses often enough
Should be good now! Some unrelated tests fail from time to time on my machine due to I'm also a bit unsure as to why some random tests failed in the |
I see another PR(#2381 (comment)) is struggling with the same CI errors, so I assume this PR is good. Is anything else required from my side? |
I would appreciate if I could get feedback on my review. |
I see. I am unable to see any review, did you use reviewable? |
Hello, is there any update? |
I'm still around. Did you see my review in #2388 (comment)? |
@wbarnha Hey! No, I cannot see any review. Neither on reviewable nor here i can se no code comments, and no typical "requested changes" message that usually follows a GitHub review. Am i missing something? Your link does not properly work, it takes me to this PR, but it doesn't take me to a concrete comment. Is it hidden from people who aren't maintainers? Is it not yet submitted? Attached for reference, what I see: |
Weird, let me see what's going on. |
@@ -301,7 +308,7 @@ def __init__(self, host, port, afi, **configs): | |||
if self.config['ssl_context'] is not None: | |||
self._ssl_context = self.config['ssl_context'] | |||
self._sasl_auth_future = None | |||
self.last_attempt = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only concern with removing the last_attempt
attribute is breaking functionality for users who may depend on this value, for whatever reason. Any particular reason why we can't retain this and allow last_activity
to coexist as a separate attribute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you see this @petterroea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I agree, that's a valid concern.
I don't think renaming the variable back is a good idea, as it is now updated more often than before and cannot be used to indicate the same things.
I can reinstate last_activity
and make it be updated in the same places as earlier. Only downside here is that it would be a variable that is never read internally in the library, making it excess, but it would avoid breaking compatibility. Are you okay with this solution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost, let me make a small adjustment. There are some points where I'd like to update both values via self.last_activity = self.last_attempt = time.time()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok for me!
Just remember that the value of keeping self.last_activity
is diminishing if the variable isn't updated at the same places in time as before this PR. I don't know where you want to make the changes, so I can't really comment more on it, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wbarnha Happy to resolve this conversation?
I forgot to submit... 🤦♂️ |
It happens! Give me a few to re-familiarize myself and get you a proper reply |
@wbarnha how 'bout this? |
FYI: The plan is to include this in the v2.0.4/v2.1.0 release once we properly get the current status of the project sorted out. |
…terations for Kafka 0.8.2 and Python 3.12 (dpkp#159) * skip failing tests for PyPy since they work locally * Reconfigure tests for PyPy and 3.12 * Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2 * Update test_partitioner.py * Update test_producer.py * Timeout tests after ten minutes * Set 0.8.2.2 to be experimental from hereon * Formally support PyPy 3.9
* Test Kafka 0.8.2.2 using Python 3.11 in the meantime * Override PYTHON_LATEST conditionally in python-package.yml * Update python-package.yml * add python annotation to kafka version test matrix * Update python-package.yml * try python 3.10
* Remove support for EOL'ed versions of Python * Update setup.py
Too many MRs to review... so little time.
After stop/start kafka service, kafka-python may use 100% CPU caused by busy-retry while the socket was closed. This fix the issue by unregister the socket if the fd is negative. Co-authored-by: Orange Kao <orange@aiven.io>
Co-authored-by: Ryar Nyah <ryarnyah@gmail.com>
Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>
The former has been deprecated since setuptools 56 Co-authored-by: micwoj92 <45581170+micwoj92@users.noreply.github.com>
* docs: Update syntax in README.rst * docs: Update code block syntax in docs/index.rst --------- Co-authored-by: HalfSweet <60973476+HalfSweet@users.noreply.github.com>
* Fix crc32c's __main__ for Python 3 * Remove TODO from _crc32c.py --------- Co-authored-by: Yonatan Goldschmidt <yon.goldschmidt@gmail.com>
Co-authored-by: Dave Voutila <voutilad@gmail.com>
…pkp#155) * handling OSError * better error output * removed traceback logging --------- Co-authored-by: Alexander Sibiryakov <sixty-one@yandex.ru>
…pkp#134) wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis <shimon.turjeman@armis.com> Co-authored-by: shimon-armis <shimon.turjeman@armis.com>
Co-authored-by: drewdogg <drewdogg@users.noreply.github.com>
* Support custom SASL mechanisms There is some interest in supporting various SASL mechanisms not currently included in the library: * dpkp#2110 (DMS) * dpkp#2204 (SSPI) * dpkp#2232 (AWS_MSK_IAM) Adding these mechanisms in the core library may be undesirable due to: * Increased maintenance burden. * Unavailable testing environments. * Vendor specificity. This commit provides a quick prototype for a pluggable SASL system. --- **Example** To define a custom SASL mechanism a module must implement two methods: ```py def validate_config(conn): # Check configuration values, available libraries, etc. assert conn.config['vendor_specific_setting'] is not None, ( 'vendor_specific_setting required when sasl_mechanism=MY_SASL' ) def try_authenticate(conn, future): # Do authentication routine and return resolved Future with failed # or succeeded state. ``` And then the custom mechanism should be registered before initializing a KafkaAdminClient, KafkaConsumer, or KafkaProducer: ```py import kafka.sasl from kafka import KafkaProducer import my_sasl kafka.sasl.register_mechanism('MY_SASL', my_sasl) producer = KafkaProducer(sasl_mechanism='MY_SASL') ``` --- **Notes** **ABCs** This prototype does not implement an ABC for custom SASL mechanisms. Using an ABC would reduce a few of the explicit assertions involved with registering a mechanism and is a viable option. Due to differing feature sets between py2/py3 this option was not explored, but shouldn't be difficult. **Private Methods** This prototype relies on some methods that are currently marked as **private** in `BrokerConnection`. * `._can_send_recv` * `._lock` * `._recv_bytes_blocking` * `._send_bytes_blocking` A pluggable system would require stable interfaces for these actions. **Alternative Approach** If the module-scoped dict modification in `register_mechanism` feels too clunky maybe the addtional mechanisms can be specified via an argument when initializing one of the `Kafka*` classes? * Add test_msk.py by @mattoberle * add msk to __init__ and check for extension in conn.py * rename try_authenticate in msk.py * fix imports * fix imports * add botocore to requirements-dev.txt * add boto3 to requirements-dev.txt * add awscli to requirements-dev.txt * add awscli to workflow since it takes too long to install normally * just install botocore i guess * just install boto3 i guess * force reinstall awscli * try something weird * ok now the dang tests should work and if they don't i'll cry * skip the msk test for now... * Revert "skip the msk test for now..." This reverts commit 1c29667. * skip the msk test for now... * nvm just needed to update tox lol * Update kafka/sasl/gssapi.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/oauthbearer.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/plain.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/scram.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> * Update kafka/sasl/msk.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> --------- Co-authored-by: Matt Oberle <mattoberle@users.noreply.github.com> Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Now that the codebase has been modernised by using pyupgrade, we can also remove all backported vendor modules, and all uses of them.
I implemented API KEY 35 from the official Apache Kafka documentation. This functionality is requested in issue # 2163 and this is an implementation proposal. Co-authored-by: chopatate <florian.courouge@outlook.fr>
… topic naming (dpkp#172) * Update conftest.py to use request.node.originalname instead for legal topic naming Otherwise parametrization doesn't work. * Update test/conftest.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com> --------- Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
* KIP-345 Add static consumer membership support * KIP-345 Add examples to docs * KIP-345 Add leave_group_on_close flag https://issues.apache.org/jira/browse/KAFKA-6995 * KIP-345 Add tests for static membership * KIP-345 Update docs for leave_group_on_close option * Update changelog.rst * remove six from base.py * Update base.py * Update base.py * Update base.py * Update changelog.rst * Update README.rst --------- Co-authored-by: Denis Kazakov <d.kazakov@mcplat.ru> Co-authored-by: Denis Kazakov <denis@kazakov.ru.net>
* Add typing * define types as Struct for simplicity's sake
…before definition (dpkp#138) * fix if statement logic and add zstd check * fix if statement logic and add zstd uncompress * fix imports * avoid variable be used before definition * Remove unused import from legacy_records.py --------- Co-authored-by: Alexandre Souza <alexandre.souza@axonius.com>
…not installed (dpkp#175) Closes wbarnha#174.
This PR will be merged in via wbarnha#132 |
This PR complements #2386
Lacks tests
This change is