better response structure, better tests, reuse more code, fix protocol
alfaix committed Mar 21, 2023
1 parent f9afb4e commit d944b36
Showing 3 changed files with 51 additions and 50 deletions.
79 changes: 35 additions & 44 deletions kafka/admin/client.py
@@ -973,49 +973,26 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
         :return: Dictionary with ``{leader_id -> {partitions}}``
         """
         timeout_ms = self._validate_timeout(timeout_ms)
-        version = self._matching_api_version(MetadataRequest)

         partitions = set(partitions)
-        topics = set()
-
-        for topic_partition in partitions:
-            topics.add(topic_partition.topic)
-
-        request = MetadataRequest[version](
-            topics=list(topics),
-            allow_auto_topic_creation=False
-        )
-
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
-
-        self._wait_for_futures([future])
-        response = future.value
-
-        version = self._matching_api_version(DeleteRecordsRequest)
+        topics = set(tp.topic for tp in partitions)

-        PARTITIONS_INFO = 3
-        NAME = 1
-        PARTITION_INDEX = 1
-        LEADER = 2
+        response = self._get_cluster_metadata(topics=topics).to_object()

-        # We want to make as few requests as possible
-        # If a single node serves as a partition leader for multiple partitions (and/or
-        # topics), we can send all of those in a single request.
-        # For that we store {leader -> {topic1 -> [p0, p1], topic2 -> [p0, p1]}}
         leader2partitions = defaultdict(list)
         valid_partitions = set()
-        for topic in response.topics:
-            for partition in topic[PARTITIONS_INFO]:
-                t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
+        for topic in response.get("topics", ()):
+            for partition in topic.get("partitions", ()):
+                t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
                 if t2p in partitions:
-                    leader2partitions[partition[LEADER]].append(t2p)
+                    leader2partitions[partition["leader"]].append(t2p)
                     valid_partitions.add(t2p)

         if len(partitions) != len(valid_partitions):
             unknown = set(partitions) - valid_partitions
             raise UnknownTopicOrPartitionError(
-                "The following partitions are not known: %s" % ", "
-                .join(str(x) for x in unknown)
+                "The following partitions are not known: %s"
+                % ", ".join(str(x) for x in unknown)
             )

         return leader2partitions
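
The rewritten loop above indexes into the plain dict produced by ``_get_cluster_metadata(...).to_object()`` instead of positional tuple indices. As a standalone illustration, the grouping step amounts to the sketch below; the metadata dict is hand-built in the shape ``MetadataResponse.to_object()`` is expected to return, and the topic name, partition counts, and leader ids are invented:

from collections import defaultdict

from kafka.structs import TopicPartition

# Hand-built stand-in for MetadataResponse.to_object(); not a real broker reply.
metadata = {
    "topics": [
        {"topic": "t0", "partitions": [
            {"partition": 0, "leader": 1},
            {"partition": 1, "leader": 2},
        ]},
    ],
}

requested = {TopicPartition("t0", 0), TopicPartition("t0", 1)}

# Group the requested partitions by their leader node, as in the loop above.
leader2partitions = defaultdict(list)
for topic in metadata.get("topics", ()):
    for partition in topic.get("partitions", ()):
        t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
        if t2p in requested:
            leader2partitions[partition["leader"]].append(t2p)

print(dict(leader2partitions))
# {1: [TopicPartition(topic='t0', partition=0)], 2: [TopicPartition(topic='t0', partition=1)]}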
@@ -1029,14 +1006,20 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
             config.
         :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
             this node. No check is performed verifying that this is indeed the leader for all
-            listed partitions, use with caution.
+            listed partitions: use with caution.

-        :return: List of DeleteRecordsResponse
+        :return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
+            See DeleteRecordsResponse for possible fields. error_code for all partitions is
+            guaranteed to be zero, otherwise an exception is raised.
         """
         timeout_ms = self._validate_timeout(timeout_ms)
         responses = []
+        version = self._matching_api_version(DeleteRecordsRequest)

+        # We want to make as few requests as possible
+        # If a single node serves as a partition leader for multiple partitions (and/or
+        # topics), we can send all of those in a single request.
+        # For that we store {leader -> {partitions for leader}}, and do 1 request per leader
         if partition_leader_id is None:
             leader2partitions = self._get_leader_for_partitions(
                 set(records_to_delete), timeout_ms
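
With the docstring change above, the method now returns a per-partition mapping rather than a list of raw responses. A hypothetical caller-side sketch; the broker address, topic name, and offset below are invented for illustration:

from kafka import KafkaAdminClient, TopicPartition

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
tp = TopicPartition("my_topic", 0)

# Delete all records below offset 40 in partition 0 of my_topic.
result = admin.delete_records({tp: 40}, timeout_ms=1000)

# result maps each TopicPartition to the broker's per-partition metadata,
# e.g. {"partition_index": 0, "low_watermark": 40, "error_code": 0};
# error_code is guaranteed to be zero, otherwise an exception is raised.
# Assuming the partition held at least 40 messages, the new low watermark
# equals the requested offset:
assert result[tp]["low_watermark"] == 40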
@@ -1059,27 +1042,35 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
             future = self._send_request_to_node(leader, request)
             self._wait_for_futures([future])

-            response = future.value
-            responses.append(response)
+            responses.append(future.value.to_object())

+        partition2result = {}
         partition2error = {}
         for response in responses:
-            for topic in getattr(response, 'topics', ()):
-                for partition in getattr(topic, 'partitions', ()):
-                    if getattr(partition, 'error_code', 0) != 0:
-                        tp = TopicPartition(topic, partition['partition_index'])
-                        partition2error[tp] = partition['error_code']
+            for topic in response["topics"]:
+                for partition in topic["partitions"]:
+                    tp = TopicPartition(topic["name"], partition["partition_index"])
+                    partition2result[tp] = partition
+                    if partition["error_code"] != 0:
+                        partition2error[tp] = partition["error_code"]

         if partition2error:
             if len(partition2error) == 1:
-                raise Errors.for_code(partition2error[0])()
+                key, error = next(iter(partition2error.items()))
+                raise Errors.for_code(error)(
+                    "Error deleting records from topic %s partition %s" % (key.topic, key.partition)
+                )
             else:
                 raise Errors.BrokerResponseError(
-                    "The following errors occured when trying to delete records: "
-                    ", ".join("%s: %s" % (partition, error) for partition, error in partition2error.items())
+                    "The following errors occurred when trying to delete records: " +
+                    ", ".join(
+                        "%s(partition=%d): %s" %
+                        (partition.topic, partition.partition, Errors.for_code(error).__name__)
+                        for partition, error in partition2error.items()
+                    )
                )

-        return responses
+        return partition2result

     # create delegation token protocol not yet implemented
     # Note: send the request to the least_loaded_node()
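
The aggregation above collects one numeric error code per failed partition and uses ``kafka.errors.for_code`` to surface them. A small sketch of that mechanism; the partition keys and codes are invented, and 1 and 3 are the Kafka protocol's OFFSET_OUT_OF_RANGE and UNKNOWN_TOPIC_OR_PARTITION codes:

import kafka.errors as Errors
from kafka.structs import TopicPartition

# for_code() maps a broker error code to its BrokerResponseError subclass.
assert Errors.for_code(1) is Errors.OffsetOutOfRangeError
assert Errors.for_code(3) is Errors.UnknownTopicOrPartitionError

partition2error = {
    TopicPartition("t0", 0): 1,
    TopicPartition("t0", 1): 3,
}

# Same summary string the multi-error branch above builds.
summary = ", ".join(
    "%s(partition=%d): %s" %
    (partition.topic, partition.partition, Errors.for_code(error).__name__)
    for partition, error in partition2error.items()
)
print(summary)
# t0(partition=0): OffsetOutOfRangeError, t0(partition=1): UnknownTopicOrPartitionError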
2 changes: 1 addition & 1 deletion kafka/protocol/admin.py
@@ -243,13 +243,13 @@ class DeleteRecordsResponse_v0(Response):
     API_KEY = 21
     API_VERSION = 0
     SCHEMA = Schema(
-        ('throttle_time_ms', Int32),
         ('topics', Array(
             ('name', String('utf-8')),
             ('partitions', Array(
                 ('partition_index', Int32),
                 ('low_watermark', Int64),
                 ('error_code', Int16))))),
+        ('throttle_time_ms', Int32)
     )
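
The protocol fix above matters because kafka-python's ``Schema`` decodes fields strictly in declaration order: if ``throttle_time_ms`` is declared on the wrong side of ``topics``, every field after the mismatch is read from the wrong bytes. A minimal sketch with two-field schemas, where the payload is hand-packed for illustration:

import io
import struct

from kafka.protocol.types import Int16, Int32, Schema

right = Schema(('a', Int32), ('b', Int16))
wrong = Schema(('b', Int16), ('a', Int32))

# 6 bytes on the wire: a 4-byte Int32 (7) followed by a 2-byte Int16 (42).
payload = struct.pack('>ih', 7, 42)

print(right.decode(io.BytesIO(payload)))  # (7, 42): fields line up
print(wrong.decode(io.BytesIO(payload)))  # (0, 458794): same bytes, wrong layout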


20 changes: 15 additions & 5 deletions test/test_admin_integration.py
@@ -9,8 +9,8 @@
 from kafka.admin import (
     ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
 from kafka.errors import (
-    KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
-    GroupIdNotFoundError, UnknownTopicOrPartitionError)
+    BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
+    GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError)


 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -342,7 +342,11 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages):
     for _ in range(600):
         next(consumer1)

-    kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
+    assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
+    assert result[t1p0] == {"low_watermark": 60, "error_code": 0, "partition_index": t1p0.partition}
+    assert result[t1p2] == {"low_watermark": 70, "error_code": 0, "partition_index": t1p2.partition}

     consumer2 = kafka_consumer_factory(group_id=None, topics=())
     consumer2.assign(partitions)
@@ -360,16 +364,22 @@


 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
-def test_delete_records_with_errors(kafka_admin_client, topic):
+def test_delete_records_with_errors(kafka_admin_client, topic, send_messages):
     sleep(1)  # sometimes the topic is not created yet...?
     p0 = TopicPartition(topic, 0)
+    p1 = TopicPartition(topic, 1)
+    p2 = TopicPartition(topic, 2)
     # verify that topic has been created
-    kafka_admin_client.delete_records({p0: 10}, timeout_ms=1000)
+    send_messages(range(0, 1), partition=p2.partition, topic=p2.topic)

     with pytest.raises(UnknownTopicOrPartitionError):
         kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
     with pytest.raises(UnknownTopicOrPartitionError):
         kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
+    with pytest.raises(OffsetOutOfRangeError):
+        kafka_admin_client.delete_records({p0: 1000})
+    with pytest.raises(BrokerResponseError):
+        kafka_admin_client.delete_records({p0: 1000, p1: 1000})


