From 3554731f74f936cc004ab4479f9443ed9576b5ae Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 26 Nov 2023 20:22:44 +0200 Subject: [PATCH] Improve send performance --- aiokafka/client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/aiokafka/client.py b/aiokafka/client.py index 9ecd23cd..26436fec 100644 --- a/aiokafka/client.py +++ b/aiokafka/client.py @@ -626,8 +626,9 @@ async def _wait_on_metadata(self, topic): UnknownTopicOrPartitionError: if no topic or partitions found in cluster metadata """ - if topic in self.cluster.topics(): - return self.cluster.partitions_for_topic(topic) + partitions = self.cluster.partitions_for_topic(topic) + if partitions is not None: + return partitions # add topic to metadata topic list if it is not there already. self.add_topic(topic) @@ -635,16 +636,15 @@ async def _wait_on_metadata(self, topic): t0 = time.monotonic() while True: await self.force_metadata_update() - if topic in self.cluster.topics(): - break + partitions = self.cluster.partitions_for_topic(topic) + if partitions is not None: + return partitions if (time.monotonic() - t0) > (self._request_timeout_ms / 1000): raise UnknownTopicOrPartitionError() if topic in self.cluster.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(topic) await asyncio.sleep(self._retry_backoff) - return self.cluster.partitions_for_topic(topic) - async def _maybe_wait_metadata(self): if self._md_update_fut is not None: await asyncio.shield(self._md_update_fut)