Skip to content

Commit

Permalink
Merge pull request #946 from ods/issue943-send-perfomance
Browse files Browse the repository at this point in the history
Improve send performance (#943)
  • Loading branch information
ods authored Nov 27, 2023
2 parents b029fa2 + 3554731 commit 8814e7b
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,25 +626,25 @@ async def _wait_on_metadata(self, topic):
UnknownTopicOrPartitionError: if no topic or partitions found
in cluster metadata
"""
if topic in self.cluster.topics():
return self.cluster.partitions_for_topic(topic)
partitions = self.cluster.partitions_for_topic(topic)
if partitions is not None:
return partitions

# add topic to metadata topic list if it is not there already.
self.add_topic(topic)

t0 = time.monotonic()
while True:
await self.force_metadata_update()
if topic in self.cluster.topics():
break
partitions = self.cluster.partitions_for_topic(topic)
if partitions is not None:
return partitions
if (time.monotonic() - t0) > (self._request_timeout_ms / 1000):
raise UnknownTopicOrPartitionError()
if topic in self.cluster.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
await asyncio.sleep(self._retry_backoff)

return self.cluster.partitions_for_topic(topic)

async def _maybe_wait_metadata(self):
if self._md_update_fut is not None:
await asyncio.shield(self._md_update_fut)
Expand Down

0 comments on commit 8814e7b

Please sign in to comment.