diff --git a/kafka/admin/client.py b/kafka/admin/client.py index b6b310702..fb5097aa3 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -973,49 +973,26 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None): :return: Dictionary with ``{leader_id -> {partitions}}`` """ timeout_ms = self._validate_timeout(timeout_ms) - version = self._matching_api_version(MetadataRequest) partitions = set(partitions) - topics = set() - - for topic_partition in partitions: - topics.add(topic_partition.topic) - - request = MetadataRequest[version]( - topics=list(topics), - allow_auto_topic_creation=False - ) - - future = self._send_request_to_node(self._client.least_loaded_node(), request) - - self._wait_for_futures([future]) - response = future.value - - version = self._matching_api_version(DeleteRecordsRequest) + topics = set(tp.topic for tp in partitions) - PARTITIONS_INFO = 3 - NAME = 1 - PARTITION_INDEX = 1 - LEADER = 2 + response = self._get_cluster_metadata(topics=topics).to_object() - # We want to make as few requests as possible - # If a single node serves as a partition leader for multiple partitions (and/or - # topics), we can send all of those in a single request. - # For that we store {leader -> {topic1 -> [p0, p1], topic2 -> [p0, p1]}} leader2partitions = defaultdict(list) valid_partitions = set() - for topic in response.topics: - for partition in topic[PARTITIONS_INFO]: - t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX]) + for topic in response.get("topics", ()): + for partition in topic.get("partitions", ()): + t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"]) if t2p in partitions: - leader2partitions[partition[LEADER]].append(t2p) + leader2partitions[partition["leader"]].append(t2p) valid_partitions.add(t2p) if len(partitions) != len(valid_partitions): unknown = set(partitions) - valid_partitions raise UnknownTopicOrPartitionError( - "The following partitions are not known: %s" % ", " - .join(str(x) for x in unknown) + "The following partitions are not known: %s" + % ", ".join(str(x) for x in unknown) ) return leader2partitions @@ -1029,14 +1006,20 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id config. :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to this node. No check is performed verifying that this is indeed the leader for all - listed partitions, use with caution. + listed partitions: use with caution. - :return: List of DeleteRecordsResponse + :return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker. + See DeleteRecordsResponse for possible fields. error_code for all partitions is + guaranteed to be zero, otherwise an exception is raised. """ timeout_ms = self._validate_timeout(timeout_ms) responses = [] version = self._matching_api_version(DeleteRecordsRequest) + # We want to make as few requests as possible + # If a single node serves as a partition leader for multiple partitions (and/or + # topics), we can send all of those in a single request. + # For that we store {leader -> {partitions for leader}}, and do 1 request per leader if partition_leader_id is None: leader2partitions = self._get_leader_for_partitions( set(records_to_delete), timeout_ms @@ -1059,27 +1042,35 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id future = self._send_request_to_node(leader, request) self._wait_for_futures([future]) - response = future.value - responses.append(response) + responses.append(future.value.to_object()) + partition2result = {} partition2error = {} for response in responses: - for topic in getattr(response, 'topics', ()): - for partition in getattr(topic, 'partitions', ()): - if getattr(partition, 'error_code', 0) != 0: - tp = TopicPartition(topic, partition['partition_index']) - partition2error[tp] = partition['error_code'] + for topic in response["topics"]: + for partition in topic["partitions"]: + tp = TopicPartition(topic["name"], partition["partition_index"]) + partition2result[tp] = partition + if partition["error_code"] != 0: + partition2error[tp] = partition["error_code"] if partition2error: if len(partition2error) == 1: - raise Errors.for_code(partition2error[0])() + key, error = next(iter(partition2error.items())) + raise Errors.for_code(error)( + "Error deleting records from topic %s partition %s" % (key.topic, key.partition) + ) else: raise Errors.BrokerResponseError( - "The following errors occured when trying to delete records: " - ", ".join("%s: %s" % (partition, error) for partition, error in partition2error.items()) + "The following errors occured when trying to delete records: " + + ", ".join( + "%s(partition=%d): %s" % + (partition.topic, partition.partition, Errors.for_code(error).__name__) + for partition, error in partition2error.items() + ) ) - return responses + return partition2result # create delegation token protocol not yet implemented # Note: send the request to the least_loaded_node() diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 7c80972ae..d8d770d53 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -243,13 +243,13 @@ class DeleteRecordsResponse_v0(Response): API_KEY = 21 API_VERSION = 0 SCHEMA = Schema( + ('throttle_time_ms', Int32), ('topics', Array( ('name', String('utf-8')), ('partitions', Array( ('partition_index', Int32), ('low_watermark', Int64), ('error_code', Int16))))), - ('throttle_time_ms', Int32) ) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 2de0be55b..add772427 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -9,8 +9,8 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) from kafka.errors import ( - KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, - GroupIdNotFoundError, UnknownTopicOrPartitionError) + BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, + GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -342,7 +342,11 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message for _ in range(600): next(consumer1) - kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000) + result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000) + assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition} + assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition} + assert result[t1p0] == {"low_watermark": 60, "error_code": 0, "partition_index": t1p0.partition} + assert result[t1p2] == {"low_watermark": 70, "error_code": 0, "partition_index": t1p2.partition} consumer2 = kafka_consumer_factory(group_id=None, topics=()) consumer2.assign(partitions) @@ -360,16 +364,22 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0") -def test_delete_records_with_errors(kafka_admin_client, topic): +def test_delete_records_with_errors(kafka_admin_client, topic, send_messages): sleep(1) # sometimes the topic is not created yet...? p0 = TopicPartition(topic, 0) + p1 = TopicPartition(topic, 1) + p2 = TopicPartition(topic, 2) # verify that topic has been created - kafka_admin_client.delete_records({p0: 10}, timeout_ms=1000) + send_messages(range(0, 1), partition=p2.partition, topic=p2.topic) with pytest.raises(UnknownTopicOrPartitionError): kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1}) with pytest.raises(UnknownTopicOrPartitionError): kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1}) + with pytest.raises(OffsetOutOfRangeError): + kafka_admin_client.delete_records({p0: 1000}) + with pytest.raises(BrokerResponseError): + kafka_admin_client.delete_records({p0: 1000, p1: 1000})