Skip to content

Commit

Permalink
Improve send performance
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Nov 26, 2023
1 parent b029fa2 commit 3554731
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,25 +626,25 @@ async def _wait_on_metadata(self, topic):
UnknownTopicOrPartitionError: if no topic or partitions found
in cluster metadata
"""
if topic in self.cluster.topics():
return self.cluster.partitions_for_topic(topic)
partitions = self.cluster.partitions_for_topic(topic)
if partitions is not None:
return partitions

Check warning on line 631 in aiokafka/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/client.py#L629-L631

Added lines #L629 - L631 were not covered by tests

# add topic to metadata topic list if it is not there already.
self.add_topic(topic)

t0 = time.monotonic()
while True:
await self.force_metadata_update()
if topic in self.cluster.topics():
break
partitions = self.cluster.partitions_for_topic(topic)
if partitions is not None:
return partitions

Check warning on line 641 in aiokafka/client.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/client.py#L639-L641

Added lines #L639 - L641 were not covered by tests
if (time.monotonic() - t0) > (self._request_timeout_ms / 1000):
raise UnknownTopicOrPartitionError()
if topic in self.cluster.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
await asyncio.sleep(self._retry_backoff)

return self.cluster.partitions_for_topic(topic)

async def _maybe_wait_metadata(self):
if self._md_update_fut is not None:
await asyncio.shield(self._md_update_fut)
Expand Down

0 comments on commit 3554731

Please sign in to comment.