From dfe42d09e48c23816682608323a619aabd9b8a95 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Tue, 23 Jul 2024 07:05:04 +0000 Subject: [PATCH] Avoid FD spike after retrying KafkaAdminClient A caller might call kafka.KafkaAdminClient repeatedly and handle kafka.errors.NoBrokersAvailable if the broker is not available. However, each retry will cause 3 extra FD being used. Depends on how long the caller wait before retry, the FD usage can reach 300~700 before Python garbage collector collecting those FD. This commit close those FD early. --- kafka/admin/client.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 69e643dc7..326ef2541 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -214,14 +214,20 @@ def __init__(self, **configs): metric_group_prefix='admin', **self.config ) - self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) - - # Get auto-discovered version from client if necessary - if self.config['api_version'] is None: - self.config['api_version'] = self._client.config['api_version'] - - self._closed = False - self._refresh_controller_id() + try: + self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) + + # Get auto-discovered version from client if necessary + if self.config['api_version'] is None: + self.config['api_version'] = self._client.config['api_version'] + + self._closed = False + self._refresh_controller_id() + except Exception: + self._metrics.close() + self._client.close() # prevent FD leak + self._closed = True + raise log.debug("KafkaAdminClient started.") def close(self):