-
Notifications
You must be signed in to change notification settings - Fork 113
[Integration][Kafka] Convert client methods to async #1349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Convert describe_cluster, describe_brokers, and describe_topics to async methods - Use anyio.to_thread.run_sync for blocking Kafka operations - Ensure all Kafka client operations run in thread pool - Prevent blocking of event loop during Kafka operations This change aligns all client methods with describe_consumer_groups by making them async and properly handling blocking operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets improve the user experience and performance of the catalog update process, I suggest introducing pagination for batch operations. By yielding results in batches, we can achieve two key benefits:
- Faster Feedback: Entities will become visible in the catalog incrementally, rather than waiting for the entire operation to complete. This improves responsiveness and makes the catalog more interactive for users.
- Prevent "All or Nothing" Scenarios: In the current implementation, a single failure can block the entire batch of entities from being processed. With pagination, failures are isolated to individual batches, reducing the impact and improving system resilience.
If you agree, we could:
Transform the functions to generators that yields entities in chunks of a configurable batch size.
…port-labs/ocean into feat/kafka-async-client-methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left few recommendations
Co-authored-by: Michael Kofi Armah <mikeyarmah@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Description
This PR converts all Kafka client methods to async to prevent blocking operations. Previously, only
describe_consumer_groups
was async, while other methods were running synchronously and could potentially block the event loop.This change improves the overall reliability of the Kafka integration by ensuring that all blocking operations are properly handled in a thread pool.
How:
describe_cluster
,describe_brokers
, anddescribe_topics
to async methodsanyio.to_thread.run_sync
for blocking Kafka operationsType of change
Please leave one option from the following and delete the rest:
All tests should be run against the port production environment(using a testing org).
Core testing checklist
Integration testing checklist
examples
folder in the integration directory.Preflight checklist
Screenshots
Include screenshots from your environment showing how the resources of the integration will look.
API Documentation
Provide links to the API documentation used for this integration.