[Integration][Kafka] Convert client methods to async (#1349)
# Description

This PR converts all Kafka client methods to async to prevent blocking
operations. Previously, only `describe_consumer_groups` was async; the
remaining methods ran synchronously and could block the event loop.

This change improves the overall reliability of the Kafka integration by
ensuring that all blocking operations are properly handled in a thread
pool.


_How:_
- Converted `describe_cluster`, `describe_brokers`, and
`describe_topics` to async methods
- Added `anyio.to_thread.run_sync` so every blocking Kafka `AdminClient` call
runs in a thread pool (see the sketch below)
- Batched `describe_brokers`, `describe_topics`, and
`describe_consumer_groups` so results are yielded in chunks of
`DEFAULT_BATCH_SIZE` (50)
- Updated `main.py` to await `describe_cluster` and iterate the new async
generators
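
As an illustration only (not the literal code in this PR), here is a minimal
sketch of the pattern described above: blocking `AdminClient` calls are
offloaded to a worker thread with `anyio.to_thread.run_sync`, and each batch
of per-broker lookups is gathered concurrently. The helper names
`describe_brokers_in_batches` and `_describe_one_broker`, and the
standalone-function shape, are assumptions for the sketch; the real change
lives on `KafkaClient` in `integrations/kafka/kafka_integration/client.py`.

```python
import asyncio
from itertools import islice
from typing import Any, AsyncIterator

import confluent_kafka  # type: ignore
from anyio import to_thread
from confluent_kafka.admin import AdminClient, ConfigResource  # type: ignore


async def _describe_one_broker(admin: AdminClient, broker: Any) -> dict[str, Any] | None:
    """Fetch one broker's config without blocking the event loop (illustrative helper)."""
    try:
        # describe_configs() and future.result() both block, so each call is
        # offloaded to a worker thread via anyio.
        configs = await to_thread.run_sync(
            admin.describe_configs,
            [ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, str(broker.id))],
        )
        # describe_configs returns {ConfigResource: Future}; there is exactly
        # one entry here because we asked about a single broker.
        future = next(iter(configs.values()))
        entries = await to_thread.run_sync(future.result)
        return {
            "id": broker.id,
            "address": str(broker),
            "config": {name: entry.value for name, entry in entries.items()},
        }
    except Exception:
        return None


async def describe_brokers_in_batches(
    admin: AdminClient, batch_size: int = 50
) -> AsyncIterator[list[dict[str, Any]]]:
    """Yield broker descriptions in batches of `batch_size` (illustrative helper)."""
    # list_topics() is a blocking metadata call, so run it in a thread too.
    metadata = await to_thread.run_sync(admin.list_topics)
    brokers_iter = iter(metadata.brokers.values())
    while batch := list(islice(brokers_iter, batch_size)):
        # Describe every broker in the batch concurrently.
        results = await asyncio.gather(*(_describe_one_broker(admin, b) for b in batch))
        yield [broker for broker in results if broker is not None]
```

A resync handler would consume this with
`async for batch in describe_brokers_in_batches(admin): yield batch`, which
mirrors how the updated `main.py` iterates the client's new async generators.
Batching keeps memory bounded on large clusters, while `asyncio.gather` keeps
each batch's config lookups concurrent.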



## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [x] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

#### All tests should be run against the Port production environment (using a testing org).

### Core testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync finishes successfully
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Resync finishes successfully
- [x] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: Tom Tankilevitch <59158507+Tankilevitch@users.noreply.github.com>
Co-authored-by: Michael Kofi Armah <mikeyarmah@gmail.com>
3 people authored Feb 5, 2025
1 parent 10d323e commit 25b6758
Showing 5 changed files with 188 additions and 113 deletions.
8 changes: 7 additions & 1 deletion integrations/kafka/CHANGELOG.md
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.120 (2025-02-05)

### Improvements

- Converted client to use asyncio and added batching


## 0.1.119 (2025-02-04)


@@ -30,7 +37,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Bumped ocean version to ^0.18.5


## 0.1.116 (2025-01-23)


268 changes: 164 additions & 104 deletions integrations/kafka/kafka_integration/client.py
@@ -1,98 +1,145 @@
from typing import Any
from typing import Any, AsyncIterator
from itertools import islice
import asyncio
from anyio import to_thread

import confluent_kafka # type: ignore

from confluent_kafka.admin import AdminClient, ConfigResource # type: ignore
from loguru import logger


DEFAULT_BATCH_SIZE = 50


class KafkaClient:
def __init__(self, cluster_name: str, conf: dict[str, Any]):
self.cluster_name = cluster_name
self.kafka_admin_client = AdminClient(conf)
self.cluster_metadata = self.kafka_admin_client.list_topics()

def describe_cluster(self) -> dict[str, Any]:
async def describe_cluster(self) -> dict[str, Any]:
return {
"name": self.cluster_name,
"controller_id": self.cluster_metadata.controller_id,
}

def describe_brokers(self) -> list[dict[str, Any]]:
result_brokers = []
for broker in self.cluster_metadata.brokers.values():
brokers_configs = self.kafka_admin_client.describe_configs(
[ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, str(broker.id))]
async def describe_brokers(
self, batch_size: int = DEFAULT_BATCH_SIZE
) -> AsyncIterator[list[dict[str, Any]]]:
brokers = list(self.cluster_metadata.brokers.values())
if not brokers:
logger.info("No brokers found in the cluster")
return

# Process brokers in batches
brokers_iter = iter(brokers)
while current_batch_brokers := list(islice(brokers_iter, batch_size)):
tasks = []
for broker in current_batch_brokers:
tasks.append(self._process_broker(broker))

try:
current_batch = await asyncio.gather(*tasks)
yield [broker for broker in current_batch if broker is not None]
except Exception as e:
logger.error(f"Failed to process batch of brokers: {e}")
raise e

async def _process_broker(self, broker: Any) -> dict[str, Any] | None: # type: ignore[return]
try:
brokers_configs = await to_thread.run_sync(
self.kafka_admin_client.describe_configs,
[ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, str(broker.id))],
)
for broker_config_resource, future in brokers_configs.items():
broker_id = broker_config_resource.name
try:
broker_config = {
key: value.value for key, value in future.result().items()
}
result_brokers.append(
{
"id": broker.id,
"address": str(broker),
"cluster_name": self.cluster_name,
"config": broker_config,
}
)
except Exception as e:
logger.error(f"Failed to describe broker {broker_id}: {e}")
raise e
return result_brokers

def describe_topics(self) -> list[dict[str, Any]]:
result_topics = []
topics_config_resources = []
topics_metadata_dict = {}

for topic in self.cluster_metadata.topics.values():
topics_config_resources.append(
ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, topic.topic)
broker_config = {
key: value.value
for key, value in (await to_thread.run_sync(future.result)).items()
}
return {
"id": broker.id,
"address": str(broker),
"cluster_name": self.cluster_name,
"config": broker_config,
}
except Exception as e:
logger.error(f"Failed to describe broker {broker.id}: {e}")
return None

async def describe_topics(
self, batch_size: int = DEFAULT_BATCH_SIZE
) -> AsyncIterator[list[dict[str, Any]]]:
topics = list(self.cluster_metadata.topics.values())
if not topics:
logger.info("No topics found in the cluster")
return

# Process topics in batches
topics_iter = iter(topics)
while current_batch_topics := list(islice(topics_iter, batch_size)):
tasks = []
topics_config_resources = []
topics_metadata_dict = {}

for topic in current_batch_topics:
topics_config_resources.append(
ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, topic.topic)
)
topics_metadata_dict[topic.topic] = topic

topics_configs = await to_thread.run_sync(
self.kafka_admin_client.describe_configs, topics_config_resources
)
topics_metadata_dict[topic.topic] = topic

topics_configs = self.kafka_admin_client.describe_configs(
topics_config_resources
)
for topic_config_resource, future in topics_configs.items():
topic_name = topic_config_resource.name
try:
topic_config = {
key: value.value for key, value in future.result().items()
}
partitions = [
{
"id": partition.id,
"leader": partition.leader,
"replicas": partition.replicas,
"isrs": partition.isrs,
}
for partition in topics_metadata_dict[
topic_name
].partitions.values()
]
result_topics.append(
{
"name": topic_name,
"cluster_name": self.cluster_name,
"partitions": partitions,
"config": topic_config,
}
for topic_config_resource, future in topics_configs.items():
tasks.append(
self._process_topic(
topic_config_resource, future, topics_metadata_dict
)
)

try:
current_batch = await asyncio.gather(*tasks)
yield [topic for topic in current_batch if topic is not None]
except Exception as e:
logger.error(f"Failed to describe topic {topic_name}: {e}")
logger.error(f"Failed to process batch of topics: {e}")
raise e
return result_topics

async def describe_consumer_groups(self) -> list[dict[str, Any]]:
async def _process_topic(
self,
topic_config_resource: Any,
future: Any,
topics_metadata_dict: dict[str, Any],
) -> dict[str, Any] | None:
topic_name = topic_config_resource.name
try:
topic_config = {
key: value.value
for key, value in (await to_thread.run_sync(future.result)).items()
}
partitions = [
{
"id": partition.id,
"leader": partition.leader,
"replicas": partition.replicas,
"isrs": partition.isrs,
}
for partition in topics_metadata_dict[topic_name].partitions.values()
]
return {
"name": topic_name,
"cluster_name": self.cluster_name,
"partitions": partitions,
"config": topic_config,
}
except Exception as e:
logger.error(f"Failed to describe topic {topic_name}: {e}")
return None

async def describe_consumer_groups(
self, batch_size: int = DEFAULT_BATCH_SIZE
) -> AsyncIterator[list[dict[str, Any]]]:
"""Describe all consumer groups in the cluster."""
result_groups: list[dict[str, Any]] = []

# List all consumer groups and wait for the future to complete
groups_metadata = await to_thread.run_sync(
self.kafka_admin_client.list_consumer_groups
)
@@ -101,45 +148,58 @@ async def describe_consumer_groups(self) -> list[dict[str, Any]]:

logger.info(f"Found {len(group_ids)} consumer groups")
if not group_ids:
return result_groups
return

# Describe the consumer groups
groups_description = await to_thread.run_sync(
self.kafka_admin_client.describe_consumer_groups, group_ids
)
# Process group_ids in batches
group_ids_iter = iter(group_ids)
while current_batch_ids := list(islice(group_ids_iter, batch_size)):
groups_description = await to_thread.run_sync(
self.kafka_admin_client.describe_consumer_groups, current_batch_ids
)

# Process all groups in the current batch concurrently
tasks = []
for group_id, future in groups_description.items():
tasks.append(self._process_consumer_group(group_id, future))

for group_id, future in groups_description.items():
try:
group_info = await to_thread.run_sync(future.result)
members = [
{
"id": member.member_id,
"client_id": member.client_id,
"host": member.host,
"assignment": {
"topic_partitions": [
{"topic": tp.topic, "partition": tp.partition}
for tp in member.assignment.topic_partitions
]
},
}
for member in group_info.members
]

result_groups.append(
{
"group_id": group_id,
"state": group_info.state.name,
"members": members,
"cluster_name": self.cluster_name,
"coordinator": group_info.coordinator.id,
"partition_assignor": group_info.partition_assignor,
"is_simple_consumer_group": group_info.is_simple_consumer_group,
"authorized_operations": group_info.authorized_operations,
}
)
current_batch = await asyncio.gather(*tasks)
yield [group for group in current_batch if group is not None]
except Exception as e:
logger.error(f"Failed to describe consumer group {group_id}: {e}")
logger.error(f"Failed to process batch of consumer groups: {e}")
raise e

return result_groups
async def _process_consumer_group(
self, group_id: str, future: Any
) -> dict[str, Any] | None:
"""Process a single consumer group and return its description."""
try:
group_info = await to_thread.run_sync(future.result)
members = [
{
"id": member.member_id,
"client_id": member.client_id,
"host": member.host,
"assignment": {
"topic_partitions": [
{"topic": tp.topic, "partition": tp.partition}
for tp in member.assignment.topic_partitions
]
},
}
for member in group_info.members
]

return {
"group_id": group_id,
"state": group_info.state.name,
"members": members,
"cluster_name": self.cluster_name,
"coordinator": group_info.coordinator.id,
"partition_assignor": group_info.partition_assignor,
"is_simple_consumer_group": group_info.is_simple_consumer_group,
"authorized_operations": group_info.authorized_operations,
}
except Exception as e:
logger.error(f"Failed to describe consumer group {group_id}: {e}")
return None
11 changes: 7 additions & 4 deletions integrations/kafka/main.py
@@ -18,25 +18,28 @@ def init_clients() -> list[KafkaClient]:
async def resync_cluster(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield [kafka_client.describe_cluster()]
yield [await kafka_client.describe_cluster()]


@ocean.on_resync("broker")
async def resync_brokers(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield kafka_client.describe_brokers()
async for batch in kafka_client.describe_brokers():
yield batch


@ocean.on_resync("topic")
async def resync_topics(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield kafka_client.describe_topics()
async for batch in kafka_client.describe_topics():
yield batch


@ocean.on_resync("consumer_group")
async def resync_consumer_groups(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield await kafka_client.describe_consumer_groups()
async for batch in kafka_client.describe_consumer_groups():
yield batch
2 changes: 1 addition & 1 deletion integrations/kafka/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "kafka"
version = "0.1.119"
version = "0.1.120"
description = "Integration to import information from a Kafka cluster into Port. The integration supports importing metadata regarding the Kafka cluster, brokers and topics."
authors = ["Tal Sabag <tal@getport.io>"]

12 changes: 9 additions & 3 deletions integrations/kafka/tests/seed_data.py
@@ -102,13 +102,19 @@ def run(self) -> None:
time.sleep(1)


KAFKA_SERVERS_ARRAY = [
"localhost:19092",
"localhost:9092",
"localhost:9093",
"localhost:9094",
]
# Configuration
KAFKA_BOOTSTRAP_SERVERS = "localhost:19092,localhost:9092,localhost:9093,localhost:9094"
NUM_TOPICS = 50
PARTITIONS_PER_TOPIC = 3
KAFKA_BOOTSTRAP_SERVERS = ",".join(KAFKA_SERVERS_ARRAY)
REPLICATION_FACTOR = 1
NUM_CONSUMER_GROUPS = 20
CONSUMERS_PER_GROUP = 3
NUM_CONSUMER_GROUPS = 40
CONSUMERS_PER_GROUP = 5


def create_topics(admin_client: AdminClient, topic_names: List[str]) -> None:
